Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

conduit: Convert daemon to conduit pipeline #1208

Merged
merged 5 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type ServerImplementation struct {

db idb.IndexerDb

fetcher error
dataError func() error

timeout time.Duration

Expand Down Expand Up @@ -139,8 +139,10 @@ func (si *ServerImplementation) MakeHealthCheck(ctx echo.Context) error {
errors = append(errors, fmt.Sprintf("database error: %s", health.Error))
}

if si.fetcher != nil && si.fetcher.Error() != "" {
errors = append(errors, fmt.Sprintf("fetcher error: %s", si.fetcher.Error()))
if si.dataError != nil {
if err := si.dataError(); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err := si.dataError(); err != nil {
if si.dataError != nil && si.dataError() != nil {
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I've separated it out is that we need to add err to the set of known errors. We could just call si.dataError() again inside the block to reference the error, but because the pipeline will keep retrying it's possible that the first call would show an error but the second call might return nil.

This way we have a single call which either returns an error that is added to the health check or it's nil.

errors = append(errors, fmt.Sprintf("data error: %s", err))
}
}

return ctx.JSON(http.StatusOK, common.HealthCheckResponse{
Expand Down
4 changes: 2 additions & 2 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (e ExtraOptions) handlerTimeout() time.Duration {
}

// Serve starts an http server for the indexer API. This call blocks.
func Serve(ctx context.Context, serveAddr string, db idb.IndexerDb, fetcherError error, log *log.Logger, options ExtraOptions) {
func Serve(ctx context.Context, serveAddr string, db idb.IndexerDb, dataError func() error, log *log.Logger, options ExtraOptions) {
e := echo.New()
e.HideBanner = true

Expand Down Expand Up @@ -123,7 +123,7 @@ func Serve(ctx context.Context, serveAddr string, db idb.IndexerDb, fetcherError
api := ServerImplementation{
EnableAddressSearchRoundRewind: options.DeveloperMode,
db: db,
fetcher: fetcherError,
dataError: dataError,
timeout: options.handlerTimeout(),
log: log,
disabledParams: disabledMap,
Expand Down
184 changes: 68 additions & 116 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,25 @@ import (
"path/filepath"
"runtime/pprof"
"strings"
"sync"
"syscall"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"

"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-algorand/util"

"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/indexer/api"
"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/conduit"
"github.com/algorand/indexer/config"
_ "github.com/algorand/indexer/exporters/postgresql"
"github.com/algorand/indexer/fetcher"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/importer"
"github.com/algorand/indexer/processors"
"github.com/algorand/indexer/processors/blockprocessor"
_ "github.com/algorand/indexer/importers/algod"
_ "github.com/algorand/indexer/processors/blockprocessor"
iutil "github.com/algorand/indexer/util"
"github.com/algorand/indexer/util/metrics"
)

// GetConfigFromDataDir Given the data directory, configuration filename and a list of types, see if
Expand Down Expand Up @@ -313,19 +310,18 @@ func runDaemon(daemonConfig *daemonConfig) error {
}()
}

var bot fetcher.Fetcher
if daemonConfig.noAlgod {
logger.Info("algod block following disabled")
} else if daemonConfig.algodAddr != "" && daemonConfig.algodToken != "" {
bot, err = fetcher.ForNetAndToken(daemonConfig.algodAddr, daemonConfig.algodToken, logger)
maybeFail(err, "fetcher setup, %v", err)
} else if daemonConfig.algodDataDir != "" {
bot, err = fetcher.ForDataDir(daemonConfig.algodDataDir, logger)
maybeFail(err, "fetcher setup, %v", err)
} else {
if daemonConfig.algodDataDir != "" {
daemonConfig.algodAddr, daemonConfig.algodToken, _, err = fetcher.AlgodArgsForDataDir(daemonConfig.algodDataDir)
maybeFail(err, "algod data dir err, %v", err)
} else if daemonConfig.algodAddr == "" || daemonConfig.algodToken == "" {
// no algod was found
logger.Info("no algod was found, provide either --algod OR --algod-net and --algod-token to enable")
daemonConfig.noAlgod = true
}
if daemonConfig.noAlgod {
logger.Info("algod block following disabled")
}

opts := idb.IndexerDbOptions{}
if daemonConfig.noAlgod && !daemonConfig.allowMigration {
opts.ReadOnly = true
Expand All @@ -339,10 +335,13 @@ func runDaemon(daemonConfig *daemonConfig) error {

db, availableCh := indexerDbFromFlags(opts)
defer db.Close()
var wg sync.WaitGroup
if bot != nil {
wg.Add(1)
go runBlockImporter(ctx, daemonConfig, &wg, db, availableCh, bot, opts)
var dataError func() error
if daemonConfig.noAlgod != true {
pipeline := runConduitPipeline(ctx, availableCh, daemonConfig)
if pipeline != nil {
dataError = pipeline.Error
defer pipeline.Stop()
}
} else {
logger.Info("No block importer configured.")
}
Expand All @@ -352,61 +351,66 @@ func runDaemon(daemonConfig *daemonConfig) error {

options := makeOptions(daemonConfig)

api.Serve(ctx, daemonConfig.daemonServerAddr, db, bot, logger, options)
wg.Wait()
api.Serve(ctx, daemonConfig.daemonServerAddr, db, dataError, logger, options)
return err
}

func runBlockImporter(ctx context.Context, cfg *daemonConfig, wg *sync.WaitGroup, db idb.IndexerDb, dbAvailable chan struct{}, bot fetcher.Fetcher, opts idb.IndexerDbOptions) {
func makeConduitConfig(dCfg *daemonConfig) conduit.PipelineConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

return conduit.PipelineConfig{
ConduitConfig: &conduit.Config{},
PipelineLogLevel: logger.GetLevel().String(),
Importer: conduit.NameConfigPair{
Name: "algod",
Config: map[string]interface{}{
"netaddr": dCfg.algodAddr,
"token": dCfg.algodToken,
},
},
Processors: []conduit.NameConfigPair{
{
Name: "block_evaluator",
Config: map[string]interface{}{
"catchpoint": dCfg.catchpoint,
"indexer-data-dir": dCfg.indexerDataDir,
"algod-data-dir": dCfg.algodDataDir,
"algod-token": dCfg.algodToken,
"algod-addr": dCfg.algodAddr,
},
},
},
Exporter: conduit.NameConfigPair{
Name: "postgresql",
Config: map[string]interface{}{
"connection-string": postgresAddr,
"max-conn": dCfg.maxConn,
"test": dummyIndexerDb,
},
},
}

}

func runConduitPipeline(ctx context.Context, dbAvailable chan struct{}, dCfg *daemonConfig) conduit.Pipeline {
// Need to redefine exitHandler() for every go-routine
defer exitHandler()
defer wg.Done()

// Wait until the database is available.
<-dbAvailable

// Initial import if needed.
genesisReader := importer.GetGenesisFile(cfg.genesisJSONPath, bot.Algod(), logger)
genesis, err := iutil.ReadGenesis(genesisReader)
maybeFail(err, "Error reading genesis file")

_, err = importer.EnsureInitialImport(db, genesis)
maybeFail(err, "importer.EnsureInitialImport() error")

// sync local ledger
nextDBRound, err := db.GetNextRoundToAccount()
maybeFail(err, "Error getting DB round")

logger.Info("Initializing block import handler.")
imp := importer.NewImporter(db)

processorOpts := processors.BlockProcessorConfig{
Catchpoint: cfg.catchpoint,
IndexerDatadir: opts.IndexerDatadir,
AlgodDataDir: opts.AlgodDataDir,
AlgodToken: opts.AlgodToken,
AlgodAddr: opts.AlgodAddr,
}

logger.Info("Initializing local ledger.")
proc, err := blockprocessor.MakeBlockProcessorWithLedgerInit(ctx, logger, nextDBRound, &genesis, processorOpts, imp.ImportBlock)
if err != nil {
maybeFail(err, "blockprocessor.MakeBlockProcessor() err %v", err)
var pipeline conduit.Pipeline
var err error
pcfg := makeConduitConfig(dCfg)
if pipeline, err = conduit.MakePipeline(ctx, &pcfg, logger); err != nil {
logger.Errorf("%v", err)
panic(exit{1})
}

bot.SetNextRound(proc.NextRoundToProcess())
handler := blockHandler(&proc, imp.ImportBlock, 1*time.Second)
bot.SetBlockHandler(handler)

logger.Info("Starting block importer.")
err = bot.Run(ctx)
err = pipeline.Init()
if err != nil {
// If context is not expired.
if ctx.Err() == nil {
logger.WithError(err).Errorf("fetcher exited with error")
panic(exit{1})
}
logger.Errorf("%v", err)
panic(exit{1})
}
pipeline.Start()
return pipeline
}

// makeOptions converts CLI options to server options
Expand Down Expand Up @@ -473,55 +477,3 @@ func makeOptions(daemonConfig *daemonConfig) (options api.ExtraOptions) {

return
}

// blockHandler creates a handler complying to the fetcher block handler interface. In case of a failure it keeps
// attempting to add the block until the fetcher shuts down.
// blockHandler creates a handler complying to the fetcher block handler interface. In case of a failure it keeps
// attempting to add the block until the fetcher shuts down.
func blockHandler(proc *blockprocessor.BlockProcessor, blockHandler func(block *ledgercore.ValidatedBlock) error, retryDelay time.Duration) func(context.Context, *rpcs.EncodedBlockCert) error {
	return func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
		for {
			importErr := handleBlock(block, proc, blockHandler)
			if importErr == nil {
				// Block imported successfully.
				return nil
			}

			// Import failed: wait out the retry delay before the next
			// attempt, unless the context is cancelled first — in that
			// case surface the most recent import error.
			select {
			case <-time.After(retryDelay):
				continue
			case <-ctx.Done():
				return importErr
			}
		}
	}
}

// handleBlock runs a single block through the block processor, records import
// metrics on success, and returns a wrapped error on failure so callers can
// decide whether to retry.
func handleBlock(block *rpcs.EncodedBlockCert, proc *blockprocessor.BlockProcessor, handler func(block *ledgercore.ValidatedBlock) error) error {
	start := time.Now()
	process := blockprocessor.MakeBlockProcessorHandlerAdapter(proc, handler)
	if err := process(block); err != nil {
		logger.WithError(err).Errorf(
			"block %d import failed", block.Block.Round())
		return fmt.Errorf("handleBlock() err: %w", err)
	}
	importDuration := time.Since(start)

	round := block.Block.Round()
	payset := block.Block.Payset

	// Ignore round 0 (which is empty).
	if round > 0 {
		metrics.BlockImportTimeSeconds.Observe(importDuration.Seconds())
		metrics.ImportedTxnsPerBlock.Observe(float64(len(payset)))
		metrics.ImportedRoundGauge.Set(float64(round))

		// Tally transactions by type, then publish one gauge per type.
		countByType := make(map[string]int)
		for i := range payset {
			countByType[string(payset[i].Txn.Type)]++
		}
		for txType, count := range countByType {
			metrics.ImportedTxns.WithLabelValues(txType).Set(float64(count))
		}
	}

	logger.Infof("round r=%d (%d txn) imported in %s", round, len(payset), importDuration.String())

	return nil
}
74 changes: 2 additions & 72 deletions cmd/algorand-indexer/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,90 +2,20 @@ package main

import (
"bytes"
"context"
"errors"
"fmt"
"github.com/algorand/indexer/util"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"io/fs"
"os"
"path"
"path/filepath"
"sync"
"testing"
"time"

"github.com/sirupsen/logrus/hooks/test"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"

"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/rpcs"

"github.com/algorand/indexer/config"
"github.com/algorand/indexer/processors/blockprocessor"
itest "github.com/algorand/indexer/util/test"
)

// mockImporter is a no-op block importer used to exercise the block handler's
// retry behavior without a real database.
type mockImporter struct {
}

// errMockImportBlock is the error text that TestImportRetryAndCancel expects
// the processor to produce when handed round 1234 while round 1 is next.
var errMockImportBlock = errors.New("invalid round blockCert.Block.Round(): 1234 nextRoundToProcess: 1")

// ImportBlock accepts any validated block and always reports success.
func (imp *mockImporter) ImportBlock(vb *ledgercore.ValidatedBlock) error {
	return nil
}

// TestImportRetryAndCancel verifies that the handler produced by blockHandler
// keeps retrying a failing block import, logging one error per attempt, and
// exits cleanly once its context is cancelled.
func TestImportRetryAndCancel(t *testing.T) {
	// connect debug logger
	nullLogger, hook := test.NewNullLogger()
	logger = nullLogger

	// cancellable context
	ctx := context.Background()
	cctx, cancel := context.WithCancel(ctx)

	// create handler with mock importer and start, it should generate errors until cancelled.
	imp := &mockImporter{}
	ledgerLogger, _ := test.NewNullLogger()
	l, err := itest.MakeTestLedger(ledgerLogger)
	assert.NoError(t, err)
	defer l.Close()
	proc, err := blockprocessor.MakeBlockProcessorWithLedger(logger, l, imp.ImportBlock)
	assert.Nil(t, err)
	// 50ms retry delay keeps the test fast while still letting several
	// failed attempts accumulate below.
	handler := blockHandler(&proc, imp.ImportBlock, 50*time.Millisecond)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Round 1234 does not match the fresh ledger's next round, so every
		// import attempt fails and the handler loops until cancellation.
		block := rpcs.EncodedBlockCert{
			Block: bookkeeping.Block{
				BlockHeader: bookkeeping.BlockHeader{
					Round: 1234,
				},
			},
		}
		handler(cctx, &block)
	}()

	// accumulate some errors
	for len(hook.Entries) < 5 {
		time.Sleep(25 * time.Millisecond)
	}

	for _, entry := range hook.Entries {
		assert.Equal(t, entry.Message, "block 1234 import failed")

		tmpStr := entry.Data["error"].(error).Error()
		assert.Contains(t, tmpStr, errMockImportBlock.Error())
	}

	// Wait for handler to exit.
	cancel()
	wg.Wait()
}

func createTempDir(t *testing.T) string {
dir, err := os.MkdirTemp("", "indexer")
if err != nil {
Expand Down
Loading