From 99a848cbc13d86b79445147fe00573cf5b284ddc Mon Sep 17 00:00:00 2001 From: shiqizng <80276844+shiqizng@users.noreply.github.com> Date: Tue, 25 Oct 2022 11:54:31 -0400 Subject: [PATCH] Add block metadata to pipeline (#1258) --- .gitignore | 3 + cmd/algorand-indexer/daemon.go | 4 +- conduit/config.go | 3 +- conduit/gen/lookup.go | 312 ++++++++++++++++++ conduit/init_provider.go | 4 +- conduit/pipeline.go | 147 +++++++-- conduit/pipeline_test.go | 207 +++++++++++- data/block_export_data.go | 2 +- exporters/README.md | 9 +- exporters/example/example_exporter.go | 3 +- exporters/example/example_exporter_test.go | 11 +- exporters/exporter.go | 12 +- exporters/filewriter/file_exporter.go | 110 +----- exporters/filewriter/file_exporter_test.go | 151 ++------- exporters/noop/noop_exporter.go | 7 +- exporters/noop/noop_exporter_test.go | 18 +- exporters/postgresql/postgresql_exporter.go | 45 ++- .../postgresql/postgresql_exporter_test.go | 43 ++- processors/blockprocessor/block_processor.go | 2 +- util/test/testutil.go | 25 ++ 20 files changed, 742 insertions(+), 376 deletions(-) create mode 100644 conduit/gen/lookup.go diff --git a/.gitignore b/.gitignore index 0ee7921c4..d9cbef393 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,6 @@ coverage.txt # asdf .tool-versions + +# conduit example +cmd/conduit/data diff --git a/cmd/algorand-indexer/daemon.go b/cmd/algorand-indexer/daemon.go index d6647cdd8..7e6a3d913 100644 --- a/cmd/algorand-indexer/daemon.go +++ b/cmd/algorand-indexer/daemon.go @@ -360,7 +360,9 @@ func runDaemon(daemonConfig *daemonConfig) error { func makeConduitConfig(dCfg *daemonConfig) conduit.PipelineConfig { return conduit.PipelineConfig{ - ConduitConfig: &conduit.Config{}, + ConduitConfig: &conduit.Config{ + ConduitDataDir: dCfg.indexerDataDir, + }, PipelineLogLevel: logger.GetLevel().String(), Importer: conduit.NameConfigPair{ Name: "algod", diff --git a/conduit/config.go b/conduit/config.go index 6e334a965..af7bf31d5 100644 --- a/conduit/config.go +++ b/conduit/config.go @@ -2,9 +2,10 @@ package conduit import ( "fmt" + "strings" + "github.com/algorand/go-algorand/util" "github.com/spf13/pflag" - "strings" ) // DefaultConfigName is the default conduit configuration filename. diff --git a/conduit/gen/lookup.go b/conduit/gen/lookup.go new file mode 100644 index 000000000..37b40dad5 --- /dev/null +++ b/conduit/gen/lookup.go @@ -0,0 +1,312 @@ +// Code generated via go generate. DO NOT EDIT. +package fields + +import "github.com/algorand/go-algorand/data/transactions" + +// SignedTxnMap generates a map with the key as the codec tag and the value as a function +// that returns the associated variable +var SignedTxnMap = map[string]func(*transactions.SignedTxnInBlock) interface{}{ + "aca": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.ApplyData.AssetClosingAmount) + }, + "apid": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.ApplyData.ApplicationID) + }, + "ca": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.ApplyData.ClosingAmount) + }, + "caid": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.ApplyData.ConfigAsset) + }, + "dt": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).SignedTxnWithAD.ApplyData.EvalDelta) }, + "dt.gd": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.ApplyData.EvalDelta.GlobalDelta) + }, + "dt.itx": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.ApplyData.EvalDelta.InnerTxns) + }, + "dt.ld": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.ApplyData.EvalDelta.LocalDeltas) + }, + "dt.lg": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.ApplyData.EvalDelta.Logs) + }, + "hgh": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).HasGenesisHash) }, + "hgi": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).HasGenesisID) }, + "lsig": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).SignedTxnWithAD.SignedTxn.Lsig) }, + "lsig.arg": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).SignedTxnWithAD.SignedTxn.Lsig.Args) }, + "lsig.l": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Lsig.Logic) + }, + "lsig.msig": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).SignedTxnWithAD.SignedTxn.Lsig.Msig) }, + "lsig.msig.subsig": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Lsig.Msig.Subsigs) + }, + "lsig.msig.thr": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Lsig.Msig.Threshold) + }, + "lsig.msig.v": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Lsig.Msig.Version) + }, + "lsig.sig": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).SignedTxnWithAD.SignedTxn.Lsig.Sig) }, + "msig": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).SignedTxnWithAD.SignedTxn.Msig) }, + "msig.subsig": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Msig.Subsigs) + }, + "msig.thr": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Msig.Threshold) + }, + "msig.v": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Msig.Version) + }, + "rc": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.ApplyData.CloseRewards) + }, + "rr": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.ApplyData.ReceiverRewards) + }, + "rs": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.ApplyData.SenderRewards) + }, + "sgnr": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).SignedTxnWithAD.SignedTxn.AuthAddr) }, + "sig": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).SignedTxnWithAD.SignedTxn.Sig) }, + "txn": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).SignedTxnWithAD.SignedTxn.Txn) }, + "txn.aamt": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetAmount) + }, + "txn.aclose": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetCloseTo) + }, + "txn.afrz": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetFreezeTxnFields.AssetFrozen) + }, + "txn.amt": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Amount) + }, + "txn.apaa": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.ApplicationArgs) + }, + "txn.apan": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.OnCompletion) + }, + "txn.apap": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.ApprovalProgram) + }, + "txn.apar": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams) + }, + "txn.apar.am": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams.MetadataHash) + }, + "txn.apar.an": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams.AssetName) + }, + "txn.apar.au": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams.URL) + }, + "txn.apar.c": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams.Clawback) + }, + "txn.apar.dc": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams.Decimals) + }, + "txn.apar.df": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams.DefaultFrozen) + }, + "txn.apar.f": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams.Freeze) + }, + "txn.apar.m": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams.Manager) + }, + "txn.apar.r": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams.Reserve) + }, + "txn.apar.t": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams.Total) + }, + "txn.apar.un": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.AssetParams.UnitName) + }, + "txn.apas": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.ForeignAssets) + }, + "txn.apat": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.Accounts) + }, + "txn.apep": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.ExtraProgramPages) + }, + "txn.apfa": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.ForeignApps) + }, + "txn.apgs": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.GlobalStateSchema) + }, + "txn.apgs.nbs": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.GlobalStateSchema.NumByteSlice) + }, + "txn.apgs.nui": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.GlobalStateSchema.NumUint) + }, + "txn.apid": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.ApplicationID) + }, + "txn.apls": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.LocalStateSchema) + }, + "txn.apls.nbs": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.LocalStateSchema.NumByteSlice) + }, + "txn.apls.nui": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.LocalStateSchema.NumUint) + }, + "txn.apsu": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.ApplicationCallTxnFields.ClearStateProgram) + }, + "txn.arcv": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetReceiver) + }, + "txn.asnd": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.AssetSender) + }, + "txn.caid": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetConfigTxnFields.ConfigAsset) + }, + "txn.close": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.CloseRemainderTo) + }, + "txn.fadd": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetFreezeTxnFields.FreezeAccount) + }, + "txn.faid": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetFreezeTxnFields.FreezeAsset) + }, + "txn.fee": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.Header.Fee) + }, + "txn.fv": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.Header.FirstValid) + }, + "txn.gen": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.Header.GenesisID) + }, + "txn.gh": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.Header.GenesisHash) + }, + "txn.grp": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.Header.Group) + }, + "txn.lv": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.Header.LastValid) + }, + "txn.lx": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.Header.Lease) + }, + "txn.nonpart": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.KeyregTxnFields.Nonparticipation) + }, + "txn.note": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.Header.Note) + }, + "txn.rcv": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.PaymentTxnFields.Receiver) + }, + "txn.rekey": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.Header.RekeyTo) + }, + "txn.selkey": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.KeyregTxnFields.SelectionPK) + }, + "txn.snd": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.Header.Sender) + }, + "txn.sp": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof) + }, + "txn.sp.P": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.PartProofs) + }, + "txn.sp.P.hsh": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.PartProofs.HashFactory) + }, + "txn.sp.P.hsh.t": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.PartProofs.HashFactory.HashType) + }, + "txn.sp.P.pth": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.PartProofs.Path) + }, + "txn.sp.P.td": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.PartProofs.TreeDepth) + }, + "txn.sp.S": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.SigProofs) + }, + "txn.sp.S.hsh": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.SigProofs.HashFactory) + }, + "txn.sp.S.hsh.t": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.SigProofs.HashFactory.HashType) + }, + "txn.sp.S.pth": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.SigProofs.Path) + }, + "txn.sp.S.td": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.SigProofs.TreeDepth) + }, + "txn.sp.c": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.SigCommit) + }, + "txn.sp.pr": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.PositionsToReveal) + }, + "txn.sp.r": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.Reveals) + }, + "txn.sp.v": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.MerkleSignatureSaltVersion) + }, + "txn.sp.w": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProof.SignedWeight) + }, + "txn.spmsg": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.Message) + }, + "txn.spmsg.P": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.Message.LnProvenWeight) + }, + "txn.spmsg.b": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.Message.BlockHeadersCommitment) + }, + "txn.spmsg.f": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.Message.FirstAttestedRound) + }, + "txn.spmsg.l": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.Message.LastAttestedRound) + }, + "txn.spmsg.v": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.Message.VotersCommitment) + }, + "txn.sprfkey": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.KeyregTxnFields.StateProofPK) + }, + "txn.sptype": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.StateProofTxnFields.StateProofType) + }, + "txn.type": func(i *transactions.SignedTxnInBlock) interface{} { return &((*i).SignedTxnWithAD.SignedTxn.Txn.Type) }, + "txn.votefst": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.KeyregTxnFields.VoteFirst) + }, + "txn.votekd": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.KeyregTxnFields.VoteKeyDilution) + }, + "txn.votekey": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.KeyregTxnFields.VotePK) + }, + "txn.votelst": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.KeyregTxnFields.VoteLast) + }, + "txn.xaid": func(i *transactions.SignedTxnInBlock) interface{} { + return &((*i).SignedTxnWithAD.SignedTxn.Txn.AssetTransferTxnFields.XferAsset) + }, +} diff --git a/conduit/init_provider.go b/conduit/init_provider.go index 4acefce5e..360066e12 100644 --- a/conduit/init_provider.go +++ b/conduit/init_provider.go @@ -11,8 +11,8 @@ type PipelineInitProvider struct { genesis *bookkeeping.Genesis } -// Genesis produces genesis pointer -func (a *PipelineInitProvider) Genesis() *bookkeeping.Genesis { +// GetGenesis produces genesis pointer +func (a *PipelineInitProvider) GetGenesis() *bookkeeping.Genesis { return a.genesis } diff --git a/conduit/pipeline.go b/conduit/pipeline.go index 97a972201..357057ae1 100644 --- a/conduit/pipeline.go +++ b/conduit/pipeline.go @@ -2,18 +2,22 @@ package conduit import ( "context" + "encoding/json" + "errors" "fmt" - "github.com/algorand/indexer/util/metrics" - log "github.com/sirupsen/logrus" - "github.com/spf13/viper" "os" + "path" "path/filepath" "runtime/pprof" "sync" "time" - "github.com/algorand/go-algorand-sdk/encoding/json" + "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/indexer/util/metrics" + log "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "gopkg.in/yaml.v3" "github.com/algorand/indexer/data" "github.com/algorand/indexer/exporters" @@ -145,7 +149,15 @@ type pipelineImpl struct { exporter *exporters.Exporter completeCallback []OnCompleteFunc - round basics.Round + pipelineMetadata PipelineMetaData + pipelineMetadataFilePath string +} + +// PipelineMetaData contains the metadata for the pipeline +type PipelineMetaData struct { + GenesisHash string `json:"genesis-hash"` + Network string `json:"network"` + NextRound uint64 `json:"next-round"` } func (p *pipelineImpl) Error() error { @@ -203,20 +215,6 @@ func (p *pipelineImpl) Init() error { // TODO Need to change interfaces to accept config of map[string]interface{} - // Initialize Exporter first since the pipeline derives its round from the Exporter - exporterLogger := log.New() - // Make sure we are thread-safe - exporterLogger.SetOutput(p.logger.Out) - exporterLogger.SetFormatter(makePluginLogFormatter(plugins.Exporter, (*p.exporter).Metadata().Name())) - - jsonEncode := string(json.Encode(p.cfg.Exporter.Config)) - err := (*p.exporter).Init(p.ctx, plugins.PluginConfig(jsonEncode), exporterLogger) - exporterName := (*p.exporter).Metadata().Name() - if err != nil { - return fmt.Errorf("Pipeline.Start(): could not initialize Exporter (%s): %w", exporterName, err) - } - p.logger.Infof("Initialized Exporter: %s", exporterName) - // Initialize Importer importerLogger := log.New() // Make sure we are thread-safe @@ -224,34 +222,48 @@ func (p *pipelineImpl) Init() error { importerName := (*p.importer).Metadata().Name() importerLogger.SetFormatter(makePluginLogFormatter(plugins.Importer, importerName)) - // TODO modify/fix ? - jsonEncode = string(json.Encode(p.cfg.Importer.Config)) - genesis, err := (*p.importer).Init(p.ctx, plugins.PluginConfig(jsonEncode), importerLogger) - + configs, err := yaml.Marshal(p.cfg.Importer.Config) + if err != nil { + return fmt.Errorf("Pipeline.Start(): could not serialize Importer.Config: %w", err) + } + genesis, err := (*p.importer).Init(p.ctx, plugins.PluginConfig(configs), importerLogger) if err != nil { return fmt.Errorf("Pipeline.Start(): could not initialize importer (%s): %w", importerName, err) } - p.round = basics.Round((*p.exporter).Round()) - err = (*p.exporter).HandleGenesis(*genesis) + + // initialize or load pipeline metadata + gh := crypto.HashObj(genesis).String() + p.pipelineMetadata.GenesisHash = gh + p.pipelineMetadata.Network = string(genesis.Network) + p.pipelineMetadata, err = p.initializeOrLoadBlockMetadata() if err != nil { - return fmt.Errorf("Pipeline.Start(): exporter could not handle genesis (%s): %w", exporterName, err) + return fmt.Errorf("Pipeline.Start(): could not read metadata: %w", err) } + if p.pipelineMetadata.GenesisHash != gh { + return fmt.Errorf("Pipeline.Start(): genesis hash in metadata does not match expected value: actual %s, expected %s", gh, p.pipelineMetadata.GenesisHash) + } + p.logger.Infof("Initialized Importer: %s", importerName) - // Initialize Processors + // InitProvider + round := basics.Round(p.pipelineMetadata.NextRound) var initProvider data.InitProvider = &PipelineInitProvider{ - currentRound: &p.round, + currentRound: &round, genesis: genesis, } p.initProvider = &initProvider + // Initialize Processors for idx, processor := range p.processors { processorLogger := log.New() // Make sure we are thread-safe processorLogger.SetOutput(p.logger.Out) processorLogger.SetFormatter(makePluginLogFormatter(plugins.Processor, (*processor).Metadata().Name())) - jsonEncode = string(json.Encode(p.cfg.Processors[idx].Config)) - err := (*processor).Init(p.ctx, *p.initProvider, plugins.PluginConfig(jsonEncode), processorLogger) + configs, err = yaml.Marshal(p.cfg.Processors[idx].Config) + if err != nil { + return fmt.Errorf("Pipeline.Start(): could not serialize Processors[%d].Config : %w", idx, err) + } + err := (*processor).Init(p.ctx, *p.initProvider, plugins.PluginConfig(configs), processorLogger) processorName := (*processor).Metadata().Name() if err != nil { return fmt.Errorf("Pipeline.Start(): could not initialize processor (%s): %w", processorName, err) @@ -259,6 +271,23 @@ func (p *pipelineImpl) Init() error { p.logger.Infof("Initialized Processor: %s", processorName) } + // Initialize Exporter + exporterLogger := log.New() + // Make sure we are thread-safe + exporterLogger.SetOutput(p.logger.Out) + exporterLogger.SetFormatter(makePluginLogFormatter(plugins.Exporter, (*p.exporter).Metadata().Name())) + + configs, err = yaml.Marshal(p.cfg.Exporter.Config) + if err != nil { + return fmt.Errorf("Pipeline.Start(): could not serialize Exporter.Config : %w", err) + } + err = (*p.exporter).Init(p.ctx, *p.initProvider, plugins.PluginConfig(configs), exporterLogger) + exporterName := (*p.exporter).Metadata().Name() + if err != nil { + return fmt.Errorf("Pipeline.Start(): could not initialize Exporter (%s): %w", exporterName, err) + } + p.logger.Infof("Initialized Exporter: %s", exporterName) + // Register callbacks. p.registerLifecycleCallbacks() return err @@ -325,9 +354,9 @@ func (p *pipelineImpl) Start() { return default: { - p.logger.Infof("Pipeline round: %v", p.round) + p.logger.Infof("Pipeline round: %v", p.pipelineMetadata.NextRound) // fetch block - blkData, err := (*p.importer).GetBlock(uint64(p.round)) + blkData, err := (*p.importer).GetBlock(p.pipelineMetadata.NextRound) if err != nil { p.logger.Errorf("%v", err) p.setError(err) @@ -363,12 +392,13 @@ func (p *pipelineImpl) Start() { } importTime := time.Since(start) // Ignore round 0 (which is empty). - if p.round > 0 { + if p.pipelineMetadata.NextRound > 0 { p.addMetrics(blkData, importTime) } // Increment Round p.setError(nil) - p.round++ + p.pipelineMetadata.NextRound++ + p.encodeMetadataToFile() } } @@ -380,6 +410,55 @@ func (p *pipelineImpl) Wait() { p.wg.Wait() } +func (p *pipelineImpl) encodeMetadataToFile() error { + tempFilename := fmt.Sprintf("%s.temp", p.pipelineMetadataFilePath) + file, err := os.Create(tempFilename) + if err != nil { + return fmt.Errorf("encodeMetadataToFile(): failed to create temp metadata file: %w", err) + } + defer file.Close() + err = json.NewEncoder(file).Encode(p.pipelineMetadata) + if err != nil { + return fmt.Errorf("encodeMetadataToFile(): failed to write temp metadata: %w", err) + } + + err = os.Rename(tempFilename, p.pipelineMetadataFilePath) + if err != nil { + return fmt.Errorf("encodeMetadataToFile(): failed to replace metadata file: %w", err) + } + return nil +} + +func (p *pipelineImpl) initializeOrLoadBlockMetadata() (PipelineMetaData, error) { + p.pipelineMetadataFilePath = path.Join(p.cfg.ConduitConfig.ConduitDataDir, "metadata.json") + if stat, err := os.Stat(p.pipelineMetadataFilePath); errors.Is(err, os.ErrNotExist) || (stat != nil && stat.Size() == 0) { + if stat != nil && stat.Size() == 0 { + err = os.Remove(p.pipelineMetadataFilePath) + if err != nil { + return p.pipelineMetadata, fmt.Errorf("Init(): error creating file: %w", err) + } + } + err = p.encodeMetadataToFile() + if err != nil { + return p.pipelineMetadata, fmt.Errorf("Init(): error creating file: %w", err) + } + } else { + if err != nil { + return p.pipelineMetadata, fmt.Errorf("error opening file: %w", err) + } + var data []byte + data, err = os.ReadFile(p.pipelineMetadataFilePath) + if err != nil { + return p.pipelineMetadata, fmt.Errorf("error reading metadata: %w", err) + } + err = json.Unmarshal(data, &p.pipelineMetadata) + if err != nil { + return p.pipelineMetadata, fmt.Errorf("error reading metadata: %w", err) + } + } + return p.pipelineMetadata, nil +} + // MakePipeline creates a Pipeline func MakePipeline(ctx context.Context, cfg *PipelineConfig, logger *log.Logger) (Pipeline, error) { diff --git a/conduit/pipeline_test.go b/conduit/pipeline_test.go index cc1a18f05..89663138c 100644 --- a/conduit/pipeline_test.go +++ b/conduit/pipeline_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/algorand/go-algorand/crypto" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -145,13 +146,14 @@ var uniqueBlockData = data.BlockData{ type mockImporter struct { mock.Mock importers.Importer + genesis bookkeeping.Genesis finalRound basics.Round returnError bool onCompleteError bool } func (m *mockImporter) Init(_ context.Context, _ plugins.PluginConfig, _ *log.Logger) (*bookkeeping.Genesis, error) { - return &bookkeeping.Genesis{}, nil + return &m.genesis, nil } func (m *mockImporter) Close() error { @@ -236,7 +238,7 @@ func (m *mockExporter) Metadata() exporters.ExporterMetadata { } } -func (m *mockExporter) Init(_ context.Context, _ plugins.PluginConfig, _ *log.Logger) error { +func (m *mockExporter) Init(_ context.Context, _ data.InitProvider, _ plugins.PluginConfig, _ *log.Logger) error { return nil } @@ -244,14 +246,6 @@ func (m *mockExporter) Close() error { return nil } -func (m *mockExporter) HandleGenesis(_ bookkeeping.Genesis) error { - return nil -} - -func (m *mockExporter) Round() uint64 { - return 0 -} - func (m *mockExporter) Receive(exportData data.BlockData) error { var err error if m.returnError { @@ -294,14 +288,17 @@ func TestPipelineRun(t *testing.T) { pImpl := pipelineImpl{ ctx: ctx, cf: cf, - cfg: &PipelineConfig{}, logger: log.New(), initProvider: nil, importer: &pImporter, processors: []*processors.Processor{&pProcessor}, - completeCallback: []OnCompleteFunc{cbComplete.OnComplete}, exporter: &pExporter, - round: 0, + completeCallback: []OnCompleteFunc{cbComplete.OnComplete}, + pipelineMetadata: PipelineMetaData{ + NextRound: 0, + GenesisHash: "", + }, + pipelineMetadataFilePath: filepath.Join(t.TempDir(), "metadata.json"), } go func() { @@ -331,6 +328,10 @@ func TestPipelineCpuPidFiles(t *testing.T) { pImpl := pipelineImpl{ cfg: &PipelineConfig{ + ConduitConfig: &Config{ + Flags: nil, + ConduitDataDir: t.TempDir(), + }, Importer: NameConfigPair{ Name: "", Config: map[string]interface{}{}, @@ -351,7 +352,11 @@ func TestPipelineCpuPidFiles(t *testing.T) { importer: &pImporter, processors: []*processors.Processor{&pProcessor}, exporter: &pExporter, - round: 0, + pipelineMetadata: PipelineMetaData{ + GenesisHash: "", + Network: "", + NextRound: 0, + }, } err := pImpl.Init() @@ -409,7 +414,7 @@ func TestPipelineErrors(t *testing.T) { processors: []*processors.Processor{&pProcessor}, exporter: &pExporter, completeCallback: []OnCompleteFunc{cbComplete.OnComplete}, - round: 0, + pipelineMetadata: PipelineMetaData{}, } mImporter.returnError = true @@ -476,7 +481,6 @@ func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) { importer: &pImporter, processors: []*processors.Processor{&pProcessor, &pProcessor}, exporter: &pExporter, - round: 0, } // Each plugin implements the Completed interface, so there should be 4 @@ -484,3 +488,174 @@ func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) { pImpl.registerLifecycleCallbacks() assert.Len(t, pImpl.completeCallback, 4) } + +// TestBlockMetaDataFile tests that metadata.json file is created as expected +func TestBlockMetaDataFile(t *testing.T) { + + var pImporter importers.Importer = &mockImporter{} + var pProcessor processors.Processor = &mockProcessor{} + var pExporter exporters.Exporter = &mockExporter{} + + datadir := t.TempDir() + pImpl := pipelineImpl{ + cfg: &PipelineConfig{ + ConduitConfig: &Config{ + Flags: nil, + ConduitDataDir: datadir, + }, + Importer: NameConfigPair{ + Name: "", + Config: map[string]interface{}{}, + }, + Processors: []NameConfigPair{ + { + Name: "", + Config: map[string]interface{}{}, + }, + }, + Exporter: NameConfigPair{ + Name: "", + Config: map[string]interface{}{}, + }, + }, + logger: log.New(), + initProvider: nil, + importer: &pImporter, + processors: []*processors.Processor{&pProcessor}, + exporter: &pExporter, + pipelineMetadata: PipelineMetaData{ + NextRound: 3, + }, + } + + err := pImpl.Init() + assert.NoError(t, err) + + // Test that file is created + blockMetaDataFile := filepath.Join(datadir, "metadata.json") + _, err = os.Stat(blockMetaDataFile) + assert.NoError(t, err) + + // Test that file loads correctly + metaData, err := pImpl.initializeOrLoadBlockMetadata() + assert.NoError(t, err) + assert.Equal(t, pImpl.pipelineMetadata.GenesisHash, metaData.GenesisHash) + assert.Equal(t, pImpl.pipelineMetadata.NextRound, metaData.NextRound) + assert.Equal(t, pImpl.pipelineMetadata.Network, metaData.Network) + + // Test that file encodes correctly + pImpl.pipelineMetadata.GenesisHash = "HASH" + pImpl.pipelineMetadata.NextRound = 7 + err = pImpl.encodeMetadataToFile() + assert.NoError(t, err) + metaData, err = pImpl.initializeOrLoadBlockMetadata() + assert.NoError(t, err) + assert.Equal(t, "HASH", metaData.GenesisHash) + assert.Equal(t, uint64(7), metaData.NextRound) + assert.Equal(t, pImpl.pipelineMetadata.Network, metaData.Network) + + // invalid file directory + pImpl.cfg.ConduitConfig.ConduitDataDir = "datadir" + metaData, err = pImpl.initializeOrLoadBlockMetadata() + assert.Contains(t, err.Error(), "Init(): error creating file") + err = pImpl.encodeMetadataToFile() + assert.Contains(t, err.Error(), "encodeMetadataToFile(): failed to create temp metadata file") +} + +func TestGenesisHash(t *testing.T) { + var pImporter importers.Importer = &mockImporter{genesis: bookkeeping.Genesis{Network: "test"}} + var pProcessor processors.Processor = &mockProcessor{} + var pExporter exporters.Exporter = &mockExporter{} + datadir := t.TempDir() + pImpl := pipelineImpl{ + cfg: &PipelineConfig{ + ConduitConfig: &Config{ + Flags: nil, + ConduitDataDir: datadir, + }, + Importer: NameConfigPair{ + Name: "", + Config: map[string]interface{}{}, + }, + Processors: []NameConfigPair{ + { + Name: "", + Config: map[string]interface{}{}, + }, + }, + Exporter: NameConfigPair{ + Name: "", + Config: map[string]interface{}{}, + }, + }, + logger: log.New(), + initProvider: nil, + importer: &pImporter, + processors: []*processors.Processor{&pProcessor}, + exporter: &pExporter, + pipelineMetadata: PipelineMetaData{ + GenesisHash: "", + Network: "", + NextRound: 3, + }, + } + + // write genesis hash to metadata.json + err := pImpl.Init() + assert.NoError(t, err) + + // read genesis hash from metadata.json + blockmetaData, err := pImpl.initializeOrLoadBlockMetadata() + assert.NoError(t, err) + assert.Equal(t, blockmetaData.GenesisHash, crypto.HashObj(&bookkeeping.Genesis{Network: "test"}).String()) + assert.Equal(t, blockmetaData.Network, "test") + + // mock a different genesis hash + pImporter = &mockImporter{genesis: bookkeeping.Genesis{Network: "dev"}} + pImpl.importer = &pImporter + err = pImpl.Init() + assert.Contains(t, err.Error(), "genesis hash in metadata does not match") +} + +func TestInitError(t *testing.T) { + var pImporter importers.Importer = &mockImporter{genesis: bookkeeping.Genesis{Network: "test"}} + var pProcessor processors.Processor = &mockProcessor{} + var pExporter exporters.Exporter = &mockExporter{} + datadir := "data" + pImpl := pipelineImpl{ + cfg: &PipelineConfig{ + ConduitConfig: &Config{ + Flags: nil, + ConduitDataDir: datadir, + }, + Importer: NameConfigPair{ + Name: "", + Config: map[string]interface{}{}, + }, + Processors: []NameConfigPair{ + { + Name: "", + Config: map[string]interface{}{}, + }, + }, + Exporter: NameConfigPair{ + Name: "unknown", + Config: map[string]interface{}{}, + }, + }, + logger: log.New(), + initProvider: nil, + importer: &pImporter, + processors: []*processors.Processor{&pProcessor}, + exporter: &pExporter, + pipelineMetadata: PipelineMetaData{ + GenesisHash: "", + Network: "", + NextRound: 3, + }, + } + + // could not read metadata + err := pImpl.Init() + assert.Contains(t, err.Error(), "could not read metadata") +} diff --git a/data/block_export_data.go b/data/block_export_data.go index 23f022c41..ed86afc67 100644 --- a/data/block_export_data.go +++ b/data/block_export_data.go @@ -18,7 +18,7 @@ type RoundProvider interface { // InitProvider is the interface that can be used when initializing to get common algod related // variables type InitProvider interface { - Genesis() *bookkeeping.Genesis + GetGenesis() *bookkeeping.Genesis NextDBRound() basics.Round } diff --git a/exporters/README.md b/exporters/README.md index fad759a26..8667d83f2 100644 --- a/exporters/README.md +++ b/exporters/README.md @@ -25,14 +25,7 @@ type Exporter interface { // Receive is called for each block to be processed by the exporter. // Should return an error on failure--retries are configurable. Receive(exportData data.BlockData) error - - // HandleGenesis is an Exporter's opportunity to do initial validation and handling of the Genesis block. - // If validation (such as a check to ensure `genesis` matches a previously stored genesis block) or handling fails, - // it returns an error. - HandleGenesis(genesis bookkeeping.Genesis) error - - // Round returns the next round not yet processed by the Exporter. Atomically updated when Receive successfully completes. - Round() uint64 + } ``` diff --git a/exporters/example/example_exporter.go b/exporters/example/example_exporter.go index af56f7a53..b73ce780d 100644 --- a/exporters/example/example_exporter.go +++ b/exporters/example/example_exporter.go @@ -2,6 +2,7 @@ package example import ( "context" + "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/indexer/data" "github.com/algorand/indexer/exporters" @@ -34,7 +35,7 @@ func (exp *exampleExporter) Metadata() exporters.ExporterMetadata { } // Init provides the opportunity for your Exporter to initialize connections, store config variables, etc. -func (exp *exampleExporter) Init(_ context.Context, _ plugins.PluginConfig, _ *logrus.Logger) error { +func (exp *exampleExporter) Init(_ context.Context, _ data.InitProvider, _ plugins.PluginConfig, _ *logrus.Logger) error { panic("not implemented") } diff --git a/exporters/example/example_exporter_test.go b/exporters/example/example_exporter_test.go index c72a8e81e..00fe9165c 100644 --- a/exporters/example/example_exporter_test.go +++ b/exporters/example/example_exporter_test.go @@ -4,7 +4,6 @@ import ( "context" "testing" - "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/indexer/data" "github.com/algorand/indexer/plugins" "github.com/stretchr/testify/assert" @@ -23,7 +22,7 @@ func TestExporterMetadata(t *testing.T) { } func TestExporterInit(t *testing.T) { - assert.Panics(t, func() { exExp.Init(context.Background(), "", nil) }) + assert.Panics(t, func() { exExp.Init(context.Background(), nil, "", nil) }) } func TestExporterConfig(t *testing.T) { @@ -37,11 +36,3 @@ func TestExporterClose(t *testing.T) { func TestExporterReceive(t *testing.T) { assert.Panics(t, func() { exExp.Receive(data.BlockData{}) }) } - -func TestExporterHandleGenesis(t *testing.T) { - assert.Panics(t, func() { exExp.HandleGenesis(bookkeeping.Genesis{}) }) -} - -func TestExporterRound(t *testing.T) { - assert.Panics(t, func() { exExp.Round() }) -} diff --git a/exporters/exporter.go b/exporters/exporter.go index 233317ba2..e66ace84d 100644 --- a/exporters/exporter.go +++ b/exporters/exporter.go @@ -2,7 +2,7 @@ package exporters import ( "context" - "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/indexer/data" "github.com/algorand/indexer/plugins" "github.com/sirupsen/logrus" @@ -17,7 +17,7 @@ type Exporter interface { // Typically used for things like initializing network connections. // The ExporterConfig passed to Connect will contain the Unmarhsalled config file specific to this plugin. // Should return an error if it fails--this will result in the Indexer process terminating. - Init(ctx context.Context, cfg plugins.PluginConfig, logger *logrus.Logger) error + Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error // Config returns the configuration options used to create an Exporter. // Initialized during Connect, it should return nil until the Exporter has been Connected. @@ -31,12 +31,4 @@ type Exporter interface { // Receive is called for each block to be processed by the exporter. // Should return an error on failure--retries are configurable. Receive(exportData data.BlockData) error - - // HandleGenesis is an Exporter's opportunity to do initial validation and handling of the Genesis block. - // If validation (such as a check to ensure `genesis` matches a previously stored genesis block) or handling fails, - // it returns an error. - HandleGenesis(genesis bookkeeping.Genesis) error - - // Round returns the next round not yet processed by the Exporter. Atomically updated when Receive successfully completes. - Round() uint64 } diff --git a/exporters/filewriter/file_exporter.go b/exporters/filewriter/file_exporter.go index 78fdd8cf7..b6152c6ef 100644 --- a/exporters/filewriter/file_exporter.go +++ b/exporters/filewriter/file_exporter.go @@ -11,9 +11,6 @@ import ( "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" - "github.com/algorand/go-algorand/crypto" - "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/indexer/data" "github.com/algorand/indexer/exporters" "github.com/algorand/indexer/plugins" @@ -22,11 +19,9 @@ import ( const exporterName = "file_writer" type fileExporter struct { - round uint64 - blockMetadataFile string - blockMetadata BlockMetaData - cfg ExporterConfig - logger *logrus.Logger + round uint64 + cfg ExporterConfig + logger *logrus.Logger } var fileExporterMetadata = exporters.ExporterMetadata{ @@ -35,28 +30,19 @@ var fileExporterMetadata = exporters.ExporterMetadata{ ExpDeprecated: false, } -// BlockMetaData contains the metadata for block file storage -type BlockMetaData struct { - GenesisHash string `json:"genesis-hash"` - Network string `json:"network"` - NextRound uint64 `json:"next-round"` -} - // Constructor is the ExporterConstructor implementation for the filewriter exporter type Constructor struct{} // New initializes a fileExporter func (c *Constructor) New() exporters.Exporter { - return &fileExporter{ - round: 0, - } + return &fileExporter{} } func (exp *fileExporter) Metadata() exporters.ExporterMetadata { return fileExporterMetadata } -func (exp *fileExporter) Init(_ context.Context, cfg plugins.PluginConfig, logger *logrus.Logger) error { +func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error { exp.logger = logger if err := exp.unmarhshalConfig(string(cfg)); err != nil { return fmt.Errorf("connect failure in unmarshalConfig: %w", err) @@ -67,42 +53,7 @@ func (exp *fileExporter) Init(_ context.Context, cfg plugins.PluginConfig, logge } else if err != nil { return fmt.Errorf("Init() error: %w", err) } - // initialize block metadata - file := path.Join(exp.cfg.BlocksDir, "metadata.json") - exp.blockMetadataFile = file - var stat os.FileInfo - if stat, err = os.Stat(file); errors.Is(err, os.ErrNotExist) || (stat != nil && stat.Size() == 0) { - if stat != nil && stat.Size() == 0 { - // somehow it did not finish initializing - err = os.Remove(file) - if err != nil { - return fmt.Errorf("Init(): error creating file: %w", err) - } - } - err = exp.encodeMetadataToFile() - if err != nil { - return fmt.Errorf("Init(): error creating file: %w", err) - } - exp.blockMetadata = BlockMetaData{ - GenesisHash: "", - Network: "", - NextRound: exp.round, - } - } else { - if err != nil { - return fmt.Errorf("error opening file: %w", err) - } - var data []byte - data, err = os.ReadFile(file) - if err != nil { - return fmt.Errorf("error reading metadata: %w", err) - } - err = json.Unmarshal(data, &exp.blockMetadata) - if err != nil { - return fmt.Errorf("error reading metadata: %w", err) - } - } - exp.round = exp.blockMetadata.NextRound + exp.round = uint64(initProvider.NextDBRound()) return err } @@ -111,33 +62,13 @@ func (exp *fileExporter) Config() plugins.PluginConfig { return plugins.PluginConfig(ret) } -func (exp *fileExporter) encodeMetadataToFile() error { - tempFilename := fmt.Sprintf("%s.temp", exp.blockMetadataFile) - file, err := os.Create(tempFilename) - if err != nil { - return fmt.Errorf("encodeMetadataToFile(): failed to create temp metadata file: %w", err) - } - defer file.Close() - err = json.NewEncoder(file).Encode(exp.blockMetadata) - if err != nil { - return fmt.Errorf("encodeMetadataToFile(): failed to write temp metadata: %w", err) - } - - err = os.Rename(tempFilename, exp.blockMetadataFile) - if err != nil { - return fmt.Errorf("encodeMetadataToFile(): failed to replace metadata file: %w", err) - } - - return nil -} - func (exp *fileExporter) Close() error { exp.logger.Infof("latest round on file: %d", exp.round) return nil } func (exp *fileExporter) Receive(exportData data.BlockData) error { - if exp.blockMetadataFile == "" { + if exp.logger == nil { return fmt.Errorf("exporter not initialized") } if exportData.Round() != exp.round { @@ -156,36 +87,9 @@ func (exp *fileExporter) Receive(exportData data.BlockData) error { } exp.logger.Infof("Added block %d", exportData.Round()) exp.round++ - exp.blockMetadata.NextRound = exp.round - err = exp.encodeMetadataToFile() - if err != nil { - return fmt.Errorf("Receive() metadata encoding err %w", err) - } return nil } -func (exp *fileExporter) HandleGenesis(genesis bookkeeping.Genesis) error { - // check genesis hash - gh := crypto.HashObj(genesis).String() - if exp.blockMetadata.GenesisHash == "" { - exp.blockMetadata.GenesisHash = gh - exp.blockMetadata.Network = string(genesis.Network) - err := exp.encodeMetadataToFile() - if err != nil { - return fmt.Errorf("HandleGenesis() metadata encoding err %w", err) - } - } else { - if exp.blockMetadata.GenesisHash != gh { - return fmt.Errorf("HandleGenesis() genesis hash in metadata does not match expected value: actual %s, expected %s", gh, exp.blockMetadata.GenesisHash) - } - } - return nil -} - -func (exp *fileExporter) Round() uint64 { - return exp.round -} - func (exp *fileExporter) unmarhshalConfig(cfg string) error { return yaml.Unmarshal([]byte(cfg), &exp.cfg) } diff --git a/exporters/filewriter/file_exporter_test.go b/exporters/filewriter/file_exporter_test.go index befbfc8b4..5737d7ecc 100644 --- a/exporters/filewriter/file_exporter_test.go +++ b/exporters/filewriter/file_exporter_test.go @@ -1,27 +1,26 @@ -package filewriter_test +package filewriter import ( "context" "encoding/json" "fmt" - "io/ioutil" "os" + "path/filepath" "testing" - "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/indexer/data" - "github.com/algorand/indexer/exporters/filewriter" "github.com/algorand/indexer/plugins" + testutil "github.com/algorand/indexer/util/test" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" ) var logger *logrus.Logger -var fileCons = &filewriter.Constructor{} -var config = "block-dir: /tmp/blocks\n" +var fileCons = &Constructor{} +var round = basics.Round(2) func init() { logger, _ = test.NewNullLogger() @@ -38,80 +37,20 @@ func TestExporterMetadata(t *testing.T) { } func TestExporterInit(t *testing.T) { - ctx := context.Background() - fileExp := fileCons.New() - assert.Equal(t, uint64(0), fileExp.Round()) - // creates a new output file - err := fileExp.Init(ctx, plugins.PluginConfig(config), logger) + config := fmt.Sprintf("block-dir: %s/blocks\n", t.TempDir()) + fileExp := fileExporter{} + defer fileExp.Close() + err := fileExp.Init(context.Background(), testutil.MockedInitProvider(&round), plugins.PluginConfig(config), logger) assert.NoError(t, err) pluginConfig := fileExp.Config() assert.Equal(t, config, string(pluginConfig)) - assert.Equal(t, uint64(0), fileExp.Round()) - fileExp.Close() - // can open existing file - err = fileExp.Init(ctx, plugins.PluginConfig(config), logger) - assert.NoError(t, err) - fileExp.Close() - // re-initializes empty file - path := "/tmp/blocks/metadata.json" - assert.NoError(t, os.Remove(path)) - f, err := os.Create(path) - f.Close() - assert.NoError(t, err) - err = fileExp.Init(ctx, plugins.PluginConfig(config), logger) - assert.NoError(t, err) - fileExp.Close() -} - -func TestExporterHandleGenesis(t *testing.T) { - ctx := context.Background() - fileExp := fileCons.New() - fileExp.Init(ctx, plugins.PluginConfig(config), logger) - genesisA := bookkeeping.Genesis{ - SchemaID: "test", - Network: "test", - Proto: "test", - Allocation: nil, - RewardsPool: "AAAAAAA", - FeeSink: "AAAAAAA", - Timestamp: 1234, - Comment: "", - DevMode: true, - } - err := fileExp.HandleGenesis(genesisA) - fileExp.Close() - assert.NoError(t, err) - configs, err := ioutil.ReadFile("/tmp/blocks/metadata.json") - assert.NoError(t, err) - var blockMetaData filewriter.BlockMetaData - err = json.Unmarshal(configs, &blockMetaData) - assert.Equal(t, uint64(0), blockMetaData.NextRound) - assert.Equal(t, string(genesisA.Network), blockMetaData.Network) - assert.Equal(t, crypto.HashObj(genesisA).String(), blockMetaData.GenesisHash) - - // genesis mismatch - fileExp.Init(ctx, plugins.PluginConfig(config), logger) - genesisB := bookkeeping.Genesis{ - SchemaID: "test", - Network: "test", - Proto: "test", - Allocation: nil, - RewardsPool: "AAAAAAA", - FeeSink: "AAAAAAA", - Timestamp: 5678, - Comment: "", - DevMode: false, - } - - err = fileExp.HandleGenesis(genesisB) - assert.Contains(t, err.Error(), "genesis hash in metadata does not match expected value") - fileExp.Close() - + assert.Equal(t, uint64(round), fileExp.round) } func TestExporterReceive(t *testing.T) { - ctx := context.Background() - fileExp := fileCons.New() + config := fmt.Sprintf("block-dir: %s/blocks\n", t.TempDir()) + fileExp := fileExporter{} + defer fileExp.Close() block := data.BlockData{ BlockHeader: bookkeeping.BlockHeader{ Round: 3, @@ -120,34 +59,17 @@ func TestExporterReceive(t *testing.T) { Delta: nil, Certificate: nil, } - // exporter not initialized + err := fileExp.Receive(block) assert.Contains(t, err.Error(), "exporter not initialized") - // initialize - fileExp.Init(ctx, plugins.PluginConfig(config), logger) - - // incorrect round + err = fileExp.Init(context.Background(), testutil.MockedInitProvider(&round), plugins.PluginConfig(config), logger) + assert.Nil(t, err) err = fileExp.Receive(block) - assert.Contains(t, err.Error(), "received round 3, expected round 0") + assert.Contains(t, err.Error(), "received round 3, expected round 2") - // genesis - genesis := bookkeeping.Genesis{ - SchemaID: "test", - Network: "test", - Proto: "test", - Allocation: nil, - RewardsPool: "AAAAAAA", - FeeSink: "AAAAAAA", - Timestamp: 1234, - Comment: "", - DevMode: true, - } - err = fileExp.HandleGenesis(genesis) - assert.NoError(t, err) - - // write block to file - for i := 0; i < 5; i++ { + // write block to file; pipeline starts at round 2, set in MockedInitProvider + for i := 2; i < 7; i++ { block = data.BlockData{ BlockHeader: bookkeeping.BlockHeader{ Round: basics.Round(i), @@ -158,44 +80,15 @@ func TestExporterReceive(t *testing.T) { } err = fileExp.Receive(block) assert.NoError(t, err) - assert.Equal(t, uint64(i+1), fileExp.Round()) + assert.Equal(t, uint64(i+1), fileExp.round) } - fileExp.Close() // written data are valid - for i := 0; i < 5; i++ { - b, _ := os.ReadFile(fmt.Sprintf("/tmp/blocks/block_%d.json", i)) + for i := 2; i < 7; i++ { + b, _ := os.ReadFile(filepath.Join(fileExp.cfg.BlocksDir, fmt.Sprintf("block_%d.json", i))) var blockData data.BlockData err = json.Unmarshal(b, &blockData) assert.NoError(t, err) } - - // should continue from round 6 after restart - fileExp.Init(ctx, plugins.PluginConfig(config), logger) - assert.Equal(t, uint64(5), fileExp.Round()) - fileExp.Close() -} - -func TestExporterClose(t *testing.T) { - ctx := context.Background() - fileExp := fileCons.New() - fileExp.Init(ctx, plugins.PluginConfig(config), logger) - block := data.BlockData{ - BlockHeader: bookkeeping.BlockHeader{ - Round: 5, - }, - Payset: nil, - Delta: nil, - Certificate: nil, - } - fileExp.Receive(block) - err := fileExp.Close() - assert.NoError(t, err) - // assert round is updated correctly - configs, err := ioutil.ReadFile("/tmp/blocks/metadata.json") - assert.NoError(t, err) - var blockMetaData filewriter.BlockMetaData - err = json.Unmarshal(configs, &blockMetaData) - assert.Equal(t, uint64(6), blockMetaData.NextRound) } diff --git a/exporters/noop/noop_exporter.go b/exporters/noop/noop_exporter.go index 73269d9c6..203bac856 100644 --- a/exporters/noop/noop_exporter.go +++ b/exporters/noop/noop_exporter.go @@ -3,6 +3,7 @@ package noop import ( "context" "fmt" + "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/indexer/data" "github.com/algorand/indexer/exporters" @@ -31,16 +32,14 @@ type Constructor struct{} // New initializes a noopExporter func (c *Constructor) New() exporters.Exporter { - return &noopExporter{ - round: 0, - } + return &noopExporter{} } func (exp *noopExporter) Metadata() exporters.ExporterMetadata { return noopExporterMetadata } -func (exp *noopExporter) Init(_ context.Context, cfg plugins.PluginConfig, _ *logrus.Logger) error { +func (exp *noopExporter) Init(_ context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, _ *logrus.Logger) error { if err := yaml.Unmarshal([]byte(cfg), &exp.cfg); err != nil { return fmt.Errorf("init failure in unmarshalConfig: %v", err) } diff --git a/exporters/noop/noop_exporter_test.go b/exporters/noop/noop_exporter_test.go index 366fdf992..ebaf4ed3e 100644 --- a/exporters/noop/noop_exporter_test.go +++ b/exporters/noop/noop_exporter_test.go @@ -8,6 +8,7 @@ import ( "github.com/algorand/indexer/data" "github.com/algorand/indexer/exporters" "github.com/algorand/indexer/plugins" + testutil "github.com/algorand/indexer/util/test" "github.com/stretchr/testify/assert" "gopkg.in/yaml.v3" ) @@ -33,7 +34,7 @@ func TestExporterMetadata(t *testing.T) { } func TestExporterInit(t *testing.T) { - assert.NoError(t, ne.Init(context.Background(), "", nil)) + assert.NoError(t, ne.Init(context.Background(), testutil.MockedInitProvider(nil), "", nil)) } func TestExporterConfig(t *testing.T) { @@ -42,7 +43,7 @@ func TestExporterConfig(t *testing.T) { if err != nil { t.Fatalf("unable to Marshal default noop.ExporterConfig: %v", err) } - assert.NoError(t, ne.Init(context.Background(), "", nil)) + assert.NoError(t, ne.Init(context.Background(), testutil.MockedInitProvider(nil), "", nil)) assert.Equal(t, plugins.PluginConfig(expected), ne.Config()) } @@ -50,18 +51,6 @@ func TestExporterClose(t *testing.T) { assert.NoError(t, ne.Close()) } -func TestExporterHandleGenesis(t *testing.T) { - assert.NoError(t, ne.HandleGenesis(bookkeeping.Genesis{})) -} - -func TestExporterStartRound(t *testing.T) { - assert.NoError(t, ne.Init(context.Background(), "", nil)) - assert.Equal(t, uint64(0), ne.Round()) - assert.NoError(t, ne.Init(context.Background(), "round: 55", nil)) - assert.Equal(t, uint64(55), ne.Round()) - -} - func TestExporterRoundReceive(t *testing.T) { eData := data.BlockData{ BlockHeader: bookkeeping.BlockHeader{ @@ -69,5 +58,4 @@ func TestExporterRoundReceive(t *testing.T) { }, } assert.NoError(t, ne.Receive(eData)) - assert.Equal(t, uint64(6), ne.Round()) } diff --git a/exporters/postgresql/postgresql_exporter.go b/exporters/postgresql/postgresql_exporter.go index 75ffe572b..2b77f4312 100644 --- a/exporters/postgresql/postgresql_exporter.go +++ b/exporters/postgresql/postgresql_exporter.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "github.com/algorand/indexer/exporters/util" + "github.com/algorand/indexer/importer" "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" @@ -18,7 +19,6 @@ import ( "github.com/algorand/indexer/idb" // Necessary to ensure the postgres implementation has been registered in the idb factory _ "github.com/algorand/indexer/idb/postgres" - "github.com/algorand/indexer/importer" "github.com/algorand/indexer/plugins" ) @@ -46,16 +46,14 @@ type Constructor struct{} // New initializes a postgresqlExporter func (c *Constructor) New() exporters.Exporter { - return &postgresqlExporter{ - round: 0, - } + return &postgresqlExporter{} } func (exp *postgresqlExporter) Metadata() exporters.ExporterMetadata { return postgresqlExporterMetadata } -func (exp *postgresqlExporter) Init(ctx context.Context, cfg plugins.PluginConfig, logger *logrus.Logger) error { +func (exp *postgresqlExporter) Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error { exp.ctx, exp.cf = context.WithCancel(ctx) dbName := "postgres" exp.logger = logger @@ -69,20 +67,32 @@ func (exp *postgresqlExporter) Init(ctx context.Context, cfg plugins.PluginConfi var opts idb.IndexerDbOptions opts.MaxConn = exp.cfg.MaxConn opts.ReadOnly = false + + // for some reason when ConnectionString is empty, it's automatically + // connecting to a local instance that's running. + // this behavior can be reproduced in TestConnectDbFailure. + if !exp.cfg.Test && exp.cfg.ConnectionString == "" { + return fmt.Errorf("connection string is empty for %s", dbName) + } db, ready, err := idb.IndexerDbByName(dbName, exp.cfg.ConnectionString, opts, exp.logger) if err != nil { return fmt.Errorf("connect failure constructing db, %s: %v", dbName, err) } exp.db = db <-ready - rnd, err := exp.db.GetNextRoundToAccount() - if err == nil { - exp.round = rnd - } else if err == idb.ErrorNotInitialized { - exp.round = 0 - } else { - return fmt.Errorf("Init() err getting next round: %v", err) + _, err = importer.EnsureInitialImport(exp.db, *initProvider.GetGenesis()) + if err != nil { + return fmt.Errorf("error importing genesis: %v", err) + } + dbRound, err := db.GetNextRoundToAccount() + if err != nil { + return fmt.Errorf("error getting next db round : %v", err) + } + if uint64(initProvider.NextDBRound()) != dbRound { + return fmt.Errorf("initializing block round %d but next round to account is %d", initProvider.NextDBRound(), dbRound) } + exp.round = uint64(initProvider.NextDBRound()) + // if data pruning is enabled if !exp.cfg.Test && exp.cfg.Delete.Rounds > 0 { exp.dm = util.MakeDataManager(exp.ctx, &exp.cfg.Delete, exp.db, logger) @@ -135,17 +145,6 @@ func (exp *postgresqlExporter) Receive(exportData data.BlockData) error { return nil } -func (exp *postgresqlExporter) HandleGenesis(genesis bookkeeping.Genesis) error { - _, err := importer.EnsureInitialImport(exp.db, genesis) - return err -} - -func (exp *postgresqlExporter) Round() uint64 { - // should we try to retrieve this from the db? That could fail. - // return exp.db.GetNextRoundToAccount() - return exp.round -} - func (exp *postgresqlExporter) unmarhshalConfig(cfg string) error { return yaml.Unmarshal([]byte(cfg), &exp.cfg) } diff --git a/exporters/postgresql/postgresql_exporter_test.go b/exporters/postgresql/postgresql_exporter_test.go index 5a7f06723..4957ca081 100644 --- a/exporters/postgresql/postgresql_exporter_test.go +++ b/exporters/postgresql/postgresql_exporter_test.go @@ -11,6 +11,7 @@ import ( "gopkg.in/yaml.v3" "github.com/algorand/go-algorand/agreement" + "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/ledger/ledgercore" @@ -18,10 +19,12 @@ import ( "github.com/algorand/indexer/exporters/util" _ "github.com/algorand/indexer/idb/dummy" "github.com/algorand/indexer/plugins" + testutil "github.com/algorand/indexer/util/test" ) var pgsqlConstructor = &Constructor{} var logger *logrus.Logger +var round = basics.Round(0) func init() { logger, _ = test.NewNullLogger() @@ -39,20 +42,20 @@ func TestExporterMetadata(t *testing.T) { func TestConnectDisconnectSuccess(t *testing.T) { pgsqlExp := pgsqlConstructor.New() cfg := plugins.PluginConfig("test: true\nconnection-string: ''") - assert.NoError(t, pgsqlExp.Init(context.Background(), cfg, logger)) + assert.NoError(t, pgsqlExp.Init(context.Background(), testutil.MockedInitProvider(&round), cfg, logger)) assert.NoError(t, pgsqlExp.Close()) } func TestConnectUnmarshalFailure(t *testing.T) { pgsqlExp := pgsqlConstructor.New() cfg := plugins.PluginConfig("'") - assert.ErrorContains(t, pgsqlExp.Init(context.Background(), cfg, logger), "connect failure in unmarshalConfig") + assert.ErrorContains(t, pgsqlExp.Init(context.Background(), testutil.MockedInitProvider(&round), cfg, logger), "connect failure in unmarshalConfig") } func TestConnectDbFailure(t *testing.T) { pgsqlExp := pgsqlConstructor.New() cfg := plugins.PluginConfig("") - assert.ErrorContains(t, pgsqlExp.Init(context.Background(), cfg, logger), "connect failure constructing db, postgres:") + assert.ErrorContains(t, pgsqlExp.Init(context.Background(), testutil.MockedInitProvider(&round), cfg, logger), "connection string is empty for postgres") } func TestConfigDefault(t *testing.T) { @@ -65,22 +68,10 @@ func TestConfigDefault(t *testing.T) { assert.Equal(t, plugins.PluginConfig(expected), pgsqlExp.Config()) } -func TestDefaultRoundZero(t *testing.T) { - pgsqlExp := pgsqlConstructor.New() - assert.Equal(t, uint64(0), pgsqlExp.Round()) -} - -func TestHandleGenesis(t *testing.T) { - pgsqlExp := pgsqlConstructor.New() - cfg := plugins.PluginConfig("test: true") - assert.NoError(t, pgsqlExp.Init(context.Background(), cfg, logger)) - assert.NoError(t, pgsqlExp.HandleGenesis(bookkeeping.Genesis{})) -} - func TestReceiveInvalidBlock(t *testing.T) { pgsqlExp := pgsqlConstructor.New() cfg := plugins.PluginConfig("test: true") - assert.NoError(t, pgsqlExp.Init(context.Background(), cfg, logger)) + assert.NoError(t, pgsqlExp.Init(context.Background(), testutil.MockedInitProvider(&round), cfg, logger)) invalidBlock := data.BlockData{ BlockHeader: bookkeeping.BlockHeader{}, @@ -95,7 +86,7 @@ func TestReceiveInvalidBlock(t *testing.T) { func TestReceiveAddBlockSuccess(t *testing.T) { pgsqlExp := pgsqlConstructor.New() cfg := plugins.PluginConfig("test: true") - assert.NoError(t, pgsqlExp.Init(context.Background(), cfg, logger)) + assert.NoError(t, pgsqlExp.Init(context.Background(), testutil.MockedInitProvider(&round), cfg, logger)) block := data.BlockData{ BlockHeader: bookkeeping.BlockHeader{}, @@ -106,6 +97,24 @@ func TestReceiveAddBlockSuccess(t *testing.T) { assert.NoError(t, pgsqlExp.Receive(block)) } +func TestPostgresqlExporterInit(t *testing.T) { + pgsqlExp := pgsqlConstructor.New() + cfg := plugins.PluginConfig("test: true") + + // genesis hash mismatch + initProvider := testutil.MockedInitProvider(&round) + initProvider.Genesis = &bookkeeping.Genesis{ + Network: "test", + } + err := pgsqlExp.Init(context.Background(), initProvider, cfg, logger) + assert.Contains(t, err.Error(), "error importing genesis: genesis hash not matching") + + // incorrect round + round = 1 + err = pgsqlExp.Init(context.Background(), testutil.MockedInitProvider(&round), cfg, logger) + assert.Contains(t, err.Error(), "initializing block round 1 but next round to account is 0") +} + func TestUnmarshalConfigsContainingDeleteTask(t *testing.T) { // configured delete task pgsqlExp := postgresqlExporter{} diff --git a/processors/blockprocessor/block_processor.go b/processors/blockprocessor/block_processor.go index fc00cb61c..9f247a12e 100644 --- a/processors/blockprocessor/block_processor.go +++ b/processors/blockprocessor/block_processor.go @@ -83,7 +83,7 @@ func (proc *blockProcessor) Init(ctx context.Context, initProvider data.InitProv } proc.cfg = cfg - genesis := initProvider.Genesis() + genesis := initProvider.GetGenesis() round := uint64(initProvider.NextDBRound()) err = InitializeLedger(ctx, proc.logger, round, *genesis, &pCfg) diff --git a/util/test/testutil.go b/util/test/testutil.go index d3479d2d2..2d2d9a4d9 100644 --- a/util/test/testutil.go +++ b/util/test/testutil.go @@ -7,6 +7,7 @@ import ( "os" "runtime" + "github.com/algorand/go-algorand/data/bookkeeping" log "github.com/sirupsen/logrus" "github.com/algorand/go-algorand/data/basics" @@ -122,3 +123,27 @@ func MakeTestLedger(logger *log.Logger) (*ledger.Ledger, error) { genesis := MakeGenesis() return util.MakeLedger(logger, true, &genesis, "ledger") } + +// MockInitProvider mock an init provider +type MockInitProvider struct { + CurrentRound *basics.Round + Genesis *bookkeeping.Genesis +} + +// GetGenesis produces genesis pointer +func (m *MockInitProvider) GetGenesis() *bookkeeping.Genesis { + return m.Genesis +} + +// NextDBRound provides next database round +func (m *MockInitProvider) NextDBRound() basics.Round { + return *m.CurrentRound +} + +// MockedInitProvider returns an InitProvider for testing +func MockedInitProvider(round *basics.Round) *MockInitProvider { + return &MockInitProvider{ + CurrentRound: round, + Genesis: &bookkeeping.Genesis{}, + } +}