Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: plugin interface #83

Merged
merged 26 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b96486c
enhancement: plugin interface
May 28, 2023
bb68599
lint
May 28, 2023
543bcf5
commentary
May 28, 2023
aa9d310
tighten unit test expectations around importer.Init() *genesis return…
May 28, 2023
f80f9d4
fix plugin docs: https://developer.algorand.org/docs/get-details/cond…
May 28, 2023
8b60bc3
small edits
May 29, 2023
f75141a
Update .markdownlint.yml
tzaffi May 29, 2023
650bdb7
cherry pick changes from docs-staleness branch
May 29, 2023
88d7b8f
Update .gitignore
tzaffi May 29, 2023
274aebf
Update .gitignore
tzaffi May 29, 2023
155e5ec
do we actually need the Config() method?
May 29, 2023
0e4d628
remove Config() everywhere
Jun 8, 2023
0eae647
remove Config() everywhere
Jun 8, 2023
9a3a95c
flatten PluginMetadata interface into Plugin interface
Jun 8, 2023
eb3e4fb
all the interfaces have Init() + importer has GetGenesis()
Jun 8, 2023
da2a27a
unify Init() inside of Plugin interface
Jun 8, 2023
7b36bde
Merge remote-tracking branch 'upstream/master' into plugin-interface
Jun 8, 2023
1301f19
fix unit tests that are supposed to error
Jun 8, 2023
f9f93b9
ErrorContains not Error()
Jun 8, 2023
d2fff8e
fix algod_importer tests
Jun 8, 2023
6863d42
Merge branch 'master' into plugin-interface
Jun 9, 2023
414d08d
md lint
Jun 9, 2023
ae6768b
Update conduit/metrics/metrics.go
tzaffi Jun 9, 2023
92a503d
assert that the genesis is non-nil when no error getting it
Jun 9, 2023
b8fef8f
add in missing zero-val'd errInit and errGetGen
Jun 9, 2023
5b114a0
add test case to trigger nil algod_importer genesis
Jun 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion conduit/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ func RegisterPrometheusMetrics(subsystem string) {
deregister()
instantiateCollectors(subsystem)

_ = prometheus.Register(BlockImportTimeSeconds)
_ = prometheus.Register(BlockImportTimeSeconds)
_ = prometheus.Register(ImportedTxnsPerBlock)
_ = prometheus.Register(ImportedRoundGauge)
Expand Down
8 changes: 6 additions & 2 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,14 @@ func (p *pipelineImpl) Init() error {
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}
genesis, err := (*p.importer).Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
err = (*p.importer).Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize importer (%s): %w", p.cfg.Importer.Name, err)
}
genesis, err := (*p.importer).GetGenesis()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new interface method in use

if err != nil {
return fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err)
}
(*p.initProvider).SetGenesis(genesis)

// write pipeline metadata
Expand Down Expand Up @@ -413,7 +417,7 @@ func (p *pipelineImpl) Start() {
}
metrics.ImporterTimeSeconds.Observe(time.Since(importStart).Seconds())

// TODO: Verify that the block was build with a known protocol version.
// TODO: Verify that the block was built with a known protocol version.

// Start time currently measures operations after block fetching is complete.
// This is for backwards compatibility w/ Indexer's metrics
Expand Down
14 changes: 9 additions & 5 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ type mockImporter struct {
rndReqErr error
}

func (m *mockImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *log.Logger) (*sdk.Genesis, error) {
func (m *mockImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *log.Logger) error {
m.cfg = cfg
return nil
}

func (m *mockImporter) GetGenesis() (*sdk.Genesis, error) {
return &m.genesis, nil
}

Expand Down Expand Up @@ -668,12 +672,12 @@ func (e *errorImporter) Metadata() plugins.Metadata {
return errorImporterMetadata
}

func (e *errorImporter) Init(_ context.Context, _ data.InitProvider, _ plugins.PluginConfig, _ *log.Logger) (*sdk.Genesis, error) {
return e.genesis, nil
func (e *errorImporter) Init(_ context.Context, _ data.InitProvider, _ plugins.PluginConfig, _ *log.Logger) error {
return nil
}

func (e *errorImporter) Config() string {
return ""
func (e *errorImporter) GetGenesis() (*sdk.Genesis, error) {
return e.genesis, nil
}

func (e *errorImporter) Close() error {
Expand Down
5 changes: 0 additions & 5 deletions conduit/plugins/exporters/example/example_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ func (exp *exampleExporter) Init(_ context.Context, _ data.InitProvider, _ plugi
panic("not implemented")
}

// Config returns the unmarshaled config object
func (exp *exampleExporter) Config() string {
panic("not implemented")
}

// Close provides the opportunity to close connections, flush buffers, etc. when the process is terminating
func (exp *exampleExporter) Close() error {
panic("not implemented")
Expand Down
4 changes: 0 additions & 4 deletions conduit/plugins/exporters/example/example_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ func TestExporterInit(t *testing.T) {
assert.Panics(t, func() { exExp.Init(context.Background(), nil, plugins.MakePluginConfig(""), nil) })
}

func TestExporterConfig(t *testing.T) {
assert.Panics(t, func() { exExp.Config() })
}

func TestExporterClose(t *testing.T) {
assert.Panics(t, func() { exExp.Close() })
}
Expand Down
23 changes: 2 additions & 21 deletions conduit/plugins/exporters/exporter.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,14 @@
package exporters

import (
"context"

"github.com/sirupsen/logrus"

"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/plugins"
)

// Exporter defines the interface for plugins
type Exporter interface {
// PluginMetadata implement this interface.
plugins.PluginMetadata

// Init will be called during initialization, before block data starts going through the pipeline.
// Typically used for things like initializing network connections.
// The ExporterConfig passed to Connect will contain the Unmarhsalled config file specific to this plugin.
// Should return an error if it fails--this will result in the Indexer process terminating.
Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error

// Config returns the configuration options used to create an Exporter.
// Initialized during Connect, it should return nil until the Exporter has been Connected.
Config() string

// Close will be called during termination of the Indexer process.
// There is no guarantee that plugin lifecycle hooks will be invoked in any specific order in relation to one another.
// Returns an error if it fails which will be surfaced in the logs, but the process is already terminating.
Close() error
// Plugin - implement this interface.
plugins.Plugin

// Receive is called for each block to be processed by the exporter.
// Should return an error on failure--retries are configurable.
Expand Down
6 changes: 0 additions & 6 deletions conduit/plugins/exporters/filewriter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"path"

"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/plugins"
Expand Down Expand Up @@ -68,11 +67,6 @@ func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider,
return err
}

func (exp *fileExporter) Config() string {
ret, _ := yaml.Marshal(exp.cfg)
return string(ret)
}

func (exp *fileExporter) Close() error {
exp.logger.Infof("latest round on file: %d", exp.round)
return nil
Expand Down
6 changes: 1 addition & 5 deletions conduit/plugins/exporters/filewriter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ func TestExporterInitDefaults(t *testing.T) {
pcfg.DataDir = tempdir
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), pcfg, logger)
require.NoError(t, err)
pluginConfig := fileExp.Config()
assert.Contains(t, pluginConfig, fmt.Sprintf("block-dir: %s", tc.expected))
})
}
}
Expand All @@ -95,9 +93,7 @@ func TestExporterInit(t *testing.T) {

// creates a new output file
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), plugins.MakePluginConfig(config), logger)
pluginConfig := fileExp.Config()
configWithDefault := config + "filename-pattern: '%[1]d_block.json'\n" + "drop-certificate: false\n"
assert.Equal(t, configWithDefault, string(pluginConfig))
assert.NoError(t, err)
fileExp.Close()

// can open existing file
Expand Down
6 changes: 0 additions & 6 deletions conduit/plugins/exporters/noop/noop_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"

"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/plugins"
Expand Down Expand Up @@ -47,11 +46,6 @@ func (exp *noopExporter) Init(_ context.Context, initProvider data.InitProvider,
return nil
}

func (exp *noopExporter) Config() string {
ret, _ := yaml.Marshal(exp.cfg)
return string(ret)
}

func (exp *noopExporter) Close() error {
return nil
}
Expand Down
11 changes: 0 additions & 11 deletions conduit/plugins/exporters/noop/noop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v3"

"github.com/algorand/conduit/conduit"
"github.com/algorand/conduit/conduit/data"
Expand Down Expand Up @@ -39,16 +38,6 @@ func TestExporterInit(t *testing.T) {
assert.NoError(t, ne.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(""), nil))
}

func TestExporterConfig(t *testing.T) {
defaultConfig := &ExporterConfig{}
expected, err := yaml.Marshal(defaultConfig)
if err != nil {
t.Fatalf("unable to Marshal default noop.ExporterConfig: %v", err)
}
assert.NoError(t, ne.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(""), nil))
assert.Equal(t, string(expected), ne.Config())
}

func TestExporterClose(t *testing.T) {
assert.NoError(t, ne.Close())
}
Expand Down
6 changes: 0 additions & 6 deletions conduit/plugins/exporters/postgresql/postgresql_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync/atomic"

"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

sdk "github.com/algorand/go-algorand-sdk/v2/types"
"github.com/algorand/indexer/idb"
Expand Down Expand Up @@ -142,11 +141,6 @@ func (exp *postgresqlExporter) Init(ctx context.Context, initProvider data.InitP
return nil
}

func (exp *postgresqlExporter) Config() string {
ret, _ := yaml.Marshal(exp.cfg)
return string(ret)
}

func (exp *postgresqlExporter) Close() error {
if exp.db != nil {
exp.db.Close()
Expand Down
10 changes: 0 additions & 10 deletions conduit/plugins/exporters/postgresql/postgresql_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,6 @@ func TestConnectDbFailure(t *testing.T) {
assert.ErrorContains(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), cfg, logger), "connection string is empty for postgres")
}

func TestConfigDefault(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
defaultConfig := &ExporterConfig{}
expected, err := yaml.Marshal(defaultConfig)
if err != nil {
t.Fatalf("unable to Marshal default postgresql.ExporterConfig: %v", err)
}
assert.Equal(t, string(expected), pgsqlExp.Config())
}

func TestReceiveInvalidBlock(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
cfg := plugins.MakePluginConfig("test: true")
Expand Down
34 changes: 17 additions & 17 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
"github.com/algorand/go-algorand-sdk/v2/client/v2/common"
Expand Down Expand Up @@ -55,6 +54,7 @@ type algodImporter struct {
ctx context.Context
cancel context.CancelFunc
mode int
genesis *sdk.Genesis
Copy link
Contributor Author

@tzaffi tzaffi Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cache the genesis in the algodImporter struct to avoid having to call the genesis endpoint after Init()

}

//go:embed sample.yaml
Expand Down Expand Up @@ -328,12 +328,12 @@ func (algodImp *algodImporter) catchupNode(network string, targetRound uint64) e
return err
}

func (algodImp *algodImporter) Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) (*sdk.Genesis, error) {
func (algodImp *algodImporter) Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error {
algodImp.ctx, algodImp.cancel = context.WithCancel(ctx)
algodImp.logger = logger
err := cfg.UnmarshalConfig(&algodImp.cfg)
if err != nil {
return nil, fmt.Errorf("connect failure in unmarshalConfig: %v", err)
return fmt.Errorf("connect failure in unmarshalConfig: %v", err)
}

// To support backwards compatibility with the daemon we default to archival mode
Expand All @@ -347,13 +347,13 @@ func (algodImp *algodImporter) Init(ctx context.Context, initProvider data.InitP
case followerModeStr:
algodImp.mode = followerMode
default:
return nil, fmt.Errorf("algod importer was set to a mode (%s) that wasn't supported", algodImp.cfg.Mode)
return fmt.Errorf("algod importer was set to a mode (%s) that wasn't supported", algodImp.cfg.Mode)
}

var client *algod.Client
u, err := url.Parse(algodImp.cfg.NetAddr)
if err != nil {
return nil, err
return err
}

if u.Scheme != "http" && u.Scheme != "https" {
Expand All @@ -362,34 +362,34 @@ func (algodImp *algodImporter) Init(ctx context.Context, initProvider data.InitP
}
client, err = algod.MakeClient(algodImp.cfg.NetAddr, algodImp.cfg.Token)
if err != nil {
return nil, err
return err
}
algodImp.aclient = client

genesisResponse, err := client.GetGenesis().Do(ctx)
genesisResponse, err := algodImp.aclient.GetGenesis().Do(algodImp.ctx)
if err != nil {
return nil, err
return err
}

genesis := sdk.Genesis{}

// Don't fail on unknown properties here since the go-algorand and SDK genesis types differ slightly
err = json.LenientDecode([]byte(genesisResponse), &genesis)
if err != nil {
return nil, err
return err
}
if reflect.DeepEqual(genesis, sdk.Genesis{}) {
return nil, fmt.Errorf("unable to fetch genesis file from API at %s", algodImp.cfg.NetAddr)
return fmt.Errorf("unable to fetch genesis file from API at %s", algodImp.cfg.NetAddr)
}
algodImp.genesis = &genesis

err = algodImp.catchupNode(genesis.Network, uint64(initProvider.NextDBRound()))

return &genesis, err
return algodImp.catchupNode(genesis.Network, uint64(initProvider.NextDBRound()))
}

func (algodImp *algodImporter) Config() string {
s, _ := yaml.Marshal(algodImp.cfg)
return string(s)
func (algodImp *algodImporter) GetGenesis() (*sdk.Genesis, error) {
if algodImp.genesis != nil {
return algodImp.genesis, nil
}
return nil, fmt.Errorf("algod importer is missing its genesis: GetGenesis() should be called only after Init()")
}

func (algodImp *algodImporter) Close() error {
Expand Down
Loading