Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

conduit: Initial conduit pipeline tool. #1326

Merged
merged 68 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from 63 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
3c8c150
Conduit: Postgres exporter initial implementation (#1114)
Eric-Warehime Jul 21, 2022
e2c50a0
Add all PluginTypes (#1136)
Eric-Warehime Jul 21, 2022
2c945b4
Conduit: Rfc updates (#1139)
Eric-Warehime Jul 25, 2022
c0bf99a
Conduit: Merge conduit develop (#1157)
Eric-Warehime Aug 1, 2022
f2b422f
docs: Plugin docs (#1150)
Eric-Warehime Aug 1, 2022
595d3bf
Conduit: Algod importer implementation (#1135)
algoganesh Aug 1, 2022
3d2720c
Create RFC for processor (#1140)
AlgoStephenAkiki Aug 5, 2022
34986db
Use config to control noop exporter round start (#1176)
Eric-Warehime Aug 12, 2022
959c576
Set pgsql exporter round from db on init (#1177)
Eric-Warehime Aug 12, 2022
45d5c4b
1165 create initial conduit binary (#1167)
AlgoStephenAkiki Aug 12, 2022
a0b6ecc
Add waits/retries to algod importer (#1178)
Eric-Warehime Aug 15, 2022
9cc3d36
conduit: pipeline run loop implementation (#1183)
Eric-Warehime Aug 17, 2022
adbae7e
Adds unit tests (#1202)
AlgoStephenAkiki Aug 26, 2022
bf8b441
succeed close calls when init fails (#1204)
Eric-Warehime Aug 26, 2022
13aaba6
silence usage data on error (#1205)
Eric-Warehime Aug 26, 2022
d0046cb
conduit: Conduit merge upstream (#1206)
Eric-Warehime Aug 29, 2022
beaf4c6
Adds cpu profile and pid flag (#1209)
AlgoStephenAkiki Sep 1, 2022
88fca5c
conduit: Convert daemon to conduit pipeline (#1208)
Eric-Warehime Sep 9, 2022
e8fa891
file exporter (#1213)
shiqizng Sep 9, 2022
73fdf28
conduit: Conduit/develop merge (#1222)
Eric-Warehime Sep 14, 2022
f93bd51
Conduit: e2e tests (#1221)
Eric-Warehime Sep 16, 2022
7446007
Initial Filter Processor (#1220)
AlgoStephenAkiki Sep 16, 2022
756be9b
Add wait-for-block to block-gen (#1229)
Eric-Warehime Sep 19, 2022
7cf43e8
Conduit: prometheus metrics (#1230)
Eric-Warehime Sep 20, 2022
a2e7bdc
Add filter_processor to all imports for processors (#1233)
Eric-Warehime Sep 21, 2022
ce54ff3
File Exporter: save metadata more often (#1232)
winder Sep 26, 2022
c809d35
Conduit: init subcommand (#1235)
winder Sep 27, 2022
3bc8f52
Changed tag format (#1238)
AlgoStephenAkiki Oct 3, 2022
a6b7473
Add panic recovers (#1251)
AlgoStephenAkiki Oct 3, 2022
eaab3a1
Condensed Duplicate logger output (#1256)
AlgoStephenAkiki Oct 5, 2022
6a6fd87
Fix configuration of logger (#1259)
AlgoStephenAkiki Oct 7, 2022
5f35da3
Conduit: ensure thread safety across multiple loggers. (#1271)
AlgoStephenAkiki Oct 13, 2022
6e13534
Add context to exporter interface (#1272)
AlgoStephenAkiki Oct 13, 2022
d7c2365
Conduit: improve config naming consistency (#1279)
AlgoStephenAkiki Oct 19, 2022
9284c97
enhancement: prune historical data (#1277)
shiqizng Oct 19, 2022
d0aef01
Conduit Documentation (#1275)
winder Oct 20, 2022
c5fb831
Conduit: Update submodule (#1291)
Eric-Warehime Oct 24, 2022
ace1808
Conduit: Dynamic on complete (#1285)
winder Oct 25, 2022
99a848c
Add block metadata to pipeline (#1258)
shiqizng Oct 25, 2022
997b8c7
Conduit: filereader / filewriter plugins and compatibility changes. (…
winder Oct 26, 2022
ea8c774
adding metrics endpoint (#1284)
shiqizng Oct 27, 2022
bd5a482
e2e scripts readme (#1293)
winder Oct 31, 2022
d30cfc1
plugin metrics (#1290)
shiqizng Oct 31, 2022
8f165aa
Add some basic documentation about lifecycle hooks. (#1299)
winder Oct 31, 2022
750754d
Conduit: config updates (#1295)
shiqizng Nov 1, 2022
388d00b
fix e2e conduit (#1301)
shiqizng Nov 1, 2022
84e2f12
conduit: plugin constructor (#1298)
shiqizng Nov 1, 2022
3c8938e
update import
shiqizng Nov 1, 2022
2cb79ff
Merge branch 'develop' into will/merge
winder Nov 1, 2022
330f27b
Update submodule, regenerate mapping.
winder Nov 1, 2022
0cd10ab
Update util/test/account_testutil.go
winder Nov 2, 2022
04e272a
Fix some merge issues.
winder Nov 2, 2022
2da54f0
More merge errors.
winder Nov 2, 2022
4ab2a90
More merge errors.
winder Nov 2, 2022
5043497
Added none operation (#1297)
AlgoStephenAkiki Nov 2, 2022
964200e
Merge develop into conduit. (#1303)
winder Nov 2, 2022
28ce7ca
Merge branch 'will/merge' into conduit
winder Nov 2, 2022
556e377
Merge branch 'develop' into conduit
winder Nov 2, 2022
9a2f836
conduit: next-round command line flag (#1311)
shiqizng Nov 4, 2022
74a9954
List subcommand (#1296)
AlgoStephenAkiki Nov 6, 2022
9214b07
Minor pipeline test cleanup. (#1316)
winder Nov 6, 2022
c0e1146
Reorganize plugin packages. (#1317)
winder Nov 7, 2022
b343cae
misc(release): add support for conduit (#1330)
algolucky Nov 16, 2022
75adc17
New Feature: Application Boxes in Indexer (#1168)
tzaffi Nov 1, 2022
837eadc
Enhancement: Update git submodule with go-algorand release commit `31…
tzaffi Nov 2, 2022
ed7317e
Adding a link to the data directory documentation (#1287)
fabrice102 Nov 4, 2022
2cf6b2d
Merge branch 'develop' into conduit
winder Nov 29, 2022
9f32987
Remove HandleGenesis references.
winder Nov 29, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ commands:
sudo apt update
sudo apt -y install python3 python3-pip python3-setuptools python3-wheel libboost-math-dev libffi-dev
pip3 install -r misc/requirements.txt
pip3 install e2e_tests/

- run:
name: sync submodules (go-algorand)
Expand Down Expand Up @@ -143,6 +144,7 @@ commands:
- run: make test-generate
- run: make fakepackage
- run: make e2e
- run: make e2e-conduit

run_indexer_vs_algod:
steps:
Expand Down
5 changes: 5 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ codecov:
require_ci_to_pass: no
branch: develop

ignore:
- "idb/mocks"
- "idb/dummy"
- "util/test"

coverage:
precision: 2
round: down
Expand Down
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Build artifacts
cmd/algorand-indexer/algorand-indexer
cmd/conduit/conduit
api/.3tmp.json
api/generate
tmp/
Expand Down Expand Up @@ -28,6 +29,9 @@ _*.json

# Python
__pycache__
e2e_tests/dist
e2e_tests/build
e2e_tests/*egg-info*
.venv

# jetbrains IDE
Expand All @@ -48,3 +52,6 @@ coverage.txt

# asdf
.tool-versions

# conduit example
cmd/conduit/data
22 changes: 16 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ COVERPKG := $(shell go list ./... | grep -v '/cmd/' | egrep -v '(testing|test|m
# Used for e2e test
export GO_IMAGE = golang:$(shell go version | cut -d ' ' -f 3 | tail -c +3 )

# This is the default target, build the indexer:
# This is the default target, build everything:
all: conduit cmd/algorand-indexer/algorand-indexer go-algorand idb/postgres/internal/schema/setup_postgres_sql.go idb/mocks/IndexerDb.go

conduit: go-algorand
go generate ./... && cd cmd/conduit && go build

cmd/algorand-indexer/algorand-indexer: idb/postgres/internal/schema/setup_postgres_sql.go go-algorand
cd cmd/algorand-indexer && go build -ldflags="${GOLDFLAGS}"

Expand Down Expand Up @@ -55,7 +60,7 @@ fakepackage: go-algorand
test: idb/mocks/IndexerDb.go cmd/algorand-indexer/algorand-indexer
go test -coverpkg=$(COVERPKG) ./... -coverprofile=coverage.txt -covermode=atomic ${TEST_FLAG}

lint: go-algorand
lint: conduit
golint -set_exit_status ./...
go vet ./...

Expand All @@ -68,7 +73,11 @@ integration: cmd/algorand-indexer/algorand-indexer
test/postgres_integration_test.sh

e2e: cmd/algorand-indexer/algorand-indexer
cd misc && docker-compose build --build-arg GO_IMAGE=${GO_IMAGE} && docker-compose up --exit-code-from e2e
cd e2e_tests/docker/indexer/ && docker-compose build --build-arg GO_IMAGE=${GO_IMAGE} && docker-compose up --exit-code-from e2e

e2e-conduit: conduit
cd third_party/go-algorand && make install
export PATH=$(PATH):$(shell go env GOPATH)/bin; pip3 install e2e_tests/ && e2econduit --s3-source-net ${CI_E2E_FILENAME} --conduit-bin cmd/conduit/conduit

deploy:
mule/deploy.sh
Expand Down Expand Up @@ -96,9 +105,10 @@ indexer-v-algod: nightly-setup indexer-v-algod-swagger nightly-teardown
# fetch and update submodule. it's default to latest rel/nightly branch.
# to use a different branch, update the branch in .gitmodules for CI build,
# and for local testing, you may checkout a specific branch in the submodule.
# after submodule is updated, CI_E2E_FILE in circleci/config.yml should also
# be updated to use a newer artifact. path copied from s3 bucket, s3://algorand-testdata/indexer/e2e4/
# after submodule is updated, CI_E2E_FILENAME in .circleci/config.yml should
# also be updated to use a newer artifact. path copied from s3 bucket,
# s3://algorand-testdata/indexer/e2e4/
update-submodule:
git submodule update --remote

.PHONY: test e2e integration fmt lint deploy sign test-package package fakepackage cmd/algorand-indexer/algorand-indexer idb/mocks/IndexerDb.go go-algorand indexer-v-algod
.PHONY: all test e2e integration fmt lint deploy sign test-package package fakepackage cmd/algorand-indexer/algorand-indexer idb/mocks/IndexerDb.go go-algorand indexer-v-algod conduit
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ Indexer is part of the [sandbox](https://github.com/algorand/sandbox) private ne
- Search and filter accounts, transactions, assets, and asset balances with many different parameters.
- Pagination of results.
- Enriched transaction and account data:
- Confirmation round (block containing the transaction)
- Confirmation time
- Signature type
- Close amounts
- Create/delete rounds.
- Confirmation round (block containing the transaction)
- Confirmation time
- Signature type
- Close amounts
- Create/delete rounds.
- Human readable field names instead of the space optimized protocol level names.

# Contributing
Expand Down
21 changes: 10 additions & 11 deletions api/app_boxes_fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/indexer/processor"
"github.com/algorand/indexer/util/test"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -40,7 +39,7 @@ var goalEncodingExamples map[string]string = map[string]string{
"abi": `(uint64,string,bool[]):[399,"pls pass",[true,false]]`,
}

func setupLiveBoxes(t *testing.T, proc processor.Processor, l *ledger.Ledger) {
func setupLiveBoxes(t *testing.T, proc func(cert *rpcs.EncodedBlockCert) error, l *ledger.Ledger) {
deleted := "DELETED"

firstAppid := basics.AppIndex(1)
Expand Down Expand Up @@ -69,7 +68,7 @@ func setupLiveBoxes(t *testing.T, proc processor.Processor, l *ledger.Ledger) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &createTxn, &payNewAppTxn, &createTxn2, &payNewAppTxn2, &createTxn3, &payNewAppTxn3)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

// block header handoff: round 1 --> round 2
Expand Down Expand Up @@ -104,7 +103,7 @@ func setupLiveBoxes(t *testing.T, proc processor.Processor, l *ledger.Ledger) {
block, err = test.MakeBlockForTxns(blockHdr, boxTxns...)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

// block header handoff: round 2 --> round 3
Expand Down Expand Up @@ -137,7 +136,7 @@ func setupLiveBoxes(t *testing.T, proc processor.Processor, l *ledger.Ledger) {
block, err = test.MakeBlockForTxns(blockHdr, boxTxns...)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

// block header handoff: round 3 --> round 4
Expand Down Expand Up @@ -165,7 +164,7 @@ func setupLiveBoxes(t *testing.T, proc processor.Processor, l *ledger.Ledger) {
block, err = test.MakeBlockForTxns(blockHdr, boxTxns...)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

// block header handoff: round 4 --> round 5
Expand Down Expand Up @@ -194,7 +193,7 @@ func setupLiveBoxes(t *testing.T, proc processor.Processor, l *ledger.Ledger) {
block, err = test.MakeBlockForTxns(blockHdr, boxTxns...)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

// block header handoff: round 5 --> round 6
Expand Down Expand Up @@ -223,7 +222,7 @@ func setupLiveBoxes(t *testing.T, proc processor.Processor, l *ledger.Ledger) {
block, err = test.MakeBlockForTxns(blockHdr, boxTxns...)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

// block header handoff: round 6 --> round 7
Expand All @@ -250,7 +249,7 @@ func setupLiveBoxes(t *testing.T, proc processor.Processor, l *ledger.Ledger) {
block, err = test.MakeBlockForTxns(blockHdr, boxTxns...)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

// block header handoff: round 7 --> round 8
Expand All @@ -273,7 +272,7 @@ func setupLiveBoxes(t *testing.T, proc processor.Processor, l *ledger.Ledger) {
block, err = test.MakeBlockForTxns(blockHdr, boxTxns...)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

// block header handoff: round 8 --> round 9
Expand All @@ -287,7 +286,7 @@ func setupLiveBoxes(t *testing.T, proc processor.Processor, l *ledger.Ledger) {
block, err = test.MakeBlockForTxns(blockHdr, &deleteTxn)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

// ---- SUMMARY ---- //
Expand Down
4 changes: 2 additions & 2 deletions api/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/rpcs"

"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/idb/postgres"
"github.com/algorand/indexer/processor"
"github.com/algorand/indexer/util/test"
)

Expand Down Expand Up @@ -268,7 +268,7 @@ func getProof(path string) (route string, proof prover, err error) {
}

// WARNING: receiver should not call l.Close()
func setupIdbAndReturnShutdownFunc(t *testing.T) (db *postgres.IndexerDb, proc processor.Processor, l *ledger.Ledger, shutdown func()) {
func setupIdbAndReturnShutdownFunc(t *testing.T) (db *postgres.IndexerDb, proc func(cert *rpcs.EncodedBlockCert) error, l *ledger.Ledger, shutdown func()) {
db, dbShutdown, proc, l := setupIdb(t, test.MakeGenesis())

shutdown = func() {
Expand Down
8 changes: 5 additions & 3 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type ServerImplementation struct {

db idb.IndexerDb

fetcher error
dataError func() error

timeout time.Duration

Expand Down Expand Up @@ -142,8 +142,10 @@ func (si *ServerImplementation) MakeHealthCheck(ctx echo.Context) error {
errors = append(errors, fmt.Sprintf("database error: %s", health.Error))
}

if si.fetcher != nil && si.fetcher.Error() != "" {
errors = append(errors, fmt.Sprintf("fetcher error: %s", si.fetcher.Error()))
if si.dataError != nil {
if err := si.dataError(); err != nil {
errors = append(errors, fmt.Sprintf("data error: %s", err))
}
}

return ctx.JSON(http.StatusOK, common.HealthCheckResponse{
Expand Down
Loading