-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ledger init status #1058
Ledger init status #1058
Changes from all commits
ecdc289
c12d84f
2dad7da
a465041
eb2ac87
8f6e0bf
15fc48c
762a17e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,9 +4,7 @@ import ( | |
"context" | ||
"fmt" | ||
"os" | ||
"os/signal" | ||
"path/filepath" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/algorand/go-algorand-sdk/client/v2/algod" | ||
|
@@ -26,21 +24,9 @@ import ( | |
) | ||
|
||
// InitializeLedgerSimple executes the migration core functionality. | ||
func InitializeLedgerSimple(logger *log.Logger, round uint64, opts *idb.IndexerDbOptions) error { | ||
ctx, cf := context.WithCancel(context.Background()) | ||
func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint64, opts *idb.IndexerDbOptions) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this function supposed to block until initialization is complete? It looks like it starts fetcher and then runs in the background. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
ctx, cf := context.WithCancel(ctx) | ||
defer cf() | ||
{ | ||
cancelCh := make(chan os.Signal, 1) | ||
signal.Notify(cancelCh, syscall.SIGTERM, syscall.SIGINT) | ||
go func() { | ||
<-cancelCh | ||
logger.Errorf("Ledger migration interrupted") | ||
// exit process if migration is interrupted so that | ||
// migration state doesn't get updated in db | ||
os.Exit(1) | ||
}() | ||
} | ||
|
||
var bot fetcher.Fetcher | ||
var err error | ||
if opts.IndexerDatadir == "" { | ||
|
@@ -81,9 +67,10 @@ func InitializeLedgerSimple(logger *log.Logger, round uint64, opts *idb.IndexerD | |
return nil | ||
} | ||
|
||
func fullNodeCatchup(logger *log.Logger, round basics.Round, catchpoint, dataDir string, genesis bookkeeping.Genesis) error { | ||
func fullNodeCatchup(ctx context.Context, logger *log.Logger, round basics.Round, catchpoint, dataDir string, genesis bookkeeping.Genesis) error { | ||
ctx, cf := context.WithCancel(ctx) | ||
defer cf() | ||
wrappedLogger := logging.NewWrappedLogger(logger) | ||
|
||
node, err := node.MakeFull( | ||
wrappedLogger, | ||
dataDir, | ||
|
@@ -93,30 +80,45 @@ func fullNodeCatchup(logger *log.Logger, round basics.Round, catchpoint, dataDir | |
if err != nil { | ||
return err | ||
} | ||
// remove node directory after when exiting fast catchup mode | ||
defer os.RemoveAll(filepath.Join(dataDir, genesis.ID())) | ||
node.Start() | ||
time.Sleep(5 * time.Second) | ||
logger.Info("algod node running") | ||
defer func() { | ||
node.Stop() | ||
logger.Info("algod node stopped") | ||
}() | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case <-time.After(5 * time.Second): | ||
logger.Info("algod node running") | ||
} | ||
|
||
status, err := node.Status() | ||
if err != nil { | ||
return err | ||
} | ||
node.StartCatchup(catchpoint) | ||
// If the node isn't in fast catchup mode, catchpoint will be empty. | ||
|
||
logger.Infof("Running fast catchup using catchpoint %s", catchpoint) | ||
for status.LastRound < round { | ||
time.Sleep(2 * time.Second) | ||
status, err = node.Status() | ||
if status.CatchpointCatchupTotalBlocks > 0 { | ||
logger.Debugf("current round %d ", status.LastRound) | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case <-time.After(2 * time.Second): | ||
status, err = node.Status() | ||
logger.Infof("Catchpoint Catchup Total Accounts %d ", status.CatchpointCatchupTotalAccounts) | ||
logger.Infof("Catchpoint Catchup Processed Accounts %d ", status.CatchpointCatchupProcessedAccounts) | ||
logger.Infof("Catchpoint Catchup Verified Accounts %d ", status.CatchpointCatchupVerifiedAccounts) | ||
logger.Infof("Catchpoint Catchup Total Blocks %d ", status.CatchpointCatchupTotalBlocks) | ||
logger.Infof("Catchpoint Catchup Acquired Blocks %d ", status.CatchpointCatchupAcquiredBlocks) | ||
} | ||
|
||
} | ||
logger.Info("fast catchup completed") | ||
node.Stop() | ||
logger.Info("algod node stopped") | ||
logger.Infof("fast catchup completed in %v", status.CatchupTime.Seconds()) | ||
return nil | ||
} | ||
|
||
// InitializeLedgerFastCatchup executes the migration core functionality. | ||
func InitializeLedgerFastCatchup(logger *log.Logger, catchpoint, dataDir string, genesis bookkeeping.Genesis) error { | ||
func InitializeLedgerFastCatchup(ctx context.Context, logger *log.Logger, catchpoint, dataDir string, genesis bookkeeping.Genesis) error { | ||
if dataDir == "" { | ||
return fmt.Errorf("InitializeLedgerFastCatchup() err: indexer data directory missing") | ||
} | ||
|
@@ -128,11 +130,12 @@ func InitializeLedgerFastCatchup(logger *log.Logger, catchpoint, dataDir string, | |
|
||
// TODO: switch to catchup service catchup. | ||
//err = internal.CatchupServiceCatchup(logger, round, catchpoint, dataDir, genesis) | ||
err = fullNodeCatchup(logger, round, catchpoint, dataDir, genesis) | ||
err = fullNodeCatchup(ctx, logger, round, catchpoint, dataDir, genesis) | ||
if err != nil { | ||
return fmt.Errorf("fullNodeCatchup() err: %w", err) | ||
} | ||
|
||
// remove node directory after fast catchup completes | ||
defer os.RemoveAll(filepath.Join(dataDir, genesis.ID())) | ||
// move ledger to indexer directory | ||
ledgerFiles := []string{ | ||
"ledger.block.sqlite", | ||
|
@@ -185,6 +188,7 @@ func handleBlock(block *rpcs.EncodedBlockCert, proc processor.Processor) error { | |
"block %d import failed", block.Block.Round()) | ||
return fmt.Errorf("handleBlock() err: %w", err) | ||
} | ||
logger.Infof("Initialize Ledger: added block %d to ledger", block.Block.Round()) | ||
return nil | ||
} | ||
func getGenesis(client *algod.Client) (bookkeeping.Genesis, error) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be the indexer directory?