-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
conduit: pipeline run loop implementation #1183
Changes from 5 commits
0ded239
7f3b97a
4947a88
818507e
1fa6e68
982f458
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,13 +131,33 @@ type pipelineImpl struct { | |
importer *importers.Importer | ||
processors []*processors.Processor | ||
exporter *exporters.Exporter | ||
round basics.Round | ||
} | ||
|
||
func (p *pipelineImpl) Start() error { | ||
p.logger.Infof("Starting Pipeline Initialization") | ||
|
||
// TODO Need to change interfaces to accept config of map[string]interface{} | ||
|
||
exporterLogger := log.New() | ||
exporterLogger.SetFormatter( | ||
PluginLogFormatter{ | ||
Formatter: &log.JSONFormatter{ | ||
DisableHTMLEscape: true, | ||
}, | ||
Type: "Exporter", | ||
Name: (*p.exporter).Metadata().Name(), | ||
}, | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably have a helper to construct these. |
||
|
||
jsonEncode := string(json.Encode(p.cfg.Exporter.Config)) | ||
err := (*p.exporter).Init(plugins.PluginConfig(jsonEncode), exporterLogger) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you put a comment somewhere that this is imported first so that we can fetch the round? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll update w/ comment. I think it's hard to enumerate all of possible combinations of multiple exporters. I'd prefer to make it really easy to run a pipeline so that people can just spin up multiple pipelines if they need to run multiple exporters. |
||
exporterName := (*p.exporter).Metadata().Name() | ||
if err != nil { | ||
return fmt.Errorf("Pipeline.Start(): could not initialize Exporter (%s): %w", exporterName, err) | ||
} | ||
p.logger.Infof("Initialized Exporter: %s", exporterName) | ||
|
||
importerLogger := log.New() | ||
importerLogger.SetFormatter( | ||
PluginLogFormatter{ | ||
|
@@ -150,25 +170,28 @@ func (p *pipelineImpl) Start() error { | |
) | ||
|
||
// TODO modify/fix ? | ||
jsonEncode := string(json.Encode(p.cfg.Importer.Config)) | ||
jsonEncode = string(json.Encode(p.cfg.Importer.Config)) | ||
genesis, err := (*p.importer).Init(p.ctx, plugins.PluginConfig(jsonEncode), importerLogger) | ||
|
||
currentRound := basics.Round(0) | ||
importerName := (*p.importer).Metadata().Name() | ||
if err != nil { | ||
return fmt.Errorf("Pipeline.Start(): could not initialize importer (%s): %w", importerName, err) | ||
} | ||
p.round = basics.Round((*p.exporter).Round()) | ||
err = (*p.exporter).HandleGenesis(*genesis) | ||
if err != nil { | ||
return fmt.Errorf("Pipeline.Start(): exporter could not handle genesis (%s): %w", exporterName, err) | ||
} | ||
p.logger.Infof("Initialized Importer: %s", importerName) | ||
|
||
var initProvider data.InitProvider = &PipelineInitProvider{ | ||
currentRound: ¤tRound, | ||
currentRound: &p.round, | ||
genesis: genesis, | ||
} | ||
|
||
p.initProvider = &initProvider | ||
|
||
importerName := (*p.importer).Metadata().Name() | ||
if err != nil { | ||
return fmt.Errorf("Pipeline.Start(): could not initialize importer (%s): %w", importerName, err) | ||
} | ||
p.logger.Infof("Initialized Importer: %s", importerName) | ||
|
||
for _, processor := range p.processors { | ||
for idx, processor := range p.processors { | ||
|
||
processorLogger := log.New() | ||
processorLogger.SetFormatter( | ||
|
@@ -180,8 +203,8 @@ func (p *pipelineImpl) Start() error { | |
Name: (*processor).Metadata().Name(), | ||
}, | ||
) | ||
|
||
err := (*processor).Init(p.ctx, *p.initProvider, "") | ||
jsonEncode = string(json.Encode(p.cfg.Processors[idx].Config)) | ||
err := (*processor).Init(p.ctx, *p.initProvider, plugins.PluginConfig(jsonEncode), processorLogger) | ||
processorName := (*processor).Metadata().Name() | ||
if err != nil { | ||
return fmt.Errorf("Pipeline.Start(): could not initialize processor (%s): %w", processorName, err) | ||
|
@@ -190,25 +213,7 @@ func (p *pipelineImpl) Start() error { | |
|
||
} | ||
|
||
exporterLogger := log.New() | ||
exporterLogger.SetFormatter( | ||
PluginLogFormatter{ | ||
Formatter: &log.JSONFormatter{ | ||
DisableHTMLEscape: true, | ||
}, | ||
Type: "Exporter", | ||
Name: (*p.exporter).Metadata().Name(), | ||
}, | ||
) | ||
|
||
err = (*p.exporter).Init("", p.logger) | ||
ExporterName := (*p.exporter).Metadata().Name() | ||
if err != nil { | ||
return fmt.Errorf("Pipeline.Start(): could not initialize Exporter (%s): %w", ExporterName, err) | ||
} | ||
p.logger.Infof("Initialized Exporter: %s", ExporterName) | ||
|
||
return nil | ||
return p.RunPipeline() | ||
} | ||
|
||
func (p *pipelineImpl) Stop() error { | ||
|
@@ -231,6 +236,44 @@ func (p *pipelineImpl) Stop() error { | |
return nil | ||
} | ||
|
||
// RunPipeline pushes block data through the pipeline | ||
func (p *pipelineImpl) RunPipeline() error { | ||
for { | ||
// TODO Retries? | ||
p.logger.Infof("Pipeline round: %v", p.round) | ||
// fetch block | ||
blkData, err := (*p.importer).GetBlock(uint64(p.round)) | ||
if err != nil { | ||
p.logger.Errorf("%v\n", err) | ||
return err | ||
} | ||
// run through processors | ||
for _, proc := range p.processors { | ||
blkData, err = (*proc).Process(blkData) | ||
if err != nil { | ||
p.logger.Errorf("%v\n", err) | ||
return err | ||
} | ||
} | ||
// run through exporter | ||
err = (*p.exporter).Receive(blkData) | ||
if err != nil { | ||
p.logger.Errorf("%v\n", err) | ||
return err | ||
} | ||
// Callback Processors | ||
for _, proc := range p.processors { | ||
err = (*proc).OnComplete(blkData) | ||
if err != nil { | ||
p.logger.Errorf("%v\n", err) | ||
return err | ||
} | ||
} | ||
// Increment Round | ||
p.round++ | ||
} | ||
} | ||
|
||
// MakePipeline creates a Pipeline | ||
func MakePipeline(cfg *PipelineConfig, logger *log.Logger) (Pipeline, error) { | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about this further, I think we might have a concurrency issue here. Created this ticket for us to follow up later: #1187