Skip to content

Commit

Permalink
Misc Local Ledger cleanup (#1086)
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Jul 5, 2022
1 parent f977817 commit 1b89a6b
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 105 deletions.
2 changes: 1 addition & 1 deletion cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func runBlockImporter(ctx context.Context, cfg *daemonConfig, wg *sync.WaitGroup
genesis, err := iutil.ReadGenesis(genesisReader)
maybeFail(err, "Error reading genesis file")

_, err = importer.EnsureInitialImport(db, genesis, logger)
_, err = importer.EnsureInitialImport(db, genesis)
maybeFail(err, "importer.EnsureInitialImport() error")

// sync local ledger
Expand Down
3 changes: 0 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ const FileType = "yml"
// gets confused and thinks the binary is a config file with no extension.
const FileName = "indexer"

// ConfigPaths are the different locations that algorand-indexer should look for config files.
var ConfigPaths = [...]string{".", "$HOME", "$HOME/.algorand-indexer/", "$HOME/.config/algorand-indexer/", "/etc/algorand-indexer/"}

// BindFlagSet glues cobra and viper together via FlagSets
func BindFlagSet(flags *pflag.FlagSet) {
flags.VisitAll(func(f *pflag.Flag) {
Expand Down
19 changes: 10 additions & 9 deletions idb/postgres/postgres_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
test2 "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -63,11 +62,12 @@ func TestMaxRound(t *testing.T) {
assert.NoError(t, err)
defer pdb.Close()

db.Exec(
_, err = db.Exec(
context.Background(),
`INSERT INTO metastate (k, v) values ($1, $2)`,
"state",
`{"next_account_round":123454322}`)
assert.NoError(t, err)

round, err := pdb.GetNextRoundToAccount()
require.NoError(t, err)
Expand All @@ -86,11 +86,12 @@ func TestAccountedRoundNextRound0(t *testing.T) {
assert.NoError(t, err)
defer pdb.Close()

db.Exec(
_, err = db.Exec(
context.Background(),
`INSERT INTO metastate (k, v) values ($1, $2)`,
"state",
`{"next_account_round":0}`)
assert.NoError(t, err)

round, err := pdb.GetNextRoundToAccount()
require.NoError(t, err)
Expand Down Expand Up @@ -2114,9 +2115,9 @@ func TestGenesisHashCheckAtDBSetup(t *testing.T) {
// connect with different genesis configs
genesis.Network = "testnest"
// different genesisHash, should fail
idb, _, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil)
idbImpl, _, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil)
assert.NoError(t, err)
err = idb.LoadGenesis(genesis)
err = idbImpl.LoadGenesis(genesis)
assert.Error(t, err)
assert.Contains(t, err.Error(), "genesis hash not matching")
}
Expand All @@ -2142,15 +2143,15 @@ func TestGenesisHashCheckAtInitialImport(t *testing.T) {
jsonCodecHandle := new(codec.JsonHandle)
enc := codec.NewEncoderBytes(&buf, jsonCodecHandle)
enc.MustEncode(state)
db.setMetastate(nil, schema.StateMetastateKey, string(buf))
err = db.setMetastate(nil, schema.StateMetastateKey, string(buf))
assert.NoError(t, err)
// network state not initialized
networkState, err := db.getNetworkState(context.Background(), nil)
require.ErrorIs(t, err, idb.ErrorNotInitialized)
logger := logrus.New()
genesisReader := bytes.NewReader(protocol.EncodeJSON(genesis))
gen, err := util.ReadGenesis(genesisReader)
require.NoError(t, err)
imported, err := importer.EnsureInitialImport(db, gen, logger)
imported, err := importer.EnsureInitialImport(db, gen)
require.NoError(t, err)
require.True(t, true, imported)
// network state should be set
Expand All @@ -2164,7 +2165,7 @@ func TestGenesisHashCheckAtInitialImport(t *testing.T) {
gen, err = util.ReadGenesis(genesisReader)
require.NoError(t, err)
// different genesisHash, should fail
_, err = importer.EnsureInitialImport(db, gen, logger)
_, err = importer.EnsureInitialImport(db, gen)
require.Error(t, err)
require.Contains(t, err.Error(), "genesis hash not matching")

Expand Down
4 changes: 2 additions & 2 deletions importer/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (h *ImportHelper) Import(db idb.IndexerDb, args []string) {
genesis, err := util.ReadGenesis(genesisReader)
maybeFail(err, h.Log, "readGenesis() error")

_, err = EnsureInitialImport(db, genesis, h.Log)
_, err = EnsureInitialImport(db, genesis)
maybeFail(err, h.Log, "EnsureInitialImport() error")

imp := NewImporter(db)
Expand Down Expand Up @@ -188,7 +188,7 @@ func importFile(fname string, imp Importer, l *log.Logger, genesisPath string) (
}

// EnsureInitialImport imports the genesis block if needed. Returns true if the initial import occurred.
func EnsureInitialImport(db idb.IndexerDb, genesis bookkeeping.Genesis, l *log.Logger) (bool, error) {
func EnsureInitialImport(db idb.IndexerDb, genesis bookkeeping.Genesis) (bool, error) {
_, err := db.GetNextRoundToAccount()
// Exit immediately or crash if we don't see ErrorNotInitialized.
if err != idb.ErrorNotInitialized {
Expand Down
21 changes: 3 additions & 18 deletions processor/blockprocessor/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,9 @@ func MakeProcessorWithLedger(l *ledger.Ledger, handler func(block *ledgercore.Va

// MakeProcessorWithLedgerInit creates a block processor and initializes the ledger.
func MakeProcessorWithLedgerInit(ctx context.Context, logger *log.Logger, catchpoint string, genesis *bookkeeping.Genesis, nextDBRound uint64, opts idb.IndexerDbOptions, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) {
if nextDBRound > 0 {
if catchpoint != "" {
round, _, err := ledgercore.ParseCatchpointLabel(catchpoint)
if err != nil {
return &blockProcessor{}, fmt.Errorf("MakeProcessorWithCatchup() label err: %w", err)
}
if uint64(round) >= nextDBRound {
return &blockProcessor{}, fmt.Errorf("invalid catchpoint: catchpoint round %d should not be ahead of target round %d", uint64(round), nextDBRound-1)
}
err = InitializeLedgerFastCatchup(ctx, logger, catchpoint, opts.IndexerDatadir, *genesis)
if err != nil {
return &blockProcessor{}, fmt.Errorf("MakeProcessorWithCatchup() fast catchup err: %w", err)
}
}
err := InitializeLedgerSimple(ctx, logger, nextDBRound-1, &opts)
if err != nil {
return &blockProcessor{}, fmt.Errorf("MakeProcessorWithCatchup() slow catchup err: %w", err)
}
err := InitializeLedger(ctx, logger, catchpoint, nextDBRound, *genesis, &opts)
if err != nil {
return nil, fmt.Errorf("MakeProcessorWithLedgerInit() err: %w", err)
}
return MakeProcessor(logger, genesis, nextDBRound, opts.IndexerDatadir, handler)
}
Expand Down
85 changes: 44 additions & 41 deletions processor/blockprocessor/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (

log "github.com/sirupsen/logrus"

"github.com/algorand/go-algorand-sdk/client/v2/algod"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"

"github.com/algorand/indexer/fetcher"
Expand All @@ -20,8 +18,49 @@ import (
"github.com/algorand/indexer/processor/blockprocessor/internal"
)

// InitializeLedgerSimple executes the migration core functionality.
func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint64, opts *idb.IndexerDbOptions) error {
// InitializeLedger will initialize a ledger to the directory given by the
// IndexerDbOpts.
// nextRound - next round to process after initializing.
// catchpoint - if provided, attempt to use fast catchup.
func InitializeLedger(ctx context.Context, logger *log.Logger, catchpoint string, nextRound uint64, genesis bookkeeping.Genesis, opts *idb.IndexerDbOptions) error {
if nextRound > 0 {
if catchpoint != "" {
round, _, err := ledgercore.ParseCatchpointLabel(catchpoint)
if err != nil {
return fmt.Errorf("InitializeLedger() label err: %w", err)
}
if uint64(round) >= nextRound {
return fmt.Errorf("invalid catchpoint: catchpoint round %d should not be ahead of target round %d", uint64(round), nextRound-1)
}
err = InitializeLedgerFastCatchup(ctx, logger, catchpoint, opts.IndexerDatadir, genesis)
if err != nil {
return fmt.Errorf("InitializeLedger() fast catchup err: %w", err)
}
}
err := InitializeLedgerSimple(ctx, logger, nextRound-1, &genesis, opts)
if err != nil {
return fmt.Errorf("InitializeLedger() slow catchup err: %w", err)
}
}
return nil
}

// InitializeLedgerFastCatchup executes the migration core functionality.
func InitializeLedgerFastCatchup(ctx context.Context, logger *log.Logger, catchpoint, dataDir string, genesis bookkeeping.Genesis) error {
if dataDir == "" {
return fmt.Errorf("InitializeLedgerFastCatchup() err: indexer data directory missing")
}

err := internal.CatchupServiceCatchup(ctx, logger, catchpoint, dataDir, genesis)
if err != nil {
return fmt.Errorf("InitializeLedgerFastCatchup() err: %w", err)
}
return nil
}

// InitializeLedgerSimple initializes a ledger with the block processor by
// sending it one block at a time and letting it update the ledger as usual.
func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint64, genesis *bookkeeping.Genesis, opts *idb.IndexerDbOptions) error {
ctx, cf := context.WithCancel(ctx)
defer cf()
var bot fetcher.Fetcher
Expand All @@ -35,12 +74,8 @@ func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint6
return fmt.Errorf("InitializeLedgerSimple() err: %w", err)
}
logger.Info("initializing ledger")
genesis, err := getGenesis(bot.Algod())
if err != nil {
return fmt.Errorf("InitializeLedgerSimple() err: %w", err)
}

proc, err := MakeProcessor(logger, &genesis, round, opts.IndexerDatadir, nil)
proc, err := MakeProcessor(logger, genesis, round, opts.IndexerDatadir, nil)
if err != nil {
return fmt.Errorf("RunMigration() err: %w", err)
}
Expand Down Expand Up @@ -136,25 +171,6 @@ func fullNodeCatchup(ctx context.Context, logger *log.Logger, round basics.Round
}
*/

// InitializeLedgerFastCatchup executes the migration core functionality.
func InitializeLedgerFastCatchup(ctx context.Context, logger *log.Logger, catchpoint, dataDir string, genesis bookkeeping.Genesis) error {
if dataDir == "" {
return fmt.Errorf("InitializeLedgerFastCatchup() err: indexer data directory missing")
}
// catchpoint round
round, _, err := ledgercore.ParseCatchpointLabel(catchpoint)
if err != nil {
return fmt.Errorf("InitializeLedgerFastCatchup() err: %w", err)
}

err = internal.CatchupServiceCatchup(ctx, logger, round, catchpoint, dataDir, genesis)
//err = fullNodeCatchup(ctx, logger, round, catchpoint, dataDir, genesis)
if err != nil {
return fmt.Errorf("InitializeLedgerFastCatchup() err: %w", err)
}
return nil
}

// blockHandler creates a handler complying to the fetcher block handler interface. In case of a failure it keeps
// attempting to add the block until the fetcher shuts down.
func blockHandler(logger *log.Logger, dbRound uint64, proc processor.Processor, cancel context.CancelFunc, retryDelay time.Duration) func(context.Context, *rpcs.EncodedBlockCert) error {
Expand Down Expand Up @@ -191,20 +207,7 @@ func handleBlock(logger *log.Logger, block *rpcs.EncodedBlockCert, proc processo
logger.Infof("Initialize Ledger: added block %d to ledger", block.Block.Round())
return nil
}
func getGenesis(client *algod.Client) (bookkeeping.Genesis, error) {
data, err := client.GetGenesis().Do(context.Background())
if err != nil {
return bookkeeping.Genesis{}, fmt.Errorf("getGenesis() client err: %w", err)
}

var res bookkeeping.Genesis
err = protocol.DecodeJSON([]byte(data), &res)
if err != nil {
return bookkeeping.Genesis{}, fmt.Errorf("getGenesis() decode err: %w", err)
}

return res, nil
}
func getFetcher(logger *log.Logger, opts *idb.IndexerDbOptions) (fetcher.Fetcher, error) {
var err error
var bot fetcher.Fetcher
Expand Down
29 changes: 9 additions & 20 deletions processor/blockprocessor/initialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package blockprocessor
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"os"
"testing"
"time"

"github.com/jarcoal/httpmock"
"github.com/sirupsen/logrus"
test2 "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -17,7 +17,6 @@ import (
"github.com/algorand/go-algorand-sdk/encoding/json"
"github.com/algorand/go-algorand-sdk/encoding/msgpack"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/rpcs"

"github.com/algorand/indexer/idb"
Expand Down Expand Up @@ -70,7 +69,7 @@ func TestRunMigration(t *testing.T) {

// migrate 3 rounds
log, _ := test2.NewNullLogger()
err = InitializeLedgerSimple(context.Background(), log, 3, &opts)
err = InitializeLedgerSimple(context.Background(), log, 3, &genesis, &opts)
assert.NoError(t, err)
l, err := util.MakeLedger(log, false, &genesis, opts.IndexerDatadir)
assert.NoError(t, err)
Expand All @@ -79,7 +78,7 @@ func TestRunMigration(t *testing.T) {
l.Close()

// migration continues from last round
err = InitializeLedgerSimple(context.Background(), log, 6, &opts)
err = InitializeLedgerSimple(context.Background(), log, 6, &genesis, &opts)
assert.NoError(t, err)

l, err = util.MakeLedger(log, false, &genesis, opts.IndexerDatadir)
Expand All @@ -89,33 +88,23 @@ func TestRunMigration(t *testing.T) {
}

func TestInitializeLedgerFastCatchup_Errors(t *testing.T) {
genesis := test.MakeGenesis()
log, _ := test2.NewNullLogger()
err := InitializeLedgerFastCatchup(context.Background(), log, "asdf", "", bookkeeping.Genesis{})
err := InitializeLedgerFastCatchup(context.Background(), log, "asdf", "", genesis)
require.EqualError(t, err, "InitializeLedgerFastCatchup() err: indexer data directory missing")

err = InitializeLedgerFastCatchup(context.Background(), log, "asdf", t.TempDir(), bookkeeping.Genesis{})
require.EqualError(t, err, "InitializeLedgerFastCatchup() err: catchpoint parsing failed")
err = InitializeLedgerFastCatchup(context.Background(), log, "asdf", t.TempDir(), genesis)
require.ErrorContains(t, err, "catchpoint parsing failed")

tryToRun := func(ctx context.Context) {
var addr basics.Address
genesis := bookkeeping.Genesis{
SchemaID: "1",
Network: "test",
Proto: "future",
Allocation: nil,
RewardsPool: addr.String(),
FeeSink: addr.String(),
Timestamp: 0,
Comment: "",
DevMode: false,
}
genesis := test.MakeGenesis()
err = InitializeLedgerFastCatchup(
ctx,
logrus.New(),
"21890000#BOGUSTCNVEDIBNRPNCKWRBQLJ7ILXIJBYKAHF67TLUOYRUGHW7ZA",
t.TempDir(),
genesis)
require.EqualError(t, err, "InitializeLedgerFastCatchup() err: context canceled")
require.ErrorContains(t, err, "context canceled")
}

// Run with an immediate cancel
Expand Down
Loading

0 comments on commit 1b89a6b

Please sign in to comment.