Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LocalLedger Refactoring + Catchpoint Service #1049

Merged
merged 27 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
92361b9
Read genesis fewer times.
winder Jun 22, 2022
2efc02b
Put full node catchup in a function.
winder Jun 22, 2022
16b7a08
Pass the indexer logger into the block processor.
winder Jun 22, 2022
b3cc397
Open ledger with consistent prefix
winder Jun 22, 2022
59babb6
Catchup service implementation.
winder Jun 22, 2022
4d7c868
Fix linter.
winder Jun 22, 2022
2d73c24
Move ledger init into constructor.
winder Jun 22, 2022
3697cfb
Comment out catchup code to avoid circular dependency.
winder Jun 22, 2022
65fc4fc
Fix warnings.
winder Jun 22, 2022
aea0408
Combine multiple ReadGenesis functions.
winder Jun 22, 2022
060729b
Fix test.
winder Jun 22, 2022
bec28c5
Move local_ledger migration to blockprocessor.
winder Jun 22, 2022
9ae7a8f
Use logger instead of printf for catchup service status.
winder Jun 22, 2022
378a410
Update submodule
winder Jun 22, 2022
5933ef5
Switch to NewWrappedLogger, general cleanup.
winder Jun 22, 2022
8dc818c
Make CreateInitState private.
winder Jun 22, 2022
9a2d5c2
Use a separate logger for the daemon test.
winder Jun 22, 2022
dbbd6b9
Cleanup after e2e test.
winder Jun 22, 2022
57a1de0
Revert "Cleanup after e2e test."
winder Jun 22, 2022
35e27e5
troubleshooting.
winder Jun 22, 2022
185e206
Revert "Revert "Cleanup after e2e test.""
winder Jun 22, 2022
871298a
debugging
winder Jun 22, 2022
d65c1f6
debugging
winder Jun 22, 2022
f890049
debugging
winder Jun 22, 2022
ef807aa
debugging
winder Jun 22, 2022
b52b464
Remove debugging logic.
winder Jun 22, 2022
c05917b
Simplify MakeTestLedger
winder Jun 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions api/handlers_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,27 @@ import (
"testing"
"time"

"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/crypto/merklesignature"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/indexer/processor"
"github.com/algorand/indexer/processor/blockprocessor"
"github.com/labstack/echo/v4"
"github.com/sirupsen/logrus"
test2 "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand-sdk/encoding/json"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/crypto/merklesignature"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/indexer/processor"

"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/idb/postgres"
pgtest "github.com/algorand/indexer/idb/postgres/testing"
"github.com/algorand/indexer/processor/blockprocessor"
"github.com/algorand/indexer/util/test"
)

Expand Down Expand Up @@ -74,7 +75,9 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis) (*postgres.IndexerDb, f
err = db.LoadGenesis(genesis)
require.NoError(t, err)

l := test.MakeTestLedger("ledger")
log, _ := test2.NewNullLogger()
l, err := test.MakeTestLedger(log)
require.NoError(t, err)
proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
return db, newShutdownFunc, proc, l
Expand Down
58 changes: 10 additions & 48 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package main
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/signal"
"path/filepath"
Expand All @@ -14,24 +12,22 @@ import (
"syscall"
"time"

"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-algorand/util"

"github.com/algorand/indexer/api"
"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/config"
"github.com/algorand/indexer/fetcher"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/importer"
localledger "github.com/algorand/indexer/migrations/local_ledger"
"github.com/algorand/indexer/processor"
"github.com/algorand/indexer/processor/blockprocessor"
iutil "github.com/algorand/indexer/util"
"github.com/algorand/indexer/util/metrics"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

var (
Expand Down Expand Up @@ -233,39 +229,21 @@ var daemonCmd = &cobra.Command{

// Initial import if needed.
genesisReader := importer.GetGenesisFile(genesisJSONPath, bot.Algod(), logger)
_, err := importer.EnsureInitialImport(db, genesisReader, logger)
genesis, err := iutil.ReadGenesis(genesisReader)
maybeFail(err, "Error reading genesis file")

_, err = importer.EnsureInitialImport(db, genesis, logger)
maybeFail(err, "importer.EnsureInitialImport() error")

// sync local ledger
nextDBRound, err := db.GetNextRoundToAccount()
maybeFail(err, "Error getting DB round")
if nextDBRound > 0 {
if catchpoint != "" {
round, _, err := ledgercore.ParseCatchpointLabel(catchpoint)
if err != nil {
maybeFail(err, "catchpoint error")
}
if uint64(round) >= nextDBRound {
logger.Warnf("round for given catchpoint is ahead of db round. skip fast catchup")
} else {
err = localledger.RunMigrationFastCatchup(logging.NewLogger(), catchpoint, &opts)
maybeFail(err, "Error running ledger migration in fast catchup mode")
}

}
err = localledger.RunMigrationSimple(nextDBRound-1, &opts)
maybeFail(err, "Error running ledger migration")
}

logger.Info("Initializing block import handler.")
imp := importer.NewImporter(db)

logger.Info("Initializing local ledger.")
genesisReader = importer.GetGenesisFile(genesisJSONPath, bot.Algod(), logger)
genesis, err := readGenesis(genesisReader)
maybeFail(err, "Error reading genesis file")

proc, err := blockprocessor.MakeProcessor(&genesis, nextDBRound, indexerDataDir, imp.ImportBlock)
proc, err := blockprocessor.MakeProcessorWithLedgerInit(logger, catchpoint, &genesis, nextDBRound, opts, imp.ImportBlock)
if err != nil {
maybeFail(err, "blockprocessor.MakeProcessor() err %v", err)
}
Expand Down Expand Up @@ -450,19 +428,3 @@ func handleBlock(block *rpcs.EncodedBlockCert, proc processor.Processor) error {

return nil
}

func readGenesis(reader io.Reader) (bookkeeping.Genesis, error) {
var genesis bookkeeping.Genesis
if reader == nil {
return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: reader is nil")
}
gbytes, err := ioutil.ReadAll(reader)
if err != nil {
return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: %w", err)
}
err = protocol.DecodeJSON(gbytes, &genesis)
if err != nil {
return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: %w", err)
}
return genesis, nil
}
43 changes: 8 additions & 35 deletions cmd/algorand-indexer/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ package main
import (
"context"
"errors"
"io"
"strings"
"sync"
"testing"
"time"

"github.com/algorand/go-algorand-sdk/encoding/json"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/rpcs"

"github.com/algorand/indexer/processor/blockprocessor"
itest "github.com/algorand/indexer/util/test"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

type mockImporter struct {
Expand All @@ -39,7 +39,9 @@ func TestImportRetryAndCancel(t *testing.T) {

// create handler with mock importer and start, it should generate errors until cancelled.
imp := &mockImporter{}
l := itest.MakeTestLedger("ledger")
ledgerLogger, _ := test.NewNullLogger()
l, err := itest.MakeTestLedger(ledgerLogger)
require.NoError(t, err)
defer l.Close()
proc, err := blockprocessor.MakeProcessorWithLedger(l, nil)
assert.Nil(t, err)
Expand Down Expand Up @@ -73,32 +75,3 @@ func TestImportRetryAndCancel(t *testing.T) {
cancel()
wg.Wait()
}

func TestReadGenesis(t *testing.T) {
var reader io.Reader
// nil reader
_, err := readGenesis(reader)
assert.Contains(t, err.Error(), "readGenesis() err: reader is nil")
// no match struct field
genesisStr := "{\"version\": 2}"
reader = strings.NewReader(genesisStr)
_, err = readGenesis(reader)
assert.Contains(t, err.Error(), "json decode error")

genesis := bookkeeping.Genesis{
SchemaID: "1",
Network: "test",
Proto: "test",
RewardsPool: "AAAA",
FeeSink: "AAAA",
}

// read and decode genesis
reader = strings.NewReader(string(json.Encode(genesis)))
_, err = readGenesis(reader)
assert.Nil(t, err)
// read from empty reader
_, err = readGenesis(reader)
assert.Contains(t, err.Error(), "readGenesis() err: EOF")

}
16 changes: 7 additions & 9 deletions cmd/import-validator/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"sync"
"time"

"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"

"github.com/algorand/go-algorand-sdk/client/v2/algod"
"github.com/algorand/go-algorand/agreement"
Expand Down Expand Up @@ -52,7 +52,7 @@ func getGenesis(client *algod.Client) (bookkeeping.Genesis, error) {
return res, nil
}

func openIndexerDb(postgresConnStr string, genesis *bookkeeping.Genesis, logger *logrus.Logger) (*postgres.IndexerDb, error) {
func openIndexerDb(postgresConnStr string, genesis *bookkeeping.Genesis, logger *log.Logger) (*postgres.IndexerDb, error) {
db, availableCh, err :=
postgres.OpenPostgres(postgresConnStr, idb.IndexerDbOptions{}, logger)
if err != nil {
Expand Down Expand Up @@ -92,9 +92,7 @@ func openIndexerDb(postgresConnStr string, genesis *bookkeeping.Genesis, logger
return db, nil
}

func openLedger(ledgerPath string, genesis *bookkeeping.Genesis) (*ledger.Ledger, error) {
logger := logging.NewLogger()

func openLedger(logger *log.Logger, ledgerPath string, genesis *bookkeeping.Genesis) (*ledger.Ledger, error) {
accounts := make(map[basics.Address]basics.AccountData)
for _, alloc := range genesis.Allocation {
address, err := basics.UnmarshalChecksumAddress(alloc.Address)
Expand All @@ -115,7 +113,7 @@ func openLedger(ledgerPath string, genesis *bookkeeping.Genesis) (*ledger.Ledger
}

l, err := ledger.OpenLedger(
logger, path.Join(ledgerPath, "ledger"), false, initState, config.GetDefaultLocal())
logging.NewWrappedLogger(logger), path.Join(ledgerPath, "ledger"), false, initState, config.GetDefaultLocal())
if err != nil {
return nil, fmt.Errorf("openLedger() open err: %w", err)
}
Expand Down Expand Up @@ -309,7 +307,7 @@ func checkModifiedState(db *postgres.IndexerDb, l *ledger.Ledger, block *bookkee
return nil
}

func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logger *logrus.Logger) error {
func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logger *log.Logger) error {
nextRoundIndexer, err := db.GetNextRoundToAccount()
if err != nil {
return fmt.Errorf("catchup err: %w", err)
Expand Down Expand Up @@ -391,7 +389,7 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg

// Run is a blocking call that runs the import validator service.
func Run(args ImportValidatorArgs) {
logger := logrus.New()
logger := log.New()

bot, err := fetcher.ForNetAndToken(args.AlgodAddr, args.AlgodToken, logger)
if err != nil {
Expand All @@ -408,7 +406,7 @@ func Run(args ImportValidatorArgs) {
fmt.Printf("error opening indexer database err: %v", err)
os.Exit(1)
}
l, err := openLedger(args.AlgodLedger, &genesis)
l, err := openLedger(logger, args.AlgodLedger, &genesis)
if err != nil {
fmt.Printf("error opening algod database err: %v", err)
os.Exit(1)
Expand Down
6 changes: 5 additions & 1 deletion idb/postgres/postgres_integration_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

test2 "github.com/sirupsen/logrus/hooks/test"

"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/indexer/processor"
Expand Down Expand Up @@ -35,7 +37,9 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis) (*IndexerDb, func(), pr
shutdownFunc()
}

l := test.MakeTestLedger("ledger")
log, _ := test2.NewNullLogger()
l, err := test.MakeTestLedger(log)
require.NoError(t, err)
proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")

Expand Down
35 changes: 23 additions & 12 deletions idb/postgres/postgres_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,29 @@ import (
"testing"
"time"

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
test2 "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-codec/codec"

"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/idb/postgres/internal/encoding"
"github.com/algorand/indexer/idb/postgres/internal/schema"
pgtest "github.com/algorand/indexer/idb/postgres/internal/testing"
pgutil "github.com/algorand/indexer/idb/postgres/internal/util"
"github.com/algorand/indexer/importer"
"github.com/algorand/indexer/processor/blockprocessor"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/indexer/idb"
pgtest "github.com/algorand/indexer/idb/postgres/internal/testing"
"github.com/algorand/indexer/util"
"github.com/algorand/indexer/util/test"
)

Expand Down Expand Up @@ -1411,7 +1414,9 @@ func TestAddBlockIncrementsMaxRoundAccounted(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, uint64(0), round)

l := test.MakeTestLedger("ledger")
log, _ := test2.NewNullLogger()
l, err := test.MakeTestLedger(log)
require.NoError(t, err)
defer l.Close()
proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
Expand Down Expand Up @@ -1707,7 +1712,9 @@ func TestSearchForInnerTransactionReturnsRootTransaction(t *testing.T) {
rootTxid := appCall.Txn.ID()

err = pgutil.TxWithRetry(pdb, serializable, func(tx pgx.Tx) error {
l := test.MakeTestLedger("ledger")
log, _ := test2.NewNullLogger()
l, err := test.MakeTestLedger(log)
require.NoError(t, err)
defer l.Close()
proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
Expand Down Expand Up @@ -2141,7 +2148,9 @@ func TestGenesisHashCheckAtInitialImport(t *testing.T) {
require.ErrorIs(t, err, idb.ErrorNotInitialized)
logger := logrus.New()
genesisReader := bytes.NewReader(protocol.EncodeJSON(genesis))
imported, err := importer.EnsureInitialImport(db, genesisReader, logger)
gen, err := util.ReadGenesis(genesisReader)
require.NoError(t, err)
imported, err := importer.EnsureInitialImport(db, gen, logger)
require.NoError(t, err)
require.True(t, true, imported)
// network state should be set
Expand All @@ -2152,8 +2161,10 @@ func TestGenesisHashCheckAtInitialImport(t *testing.T) {
// change genesis value
genesis.Network = "testnest"
genesisReader = bytes.NewReader(protocol.EncodeJSON(genesis))
gen, err = util.ReadGenesis(genesisReader)
require.NoError(t, err)
// different genesisHash, should fail
_, err = importer.EnsureInitialImport(db, genesisReader, logger)
_, err = importer.EnsureInitialImport(db, gen, logger)
require.Error(t, err)
require.Contains(t, err.Error(), "genesis hash not matching")

Expand Down
Loading