Skip to content

Commit

Permalink
contexts and bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
shiqizng committed Jun 27, 2022
1 parent 2dad7da commit a465041
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 45 deletions.
2 changes: 1 addition & 1 deletion cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ var daemonCmd = &cobra.Command{
imp := importer.NewImporter(db)

logger.Info("Initializing local ledger.")
proc, err := blockprocessor.MakeProcessorWithLedgerInit(ctx, logger, catchpoint, &genesis, nextDBRound, opts, imp.ImportBlock)
proc, err := blockprocessor.MakeProcessorWithLedgerInit(ctx, cf, logger, catchpoint, &genesis, nextDBRound, opts, imp.ImportBlock)
if err != nil {
maybeFail(err, "blockprocessor.MakeProcessor() err %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions processor/blockprocessor/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func MakeProcessorWithLedger(l *ledger.Ledger, handler func(block *ledgercore.Va
}

// MakeProcessorWithLedgerInit creates a block processor and initializes the ledger.
func MakeProcessorWithLedgerInit(ctx context.Context, logger *log.Logger, catchpoint string, genesis *bookkeeping.Genesis, nextDBRound uint64, opts idb.IndexerDbOptions, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) {
func MakeProcessorWithLedgerInit(ctx context.Context, cf context.CancelFunc, logger *log.Logger, catchpoint string, genesis *bookkeeping.Genesis, nextDBRound uint64, opts idb.IndexerDbOptions, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) {
if nextDBRound > 0 {
if catchpoint != "" {
round, _, err := ledgercore.ParseCatchpointLabel(catchpoint)
Expand All @@ -56,7 +56,7 @@ func MakeProcessorWithLedgerInit(ctx context.Context, logger *log.Logger, catchp
}

}
err := InitializeLedgerSimple(ctx, logger, nextDBRound-1, &opts)
err := InitializeLedgerSimple(ctx, cf, logger, nextDBRound-1, &opts)
if err != nil {
return &blockProcessor{}, fmt.Errorf("MakeProcessorWithCatchup() slow catchup err: %w", err)
}
Expand Down
71 changes: 29 additions & 42 deletions processor/blockprocessor/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"context"
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"

"github.com/algorand/go-algorand-sdk/client/v2/algod"
Expand All @@ -26,18 +24,7 @@ import (
)

// InitializeLedgerSimple executes the migration core functionality.
func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint64, opts *idb.IndexerDbOptions) error {
ctx, cf := context.WithCancel(ctx)
defer cf()
{
cancelCh := make(chan os.Signal, 1)
signal.Notify(cancelCh, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-cancelCh
logger.Errorf("Ledger initalization aborted")
os.Exit(1)
}()
}
func InitializeLedgerSimple(ctx context.Context, cf context.CancelFunc, logger *log.Logger, round uint64, opts *idb.IndexerDbOptions) error {

var bot fetcher.Fetcher
var err error
Expand Down Expand Up @@ -91,41 +78,40 @@ func fullNodeCatchup(ctx context.Context, logger *log.Logger, round basics.Round
if err != nil {
return err
}
// remove node directory after when exiting fast catchup mode
defer os.RemoveAll(filepath.Join(dataDir, genesis.ID()))
// stop node if indexer exits during ledger init
_, cf := context.WithCancel(ctx)
defer cf()
{
cancelCh := make(chan os.Signal, 1)
signal.Notify(cancelCh, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-cancelCh
logger.Errorf("Ledger initalization aborted")
node.Stop()
os.Exit(1)
}()
node.Start()
defer func() {
node.Stop()
logger.Info("algod node stopped")
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
logger.Info("algod node running")
}

node.Start()
time.Sleep(5 * time.Second)
logger.Info("algod node running")
status, err := node.Status()
if err != nil {
return err
}
node.StartCatchup(catchpoint)
// If the node isn't in fast catchup mode, catchpoint will be empty.

logger.Infof("Running fast catchup using catchpoint %s", catchpoint)
for status.LastRound < round {
time.Sleep(2 * time.Second)
status, err = node.Status()
logger.Infof("Catchpoint Catchup Total Accounts %d ", status.CatchpointCatchupTotalAccounts)
logger.Infof("Catchpoint Catchup Processed Accounts %d ", status.CatchpointCatchupProcessedAccounts)
logger.Infof("Catchpoint Catchup Verified Accounts %d ", status.CatchpointCatchupVerifiedAccounts)
logger.Infof("Catchpoint Catchup Total Blocks %d ", status.CatchpointCatchupTotalBlocks)
logger.Infof("Catchpoint Catchup Acquired Blocks %d ", status.CatchpointCatchupAcquiredBlocks)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(2 * time.Second):
status, err = node.Status()
logger.Infof("Catchpoint Catchup Total Accounts %d ", status.CatchpointCatchupTotalAccounts)
logger.Infof("Catchpoint Catchup Processed Accounts %d ", status.CatchpointCatchupProcessedAccounts)
logger.Infof("Catchpoint Catchup Verified Accounts %d ", status.CatchpointCatchupVerifiedAccounts)
logger.Infof("Catchpoint Catchup Total Blocks %d ", status.CatchpointCatchupTotalBlocks)
logger.Infof("Catchpoint Catchup Acquired Blocks %d ", status.CatchpointCatchupAcquiredBlocks)
}

}
logger.Infof("fast catchup completed in %v", status.CatchupTime.Seconds())
node.Stop()
logger.Info("algod node stopped")
return nil
}

Expand All @@ -146,7 +132,8 @@ func InitializeLedgerFastCatchup(ctx context.Context, logger *log.Logger, catchp
if err != nil {
return fmt.Errorf("fullNodeCatchup() err: %w", err)
}

// remove node directory after fast catchup completes
defer os.RemoveAll(filepath.Join(dataDir, genesis.ID()))
// move ledger to indexer directory
ledgerFiles := []string{
"ledger.block.sqlite",
Expand Down

0 comments on commit a465041

Please sign in to comment.