Skip to content

Commit

Permalink
Conduit: Dynamic on complete (#1285)
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Oct 25, 2022
1 parent c5fb831 commit ace1808
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 45 deletions.
14 changes: 14 additions & 0 deletions conduit/hooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package conduit

import "github.com/algorand/indexer/data"

// OnCompleteFunc is the signature for the Completed functional interface.
type OnCompleteFunc func(input data.BlockData) error

// Completed is called by the conduit pipeline after every exporter has
// finished. It can be used for things like finalizing state.
type Completed interface {
// OnComplete will be called by the Conduit framework when the pipeline
// finishes processing a round.
OnComplete(input data.BlockData) error
}
30 changes: 24 additions & 6 deletions conduit/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,12 @@ type pipelineImpl struct {

initProvider *data.InitProvider

importer *importers.Importer
processors []*processors.Processor
exporter *exporters.Exporter
round basics.Round
importer *importers.Importer
processors []*processors.Processor
exporter *exporters.Exporter
completeCallback []OnCompleteFunc

round basics.Round
}

func (p *pipelineImpl) Error() error {
Expand All @@ -158,6 +160,20 @@ func (p *pipelineImpl) setError(err error) {
p.err = err
}

func (p *pipelineImpl) registerLifecycleCallbacks() {
if v, ok := (*p.importer).(Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
for _, processor := range p.processors {
if v, ok := (*processor).(Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
}
if v, ok := (*p.exporter).(Completed); ok {
p.completeCallback = append(p.completeCallback, v.OnComplete)
}
}

// Init prepares the pipeline for processing block data
func (p *pipelineImpl) Init() error {
p.logger.Infof("Starting Pipeline Initialization")
Expand Down Expand Up @@ -243,6 +259,8 @@ func (p *pipelineImpl) Init() error {
p.logger.Infof("Initialized Processor: %s", processorName)
}

// Register callbacks.
p.registerLifecycleCallbacks()
return err
}

Expand Down Expand Up @@ -335,8 +353,8 @@ func (p *pipelineImpl) Start() {
goto pipelineRun
}
// Callback Processors
for _, proc := range p.processors {
err = (*proc).OnComplete(blkData)
for _, cb := range p.completeCallback {
err = cb(blkData)
if err != nil {
p.logger.Errorf("%v", err)
p.setError(err)
Expand Down
102 changes: 82 additions & 20 deletions conduit/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ var uniqueBlockData = data.BlockData{
type mockImporter struct {
mock.Mock
importers.Importer
returnError bool
finalRound basics.Round
returnError bool
onCompleteError bool
}

func (m *mockImporter) Init(_ context.Context, _ plugins.PluginConfig, _ *log.Logger) (*bookkeeping.Genesis, error) {
Expand All @@ -170,6 +172,16 @@ func (m *mockImporter) GetBlock(rnd uint64) (data.BlockData, error) {
return uniqueBlockData, err
}

func (m *mockImporter) OnComplete(input data.BlockData) error {
var err error
if m.onCompleteError {
err = fmt.Errorf("on complete")
}
m.finalRound = input.BlockHeader.Round
m.Called(input)
return err
}

type mockProcessor struct {
mock.Mock
processors.Processor
Expand Down Expand Up @@ -199,6 +211,7 @@ func (m *mockProcessor) Process(input data.BlockData) (data.BlockData, error) {
input.BlockHeader.Round++
return input, err
}

func (m *mockProcessor) OnComplete(input data.BlockData) error {
var err error
if m.onCompleteError {
Expand All @@ -212,7 +225,9 @@ func (m *mockProcessor) OnComplete(input data.BlockData) error {
type mockExporter struct {
mock.Mock
exporters.Exporter
returnError bool
finalRound basics.Round
returnError bool
onCompleteError bool
}

func (m *mockExporter) Metadata() exporters.ExporterMetadata {
Expand Down Expand Up @@ -246,6 +261,16 @@ func (m *mockExporter) Receive(exportData data.BlockData) error {
return err
}

func (m *mockExporter) OnComplete(input data.BlockData) error {
var err error
if m.onCompleteError {
err = fmt.Errorf("on complete")
}
m.finalRound = input.BlockHeader.Round
m.Called(input)
return err
}

// TestPipelineRun tests that running the pipeline calls the correct functions with mocking
func TestPipelineRun(t *testing.T) {

Expand All @@ -262,19 +287,21 @@ func TestPipelineRun(t *testing.T) {
var pImporter importers.Importer = &mImporter
var pProcessor processors.Processor = &mProcessor
var pExporter exporters.Exporter = &mExporter
var cbComplete Completed = &mProcessor

ctx, cf := context.WithCancel(context.Background())

pImpl := pipelineImpl{
ctx: ctx,
cf: cf,
cfg: &PipelineConfig{},
logger: log.New(),
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
round: 0,
ctx: ctx,
cf: cf,
cfg: &PipelineConfig{},
logger: log.New(),
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
completeCallback: []OnCompleteFunc{cbComplete.OnComplete},
exporter: &pExporter,
round: 0,
}

go func() {
Expand Down Expand Up @@ -369,18 +396,20 @@ func TestPipelineErrors(t *testing.T) {
var pImporter importers.Importer = &mImporter
var pProcessor processors.Processor = &mProcessor
var pExporter exporters.Exporter = &mExporter
var cbComplete Completed = &mProcessor

ctx, cf := context.WithCancel(context.Background())
pImpl := pipelineImpl{
ctx: ctx,
cf: cf,
cfg: &PipelineConfig{},
logger: log.New(),
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
round: 0,
ctx: ctx,
cf: cf,
cfg: &PipelineConfig{},
logger: log.New(),
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
completeCallback: []OnCompleteFunc{cbComplete.OnComplete},
round: 0,
}

mImporter.returnError = true
Expand Down Expand Up @@ -420,5 +449,38 @@ func TestPipelineErrors(t *testing.T) {
pImpl.cf()
pImpl.Wait()
assert.Error(t, pImpl.Error(), fmt.Errorf("exporter"))
}

func Test_pipelineImpl_registerLifecycleCallbacks(t *testing.T) {
mImporter := mockImporter{}
mImporter.On("GetBlock", mock.Anything).Return(uniqueBlockData, nil)
mProcessor := mockProcessor{}
processorData := uniqueBlockData
processorData.BlockHeader.Round++
mProcessor.On("Process", mock.Anything).Return(processorData)
mProcessor.On("OnComplete", mock.Anything).Return(nil)
mExporter := mockExporter{}
mExporter.On("Receive", mock.Anything).Return(nil)

var pImporter importers.Importer = &mImporter
var pProcessor processors.Processor = &mProcessor
var pExporter exporters.Exporter = &mExporter

ctx, cf := context.WithCancel(context.Background())
pImpl := pipelineImpl{
ctx: ctx,
cf: cf,
cfg: &PipelineConfig{},
logger: log.New(),
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor, &pProcessor},
exporter: &pExporter,
round: 0,
}

// Each plugin implements the Completed interface, so there should be 4
// plugins registered (one of them is registered twice)
pImpl.registerLifecycleCallbacks()
assert.Len(t, pImpl.completeCallback, 4)
}
15 changes: 10 additions & 5 deletions docs/rfc/0002-processor-interface.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ type Processor interface {

// Process will be called with provided optional inputs. It is up to the plugin to check that required inputs are provided.
Process(input data.BlockData) (data.BlockData, error)

// OnComplete will be called by the Conduit framework when the exporter has successfully written the block
// The input will be the finalized information written to disk
OnComplete(input data.BlockData) error
}
```

Expand Down Expand Up @@ -143,4 +139,13 @@ Often it is useful to know when a block has passed through the entire Conduit fr
a stateful Processor plugin might want to commit block information to a private database once it is certain that the block information has been fully processed.

The `OnComplete()` function will be called by the Conduit framework when the exporter plugin has finished executing **and** did not return any errors. The data supplied to this function
will be the final block data written to disk. Note that this block data may have been processed by other Processor plugins since it was last "seen" by any specific Processor plugin.
will be the final block data written to disk. Note that this block data may have been processed by other Processor plugins since it was last "seen" by any specific Processor plugin.

It is implemented as a dynamic interface and can be added to any plugin.
```go
type Completed interface {
// OnComplete will be called by the Conduit framework when the exporter has successfully written the block
// The input will be the finalized information written to disk
OnComplete(input data.BlockData) error
}
```
2 changes: 2 additions & 0 deletions processors/blockprocessor/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/algorand/go-algorand/rpcs"

"github.com/algorand/indexer/accounting"
"github.com/algorand/indexer/conduit"
"github.com/algorand/indexer/data"
"github.com/algorand/indexer/plugins"
"github.com/algorand/indexer/processors"
Expand All @@ -30,6 +31,7 @@ type BlockProcessor interface {
NextRoundToProcess() uint64

processors.Processor
conduit.Completed
}

// package-wide init function
Expand Down
5 changes: 0 additions & 5 deletions processors/filterprocessor/filter_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,3 @@ func (a *FilterProcessor) Process(input data.BlockData) (data.BlockData, error)

return input, err
}

// OnComplete a no-op for this processor
func (a *FilterProcessor) OnComplete(input data.BlockData) error {
return nil
}
5 changes: 0 additions & 5 deletions processors/noop/noop_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,3 @@ func (p *Processor) Close() error {
func (p *Processor) Process(input data.BlockData) (data.BlockData, error) {
return input, nil
}

// OnComplete noop
func (p *Processor) OnComplete(_ data.BlockData) error {
return nil
}
4 changes: 0 additions & 4 deletions processors/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,4 @@ type Processor interface {

// Process will be called with provided optional inputs. It is up to the plugin to check that required inputs are provided.
Process(input data.BlockData) (data.BlockData, error)

// OnComplete will be called by the Conduit framework when the exporter has successfully written the block
// The input will be the finalized information written to disk
OnComplete(input data.BlockData) error
}

0 comments on commit ace1808

Please sign in to comment.