Skip to content

Commit

Permalink
Add block metadata to pipeline (#1258)
Browse files Browse the repository at this point in the history
  • Loading branch information
shiqizng authored Oct 25, 2022
1 parent ace1808 commit 99a848c
Show file tree
Hide file tree
Showing 20 changed files with 742 additions and 376 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ coverage.txt

# asdf
.tool-versions

# conduit example
cmd/conduit/data
4 changes: 3 additions & 1 deletion cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,9 @@ func runDaemon(daemonConfig *daemonConfig) error {

func makeConduitConfig(dCfg *daemonConfig) conduit.PipelineConfig {
return conduit.PipelineConfig{
ConduitConfig: &conduit.Config{},
ConduitConfig: &conduit.Config{
ConduitDataDir: dCfg.indexerDataDir,
},
PipelineLogLevel: logger.GetLevel().String(),
Importer: conduit.NameConfigPair{
Name: "algod",
Expand Down
3 changes: 2 additions & 1 deletion conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package conduit

import (
"fmt"
"strings"

"github.com/algorand/go-algorand/util"
"github.com/spf13/pflag"
"strings"
)

// DefaultConfigName is the default conduit configuration filename.
Expand Down
312 changes: 312 additions & 0 deletions conduit/gen/lookup.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions conduit/init_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ type PipelineInitProvider struct {
genesis *bookkeeping.Genesis
}

// Genesis produces genesis pointer
func (a *PipelineInitProvider) Genesis() *bookkeeping.Genesis {
// GetGenesis returns the genesis pointer held by the init provider.
func (a *PipelineInitProvider) GetGenesis() *bookkeeping.Genesis {
	return a.genesis
}

Expand Down
147 changes: 113 additions & 34 deletions conduit/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@ package conduit

import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/algorand/indexer/util/metrics"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"os"
"path"
"path/filepath"
"runtime/pprof"
"sync"
"time"

"github.com/algorand/go-algorand-sdk/encoding/json"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/indexer/util/metrics"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"gopkg.in/yaml.v3"

"github.com/algorand/indexer/data"
"github.com/algorand/indexer/exporters"
Expand Down Expand Up @@ -145,7 +149,15 @@ type pipelineImpl struct {
exporter *exporters.Exporter
completeCallback []OnCompleteFunc

round basics.Round
pipelineMetadata PipelineMetaData
pipelineMetadataFilePath string
}

// PipelineMetaData is the pipeline state persisted as JSON in
// <ConduitDataDir>/metadata.json so a restarted pipeline can resume
// from where it left off.
type PipelineMetaData struct {
	// GenesisHash is the hash of the genesis block; on startup it is compared
	// against the importer's genesis to detect a mismatched data directory.
	GenesisHash string `json:"genesis-hash"`
	// Network is the network name taken from the genesis block.
	Network string `json:"network"`
	// NextRound is the next round the pipeline will fetch; it is incremented
	// (and re-persisted) after each round completes.
	NextRound uint64 `json:"next-round"`
}

func (p *pipelineImpl) Error() error {
Expand Down Expand Up @@ -203,62 +215,79 @@ func (p *pipelineImpl) Init() error {

// TODO Need to change interfaces to accept config of map[string]interface{}

// Initialize Exporter first since the pipeline derives its round from the Exporter
exporterLogger := log.New()
// Make sure we are thread-safe
exporterLogger.SetOutput(p.logger.Out)
exporterLogger.SetFormatter(makePluginLogFormatter(plugins.Exporter, (*p.exporter).Metadata().Name()))

jsonEncode := string(json.Encode(p.cfg.Exporter.Config))
err := (*p.exporter).Init(p.ctx, plugins.PluginConfig(jsonEncode), exporterLogger)
exporterName := (*p.exporter).Metadata().Name()
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not initialize Exporter (%s): %w", exporterName, err)
}
p.logger.Infof("Initialized Exporter: %s", exporterName)

// Initialize Importer
importerLogger := log.New()
// Make sure we are thread-safe
importerLogger.SetOutput(p.logger.Out)
importerName := (*p.importer).Metadata().Name()
importerLogger.SetFormatter(makePluginLogFormatter(plugins.Importer, importerName))

// TODO modify/fix ?
jsonEncode = string(json.Encode(p.cfg.Importer.Config))
genesis, err := (*p.importer).Init(p.ctx, plugins.PluginConfig(jsonEncode), importerLogger)

configs, err := yaml.Marshal(p.cfg.Importer.Config)
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not serialize Importer.Config: %w", err)
}
genesis, err := (*p.importer).Init(p.ctx, plugins.PluginConfig(configs), importerLogger)
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not initialize importer (%s): %w", importerName, err)
}
p.round = basics.Round((*p.exporter).Round())
err = (*p.exporter).HandleGenesis(*genesis)

// initialize or load pipeline metadata
gh := crypto.HashObj(genesis).String()
p.pipelineMetadata.GenesisHash = gh
p.pipelineMetadata.Network = string(genesis.Network)
p.pipelineMetadata, err = p.initializeOrLoadBlockMetadata()
if err != nil {
return fmt.Errorf("Pipeline.Start(): exporter could not handle genesis (%s): %w", exporterName, err)
return fmt.Errorf("Pipeline.Start(): could not read metadata: %w", err)
}
if p.pipelineMetadata.GenesisHash != gh {
return fmt.Errorf("Pipeline.Start(): genesis hash in metadata does not match expected value: actual %s, expected %s", gh, p.pipelineMetadata.GenesisHash)
}

p.logger.Infof("Initialized Importer: %s", importerName)

// Initialize Processors
// InitProvider
round := basics.Round(p.pipelineMetadata.NextRound)
var initProvider data.InitProvider = &PipelineInitProvider{
currentRound: &p.round,
currentRound: &round,
genesis: genesis,
}
p.initProvider = &initProvider

// Initialize Processors
for idx, processor := range p.processors {
processorLogger := log.New()
// Make sure we are thread-safe
processorLogger.SetOutput(p.logger.Out)
processorLogger.SetFormatter(makePluginLogFormatter(plugins.Processor, (*processor).Metadata().Name()))
jsonEncode = string(json.Encode(p.cfg.Processors[idx].Config))
err := (*processor).Init(p.ctx, *p.initProvider, plugins.PluginConfig(jsonEncode), processorLogger)
configs, err = yaml.Marshal(p.cfg.Processors[idx].Config)
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not serialize Processors[%d].Config : %w", idx, err)
}
err := (*processor).Init(p.ctx, *p.initProvider, plugins.PluginConfig(configs), processorLogger)
processorName := (*processor).Metadata().Name()
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not initialize processor (%s): %w", processorName, err)
}
p.logger.Infof("Initialized Processor: %s", processorName)
}

// Initialize Exporter
exporterLogger := log.New()
// Make sure we are thread-safe
exporterLogger.SetOutput(p.logger.Out)
exporterLogger.SetFormatter(makePluginLogFormatter(plugins.Exporter, (*p.exporter).Metadata().Name()))

configs, err = yaml.Marshal(p.cfg.Exporter.Config)
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not serialize Exporter.Config : %w", err)
}
err = (*p.exporter).Init(p.ctx, *p.initProvider, plugins.PluginConfig(configs), exporterLogger)
exporterName := (*p.exporter).Metadata().Name()
if err != nil {
return fmt.Errorf("Pipeline.Start(): could not initialize Exporter (%s): %w", exporterName, err)
}
p.logger.Infof("Initialized Exporter: %s", exporterName)

// Register callbacks.
p.registerLifecycleCallbacks()
return err
Expand Down Expand Up @@ -325,9 +354,9 @@ func (p *pipelineImpl) Start() {
return
default:
{
p.logger.Infof("Pipeline round: %v", p.round)
p.logger.Infof("Pipeline round: %v", p.pipelineMetadata.NextRound)
// fetch block
blkData, err := (*p.importer).GetBlock(uint64(p.round))
blkData, err := (*p.importer).GetBlock(p.pipelineMetadata.NextRound)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
Expand Down Expand Up @@ -363,12 +392,13 @@ func (p *pipelineImpl) Start() {
}
importTime := time.Since(start)
// Ignore round 0 (which is empty).
if p.round > 0 {
if p.pipelineMetadata.NextRound > 0 {
p.addMetrics(blkData, importTime)
}
// Increment Round
p.setError(nil)
p.round++
p.pipelineMetadata.NextRound++
p.encodeMetadataToFile()
}
}

Expand All @@ -380,6 +410,55 @@ func (p *pipelineImpl) Wait() {
p.wg.Wait()
}

// encodeMetadataToFile atomically persists the pipeline metadata: the JSON
// encoding is written to a temporary file which is then renamed over the
// real metadata file, so readers never observe a partially-written file.
func (p *pipelineImpl) encodeMetadataToFile() error {
	tempFilename := fmt.Sprintf("%s.temp", p.pipelineMetadataFilePath)
	file, err := os.Create(tempFilename)
	if err != nil {
		return fmt.Errorf("encodeMetadataToFile(): failed to create temp metadata file: %w", err)
	}
	if err := json.NewEncoder(file).Encode(p.pipelineMetadata); err != nil {
		// Best-effort cleanup of the partially written temp file.
		file.Close()
		os.Remove(tempFilename)
		return fmt.Errorf("encodeMetadataToFile(): failed to write temp metadata: %w", err)
	}
	// Close before renaming: renaming an open file fails on Windows, and
	// Close is where buffered write errors surface — a deferred Close would
	// silently discard them.
	if err := file.Close(); err != nil {
		os.Remove(tempFilename)
		return fmt.Errorf("encodeMetadataToFile(): failed to close temp metadata file: %w", err)
	}
	if err := os.Rename(tempFilename, p.pipelineMetadataFilePath); err != nil {
		return fmt.Errorf("encodeMetadataToFile(): failed to replace metadata file: %w", err)
	}
	return nil
}

// initializeOrLoadBlockMetadata loads the persisted pipeline metadata from
// <ConduitDataDir>/metadata.json. A missing or empty file is replaced with a
// fresh file written from the current in-memory metadata; otherwise the file
// contents are decoded into p.pipelineMetadata.
func (p *pipelineImpl) initializeOrLoadBlockMetadata() (PipelineMetaData, error) {
	// NOTE(review): filepath.Join would be the portable choice for an OS
	// path; path.Join is kept here to avoid touching the file's imports.
	p.pipelineMetadataFilePath = path.Join(p.cfg.ConduitConfig.ConduitDataDir, "metadata.json")
	stat, err := os.Stat(p.pipelineMetadataFilePath)
	if errors.Is(err, os.ErrNotExist) || (stat != nil && stat.Size() == 0) {
		// Treat an empty file like a missing one: discard it and write a
		// fresh metadata file from the in-memory state.
		if stat != nil && stat.Size() == 0 {
			if err := os.Remove(p.pipelineMetadataFilePath); err != nil {
				return p.pipelineMetadata, fmt.Errorf("initializeOrLoadBlockMetadata(): error removing empty metadata file: %w", err)
			}
		}
		if err := p.encodeMetadataToFile(); err != nil {
			return p.pipelineMetadata, fmt.Errorf("initializeOrLoadBlockMetadata(): error creating metadata file: %w", err)
		}
		return p.pipelineMetadata, nil
	}
	if err != nil {
		return p.pipelineMetadata, fmt.Errorf("initializeOrLoadBlockMetadata(): error checking metadata file: %w", err)
	}
	// Named `contents` (not `data`) to avoid shadowing the imported data package.
	contents, err := os.ReadFile(p.pipelineMetadataFilePath)
	if err != nil {
		return p.pipelineMetadata, fmt.Errorf("initializeOrLoadBlockMetadata(): error reading metadata: %w", err)
	}
	if err := json.Unmarshal(contents, &p.pipelineMetadata); err != nil {
		return p.pipelineMetadata, fmt.Errorf("initializeOrLoadBlockMetadata(): error parsing metadata: %w", err)
	}
	return p.pipelineMetadata, nil
}

// MakePipeline creates a Pipeline
func MakePipeline(ctx context.Context, cfg *PipelineConfig, logger *log.Logger) (Pipeline, error) {

Expand Down
Loading

0 comments on commit 99a848c

Please sign in to comment.