diff --git a/cmd/algorand-indexer/daemon.go b/cmd/algorand-indexer/daemon.go index d21fb26f3..9ca4324d3 100644 --- a/cmd/algorand-indexer/daemon.go +++ b/cmd/algorand-indexer/daemon.go @@ -243,7 +243,7 @@ var daemonCmd = &cobra.Command{ imp := importer.NewImporter(db) logger.Info("Initializing local ledger.") - proc, err := blockprocessor.MakeProcessorWithLedgerInit(ctx, logger, catchpoint, &genesis, nextDBRound, opts, imp.ImportBlock) + proc, err := blockprocessor.MakeProcessorWithLedgerInit(ctx, cf, logger, catchpoint, &genesis, nextDBRound, opts, imp.ImportBlock) if err != nil { maybeFail(err, "blockprocessor.MakeProcessor() err %v", err) } diff --git a/processor/blockprocessor/block_processor.go b/processor/blockprocessor/block_processor.go index 6ff2f1064..39571d9a9 100644 --- a/processor/blockprocessor/block_processor.go +++ b/processor/blockprocessor/block_processor.go @@ -39,7 +39,7 @@ func MakeProcessorWithLedger(l *ledger.Ledger, handler func(block *ledgercore.Va } // MakeProcessorWithLedgerInit creates a block processor and initializes the ledger. -func MakeProcessorWithLedgerInit(ctx context.Context, logger *log.Logger, catchpoint string, genesis *bookkeeping.Genesis, nextDBRound uint64, opts idb.IndexerDbOptions, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) { +func MakeProcessorWithLedgerInit(ctx context.Context, cf context.CancelFunc, logger *log.Logger, catchpoint string, genesis *bookkeeping.Genesis, nextDBRound uint64, opts idb.IndexerDbOptions, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) { if nextDBRound > 0 { if catchpoint != "" { round, _, err := ledgercore.ParseCatchpointLabel(catchpoint) @@ -56,7 +56,7 @@ func MakeProcessorWithLedgerInit(ctx context.Context, logger *log.Logger, catchp } } - err := InitializeLedgerSimple(ctx, logger, nextDBRound-1, &opts) + err := InitializeLedgerSimple(ctx, cf, logger, nextDBRound-1, &opts) if err != nil { return &blockProcessor{}, fmt.Errorf("MakeProcessorWithCatchup() slow catchup err: %w", err) } diff --git a/processor/blockprocessor/initialize.go b/processor/blockprocessor/initialize.go index a914e3ef9..96bb7c15f 100644 --- a/processor/blockprocessor/initialize.go +++ b/processor/blockprocessor/initialize.go @@ -4,9 +4,7 @@ import ( "context" "fmt" "os" - "os/signal" "path/filepath" - "syscall" "time" "github.com/algorand/go-algorand-sdk/client/v2/algod" @@ -26,18 +24,7 @@ import ( ) // InitializeLedgerSimple executes the migration core functionality. -func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint64, opts *idb.IndexerDbOptions) error { - ctx, cf := context.WithCancel(ctx) - defer cf() - { - cancelCh := make(chan os.Signal, 1) - signal.Notify(cancelCh, syscall.SIGTERM, syscall.SIGINT) - go func() { - <-cancelCh - logger.Errorf("Ledger initalization aborted") - os.Exit(1) - }() - } +func InitializeLedgerSimple(ctx context.Context, cf context.CancelFunc, logger *log.Logger, round uint64, opts *idb.IndexerDbOptions) error { var bot fetcher.Fetcher var err error @@ -91,41 +78,40 @@ func fullNodeCatchup(ctx context.Context, logger *log.Logger, round basics.Round if err != nil { return err } - // remove node directory after when exiting fast catchup mode - defer os.RemoveAll(filepath.Join(dataDir, genesis.ID())) - // stop node if indexer exits during ledger init - _, cf := context.WithCancel(ctx) - defer cf() - { - cancelCh := make(chan os.Signal, 1) - signal.Notify(cancelCh, syscall.SIGTERM, syscall.SIGINT) - go func() { - <-cancelCh - logger.Errorf("Ledger initalization aborted") - node.Stop() - os.Exit(1) - }() + node.Start() + defer func() { + node.Stop() + logger.Info("algod node stopped") + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + logger.Info("algod node running") } - node.Start() - time.Sleep(5 * time.Second) - logger.Info("algod node running") status, err := node.Status() + if err != nil { + return err + } node.StartCatchup(catchpoint) - // If the node isn't in fast catchup mode, catchpoint will be empty. + logger.Infof("Running fast catchup using catchpoint %s", catchpoint) for status.LastRound < round { - time.Sleep(2 * time.Second) - status, err = node.Status() - logger.Infof("Catchpoint Catchup Total Accounts %d ", status.CatchpointCatchupTotalAccounts) - logger.Infof("Catchpoint Catchup Processed Accounts %d ", status.CatchpointCatchupProcessedAccounts) - logger.Infof("Catchpoint Catchup Verified Accounts %d ", status.CatchpointCatchupVerifiedAccounts) - logger.Infof("Catchpoint Catchup Total Blocks %d ", status.CatchpointCatchupTotalBlocks) - logger.Infof("Catchpoint Catchup Acquired Blocks %d ", status.CatchpointCatchupAcquiredBlocks) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(2 * time.Second): + status, err = node.Status() + logger.Infof("Catchpoint Catchup Total Accounts %d ", status.CatchpointCatchupTotalAccounts) + logger.Infof("Catchpoint Catchup Processed Accounts %d ", status.CatchpointCatchupProcessedAccounts) + logger.Infof("Catchpoint Catchup Verified Accounts %d ", status.CatchpointCatchupVerifiedAccounts) + logger.Infof("Catchpoint Catchup Total Blocks %d ", status.CatchpointCatchupTotalBlocks) + logger.Infof("Catchpoint Catchup Acquired Blocks %d ", status.CatchpointCatchupAcquiredBlocks) + } + } logger.Infof("fast catchup completed in %v", status.CatchupTime.Seconds()) - node.Stop() - logger.Info("algod node stopped") return nil } @@ -146,7 +132,8 @@ func InitializeLedgerFastCatchup(ctx context.Context, logger *log.Logger, catchp if err != nil { return fmt.Errorf("fullNodeCatchup() err: %w", err) } - + // remove node directory after fast catchup completes + defer os.RemoveAll(filepath.Join(dataDir, genesis.ID())) // move ledger to indexer directory ledgerFiles := []string{ "ledger.block.sqlite",