Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: plugin interface #83

Merged
merged 26 commits into from
Jun 12, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b96486c
enhancement: plugin interface
May 28, 2023
bb68599
lint
May 28, 2023
543bcf5
commentary
May 28, 2023
aa9d310
tighten unit test expectations around importer.Init() *genesis return…
May 28, 2023
f80f9d4
fix plugin docs: https://developer.algorand.org/docs/get-details/cond…
May 28, 2023
8b60bc3
small edits
May 29, 2023
f75141a
Update .markdownlint.yml
tzaffi May 29, 2023
650bdb7
cherry pick changes from docs-staleness branch
May 29, 2023
88d7b8f
Update .gitignore
tzaffi May 29, 2023
274aebf
Update .gitignore
tzaffi May 29, 2023
155e5ec
do we actually need the Config() method?
May 29, 2023
0e4d628
remove Config() everywhere
Jun 8, 2023
0eae647
remove Config() everywhere
Jun 8, 2023
9a3a95c
flatten PluginMetadata interface into Plugin interface
Jun 8, 2023
eb3e4fb
all the interfaces have Init() + importer has GetGenesis()
Jun 8, 2023
da2a27a
unify Init() inside of Plugin interface
Jun 8, 2023
7b36bde
Merge remote-tracking branch 'upstream/master' into plugin-interface
Jun 8, 2023
1301f19
fix unit tests that are supposed to error
Jun 8, 2023
f9f93b9
ErrorContains not Error()
Jun 8, 2023
d2fff8e
fix algod_importer tests
Jun 8, 2023
6863d42
Merge branch 'master' into plugin-interface
Jun 9, 2023
414d08d
md lint
Jun 9, 2023
ae6768b
Update conduit/metrics/metrics.go
tzaffi Jun 9, 2023
92a503d
assert that the genesis is non-nil when no error getting it
Jun 9, 2023
b8fef8f
add in missing zero-val'd errInit and errGetGen
Jun 9, 2023
5b114a0
add test case to trigger nil algod_importer genesis
Jun 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .markdownlint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
default: true

MD010:
code_blocks: false

MD013:
line_length: 200
1 change: 1 addition & 0 deletions conduit/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func RegisterPrometheusMetrics(subsystem string) {
instantiateCollectors(subsystem)

_ = prometheus.Register(BlockImportTimeSeconds)
// TODO: why is this registered twice?
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
_ = prometheus.Register(BlockImportTimeSeconds)
_ = prometheus.Register(ImportedTxnsPerBlock)
_ = prometheus.Register(ImportedRoundGauge)
Expand Down
2 changes: 1 addition & 1 deletion conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (p *pipelineImpl) Start() {
}
metrics.ImporterTimeSeconds.Observe(time.Since(importStart).Seconds())

// TODO: Verify that the block was build with a known protocol version.
// TODO: Verify that the block was built with a known protocol version.

// Start time currently measures operations after block fetching is complete.
// This is for backwards compatibility w/ Indexer's metrics
Expand Down
6 changes: 3 additions & 3 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,9 @@ func (e *errorImporter) Init(_ context.Context, _ data.InitProvider, _ plugins.P
return e.genesis, nil
}

func (e *errorImporter) Config() string {
return ""
}
// func (e *errorImporter) Config() string {
// return ""
// }

func (e *errorImporter) Close() error {
return nil
Expand Down
6 changes: 3 additions & 3 deletions conduit/plugins/exporters/example/example_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func (exp *exampleExporter) Init(_ context.Context, _ data.InitProvider, _ plugi
}

// Config returns the unmarshaled config object
func (exp *exampleExporter) Config() string {
panic("not implemented")
}
// func (exp *exampleExporter) Config() string {
// panic("not implemented")
// }

// Close provides the opportunity to close connections, flush buffers, etc. when the process is terminating
func (exp *exampleExporter) Close() error {
Expand Down
6 changes: 3 additions & 3 deletions conduit/plugins/exporters/example/example_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ func TestExporterInit(t *testing.T) {
assert.Panics(t, func() { exExp.Init(context.Background(), nil, plugins.MakePluginConfig(""), nil) })
}

func TestExporterConfig(t *testing.T) {
assert.Panics(t, func() { exExp.Config() })
}
// func TestExporterConfig(t *testing.T) {
// assert.Panics(t, func() { exExp.Config() })
// }

func TestExporterClose(t *testing.T) {
assert.Panics(t, func() { exExp.Close() })
Expand Down
17 changes: 4 additions & 13 deletions conduit/plugins/exporters/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,15 @@ import (

// Exporter defines the interface for plugins
type Exporter interface {
// PluginMetadata implement this interface.
plugins.PluginMetadata
// Plugin - implement this interface.
plugins.Plugin

// Init will be called during initialization, before block data starts going through the pipeline.
// Typically used for things like initializing network connections.
// The ExporterConfig passed to Connect will contain the Unmarhsalled config file specific to this plugin.
// Should return an error if it fails--this will result in the Indexer process terminating.
// The ExporterConfig passed to Init will contain the Unmarhsalled config file specific to this plugin.
// Should return an error if it fails--this will result in the Conduit process terminating.
Init(ctx context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error

// Config returns the configuration options used to create an Exporter.
// Initialized during Connect, it should return nil until the Exporter has been Connected.
Config() string

// Close will be called during termination of the Indexer process.
// There is no guarantee that plugin lifecycle hooks will be invoked in any specific order in relation to one another.
// Returns an error if it fails which will be surfaced in the logs, but the process is already terminating.
Close() error

// Receive is called for each block to be processed by the exporter.
// Should return an error on failure--retries are configurable.
Receive(exportData data.BlockData) error
Expand Down
9 changes: 4 additions & 5 deletions conduit/plugins/exporters/filewriter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"path"

"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/plugins"
Expand Down Expand Up @@ -68,10 +67,10 @@ func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider,
return err
}

func (exp *fileExporter) Config() string {
ret, _ := yaml.Marshal(exp.cfg)
return string(ret)
}
// func (exp *fileExporter) Config() string {
// ret, _ := yaml.Marshal(exp.cfg)
// return string(ret)
// }

func (exp *fileExporter) Close() error {
exp.logger.Infof("latest round on file: %d", exp.round)
Expand Down
11 changes: 6 additions & 5 deletions conduit/plugins/exporters/filewriter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func TestExporterInitDefaults(t *testing.T) {
pcfg.DataDir = tempdir
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), pcfg, logger)
require.NoError(t, err)
pluginConfig := fileExp.Config()
assert.Contains(t, pluginConfig, fmt.Sprintf("block-dir: %s", tc.expected))
// pluginConfig := fileExp.Config()
// assert.Contains(t, pluginConfig, fmt.Sprintf("block-dir: %s", tc.expected))
})
}
}
Expand All @@ -95,9 +95,10 @@ func TestExporterInit(t *testing.T) {

// creates a new output file
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), plugins.MakePluginConfig(config), logger)
pluginConfig := fileExp.Config()
configWithDefault := config + "filename-pattern: '%[1]d_block.json'\n" + "drop-certificate: false\n"
assert.Equal(t, configWithDefault, string(pluginConfig))
assert.NoError(t, err)
// pluginConfig := fileExp.Config()
// configWithDefault := config + "filename-pattern: '%[1]d_block.json'\n" + "drop-certificate: false\n"
// assert.Equal(t, configWithDefault, string(pluginConfig))
fileExp.Close()

// can open existing file
Expand Down
9 changes: 4 additions & 5 deletions conduit/plugins/exporters/noop/noop_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"

"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/plugins"
Expand Down Expand Up @@ -47,10 +46,10 @@ func (exp *noopExporter) Init(_ context.Context, initProvider data.InitProvider,
return nil
}

func (exp *noopExporter) Config() string {
ret, _ := yaml.Marshal(exp.cfg)
return string(ret)
}
// func (exp *noopExporter) Config() string {
// ret, _ := yaml.Marshal(exp.cfg)
// return string(ret)
// }

func (exp *noopExporter) Close() error {
return nil
Expand Down
19 changes: 9 additions & 10 deletions conduit/plugins/exporters/noop/noop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v3"

"github.com/algorand/conduit/conduit"
"github.com/algorand/conduit/conduit/data"
Expand Down Expand Up @@ -39,15 +38,15 @@ func TestExporterInit(t *testing.T) {
assert.NoError(t, ne.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(""), nil))
}

func TestExporterConfig(t *testing.T) {
defaultConfig := &ExporterConfig{}
expected, err := yaml.Marshal(defaultConfig)
if err != nil {
t.Fatalf("unable to Marshal default noop.ExporterConfig: %v", err)
}
assert.NoError(t, ne.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(""), nil))
assert.Equal(t, string(expected), ne.Config())
}
// func TestExporterConfig(t *testing.T) {
// defaultConfig := &ExporterConfig{}
// expected, err := yaml.Marshal(defaultConfig)
// if err != nil {
// t.Fatalf("unable to Marshal default noop.ExporterConfig: %v", err)
// }
// assert.NoError(t, ne.Init(context.Background(), &conduit.PipelineInitProvider{}, plugins.MakePluginConfig(""), nil))
// assert.Equal(t, string(expected), ne.Config())
// }

func TestExporterClose(t *testing.T) {
assert.NoError(t, ne.Close())
Expand Down
9 changes: 4 additions & 5 deletions conduit/plugins/exporters/postgresql/postgresql_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync/atomic"

"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

sdk "github.com/algorand/go-algorand-sdk/v2/types"
"github.com/algorand/indexer/idb"
Expand Down Expand Up @@ -142,10 +141,10 @@ func (exp *postgresqlExporter) Init(ctx context.Context, initProvider data.InitP
return nil
}

func (exp *postgresqlExporter) Config() string {
ret, _ := yaml.Marshal(exp.cfg)
return string(ret)
}
// func (exp *postgresqlExporter) Config() string {
// ret, _ := yaml.Marshal(exp.cfg)
// return string(ret)
// }

func (exp *postgresqlExporter) Close() error {
if exp.db != nil {
Expand Down
18 changes: 9 additions & 9 deletions conduit/plugins/exporters/postgresql/postgresql_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ func TestConnectDbFailure(t *testing.T) {
assert.ErrorContains(t, pgsqlExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil), cfg, logger), "connection string is empty for postgres")
}

func TestConfigDefault(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
defaultConfig := &ExporterConfig{}
expected, err := yaml.Marshal(defaultConfig)
if err != nil {
t.Fatalf("unable to Marshal default postgresql.ExporterConfig: %v", err)
}
assert.Equal(t, string(expected), pgsqlExp.Config())
}
// func TestConfigDefault(t *testing.T) {
// pgsqlExp := pgsqlConstructor.New()
// defaultConfig := &ExporterConfig{}
// expected, err := yaml.Marshal(defaultConfig)
// if err != nil {
// t.Fatalf("unable to Marshal default postgresql.ExporterConfig: %v", err)
// }
// assert.Equal(t, string(expected), pgsqlExp.Config())
// }

func TestReceiveInvalidBlock(t *testing.T) {
pgsqlExp := pgsqlConstructor.New()
Expand Down
9 changes: 4 additions & 5 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
"github.com/algorand/go-algorand-sdk/v2/client/v2/common"
Expand Down Expand Up @@ -382,10 +381,10 @@ func (algodImp *algodImporter) Init(ctx context.Context, initProvider data.InitP
return &genesis, err
}

func (algodImp *algodImporter) Config() string {
s, _ := yaml.Marshal(algodImp.cfg)
return string(s)
}
// func (algodImp *algodImporter) Config() string {
// s, _ := yaml.Marshal(algodImp.cfg)
// return string(s)
// }

func (algodImp *algodImporter) Close() error {
if algodImp.cancel != nil {
Expand Down
Loading