conduit: Convert daemon to conduit pipeline #1208
Conversation
Codecov Report
@@            Coverage Diff            @@
##           conduit    #1208   +/-   ##
==========================================
  Coverage         ?   61.58%
==========================================
  Files            ?       64
  Lines            ?     9126
  Branches         ?        0
==========================================
  Hits             ?     5620
  Misses           ?     3021
  Partials         ?      485
if si.fetcher != nil && si.fetcher.Error() != "" {
	errors = append(errors, fmt.Sprintf("fetcher error: %s", si.fetcher.Error()))
}
if si.dataError != nil {
	if err := si.dataError(); err != nil {
Suggested change:
- if err := si.dataError(); err != nil {
+ if si.dataError != nil && si.dataError() != nil {
  }
The reason I've separated it out is that we need to add `err` to the set of known errors. We could just call `si.dataError()` again inside the block to reference the error, but because the pipeline will keep retrying, it's possible that the first call would show an error but the second call might return `nil`. This way we have a single call which either returns an error that is added to the health check, or it's `nil`.
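Concretely, the shape being described is something like this (a sketch based on the diff above; the exact message string is assumed):

```go
// Call dataError() exactly once per health check: the single result is
// both the retry signal and the recorded error, so a concurrent retry
// can't flip the answer between two separate calls.
if si.dataError != nil {
	if err := si.dataError(); err != nil {
		errors = append(errors, fmt.Sprintf("data error: %s", err))
	}
}
```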
conduit/pipeline.go
logger  *log.Logger
ctx     context.Context
cf      context.CancelFunc
running sync.WaitGroup
nit: consider naming this `wg` so it's clear that it's a Go WaitGroup.
I've copied the name from https://github.com/algorand/go-algorand/blob/master/catchup/catchpointService.go#L75 since I liked it. To me, `running` as a WaitGroup counting the running goroutine(s) makes sense. Open to changing it if we want to stick with `wg` for WaitGroup vars, though.
I agree with Shiqi here. Though I've never seen it written down, calling this `wg` seems to be a Go idiom, similar to `ctx` and `cf` above.
conduit/pipeline.go
return err
p.logger.Errorf("%v", err)
p.err = err
goto pipelineRun
This will retry forever until `err == nil`?
I've tried to mimic the behaviour that we have in fetcher: https://github.com/algorand/indexer/blob/develop/fetcher/fetcher.go#L219. This will run until the context expires, which is probably whenever the cancel func is called. Errors can be raised by individual plugins, at which point we set `p.err` so that it can be referenced by the `p.Error()` function, and the pipeline will start running again from the Importer on the same round.
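A simplified sketch of that loop shape (`runRound` is a hypothetical helper, not the actual pipeline code):

```go
// Keep retrying the same round until it succeeds or the context is
// cancelled; p.err mirrors the latest failure for p.Error().
for {
	select {
	case <-p.ctx.Done():
		return
	default:
	}
	if err := p.runRound(round); err != nil { // hypothetical helper
		p.logger.Errorf("%v", err)
		p.err = err
		continue // retry the same round, starting from the Importer
	}
	p.err = nil // clear the reported error once the round succeeds
	round++
}
```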
I guess this was done in order to match the current Indexer behavior?
Does Indexer also retry on processor and writer errors?
Thinking out loud here, do you think that the error handling policy should be configurable per plugin?
> I guess this was done in order to match the current Indexer behavior?
Yes, I've tried to match the behaviour of the current Indexer here.
> Does Indexer also retry on processor and writer errors?
The Indexer's data pipeline is controlled by fetcher which runs two things concurrently:
- Fetching and enqueuing blocks
- Running the block handler on the queued blocks
The fetcher will stop running when:
- The cancel function is called or the context is `Done`
- It receives block bytes from algod which it cannot decode into a Block object
- The handler returns an error
However, the handler we're using will just continue retrying when it errors. The effect is that the fetcher only exits on malformed block data or context cancellation.
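A sketch of those exit conditions (hypothetical names, loosely following fetcher.go; not the actual code):

```go
for {
	select {
	case <-ctx.Done():
		return ctx.Err() // cancel func called or context is Done
	case raw := <-blockQueue:
		blk, err := decodeBlock(raw) // hypothetical decode step
		if err != nil {
			return err // block bytes from algod that can't be decoded
		}
		if err := handler(blk); err != nil {
			return err // handler error; our handler retries internally instead
		}
	}
}
```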
> Thinking out loud here, do you think that the error handling policy should be configurable per plugin?
I think it's a good idea to allow plugins to either return an error that can be retried or one that causes the pipeline to terminate. At the moment though, if we want to be backwards compatible w/ Indexer, all errors in the existing plugins would be non-fatal (except for block decoding).
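One possible shape for that, sketched with hypothetical types (not part of this PR):

```go
package conduit

import "errors"

// FatalError is a hypothetical wrapper a plugin could return to mark an
// error as non-retryable; any other error would be retried as today.
type FatalError struct{ Err error }

func (e FatalError) Error() string { return e.Err.Error() }
func (e FatalError) Unwrap() error { return e.Err }

// isFatal reports whether the pipeline should terminate on err.
func isFatal(err error) bool {
	var f FatalError
	return errors.As(err, &f)
}
```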
Great, thanks for the clarifications. This approach sounds good to me.
conduit/pipeline.go
	}
}
// Increment Round
p.err = nil
why does this need to be set to nil?
If an error has occurred in one of the plugins, `p.err` will be set to that. The pipeline will continue trying to process this round, and if it is successful we set the error to nil and move on to processing the next round. It's similar to https://github.com/algorand/indexer/blob/develop/fetcher/fetcher.go#L182 for reference.
Force-pushed from adf9386 to 1e26d3b.
Looks really good. The only thing that really should change is how the blocking is done in `runConduitCmdWithConfig`.
cmd/algorand-indexer/daemon.go
@@ -4,12 +4,12 @@ import (
	"context"
	"errors"
	"fmt"
	"github.com/algorand/indexer/conduit"
nit: import grouping
	return err
}

-func runBlockImporter(ctx context.Context, cfg *daemonConfig, wg *sync.WaitGroup, db idb.IndexerDb, dbAvailable chan struct{}, bot fetcher.Fetcher, opts idb.IndexerDbOptions) {
+func makeConduitConfig(dCfg *daemonConfig) conduit.PipelineConfig {
nice!
cmd/conduit/main.go
// Make sure to call this so we can shutdown if there is an error
defer pipeline.Stop()

for {
This is a busy loop. A good way to handle this is to have `Start()` block. Start it in a goroutine and use a sync.WaitGroup. Something like:

var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	pipeline.Start()
}()
wg.Wait()
conduit/pipeline.go
func (p *pipelineImpl) Error() error {
	return p.err
}
🤡 : This pattern is a bit of a hack, I wasn't happy when adding it to the daemon. I don't have a good suggestion for fixing it.
conduit/pipeline_test.go
// Give some time for the goroutine to start
time.Sleep(1 * time.Second)
err := pImpl.Start()
assert.Nil(t, err)
nit: `assert.NoError(t, err)`
}()

// Give some time for the goroutine to start
time.Sleep(1 * time.Second)
Why isn't the delay needed anymore?
+1...just curious
`Pipeline.Start` synchronously creates the PID file and CPU profile if necessary, launches the goroutine which runs block imports, and then returns. So since we're not creating these files in a goroutine anymore, they will always be created by the time we've returned.
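In other words, roughly this ordering (`writePIDFile` and `startCPUProfile` are hypothetical names for the setup steps):

```go
func (p *pipelineImpl) Start() error {
	// Synchronous setup: by the time Start returns, these files exist.
	if err := p.writePIDFile(); err != nil {
		return err
	}
	if err := p.startCPUProfile(); err != nil {
		return err
	}
	// Only the block-import loop runs asynchronously.
	go p.runPipeline()
	return nil
}
```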
}

-func (p *pipelineImpl) Stop() error {
+func (p *pipelineImpl) Stop() {
	p.cf()
How is this cancel function set?
In `MakePipeline`, which takes a context, we create a cancel context and assign it to the pipelineImpl. The one thing which I haven't included is an easy way to reset that context. So if you call `Start` and then `Stop`, things wouldn't really work if you called `Start` again.
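Roughly (the exact signature and fields are assumed here):

```go
func MakePipeline(ctx context.Context, cfg *PipelineConfig, logger *log.Logger) (Pipeline, error) {
	// Derive a cancellable context; Stop() later calls cf to shut down.
	ctx, cf := context.WithCancel(ctx)
	return &pipelineImpl{ctx: ctx, cf: cf, cfg: cfg, logger: logger}, nil
}
```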
I think we just say that once you call `Stop` you are closing Conduit.
conduit/pipeline.go
-	return p.RunPipeline()
+	go p.runPipeline()
+	return err
I missed this in my first pass. I think it's better to let the caller decide when to start a goroutine; that way it's more natural to control a graceful shutdown with sync.WaitGroup.
It is nice that the pidFile/profFile tests are deterministic this way.
Maybe it would be the best of both solutions to split Init/Start by renaming:
- `Start` -> `Initialize`
- `runPipeline` -> `Start`
Moved all of the goroutine-specific code into `Start` and added a synchronous `Init` function. Also added a blocking `Wait` function to the pipeline so that we don't have an empty loop in places where we have no other blocking operations to wait on.
Slightly modified the error variable used in the handlers, but like you've said, I don't think there's an idiomatic way to achieve that other than just protecting reads/writes with a mutex.
Let me know what you think...I tried your suggestion above of moving wait group management into the conduit daemon, but it ended up splitting up the wait group and context handling/blocking in ways that I didn't like.
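A sketch of the mutex-guarded error and the blocking `Wait` described above (field names assumed):

```go
type pipelineImpl struct {
	mu  sync.Mutex
	err error
	wg  sync.WaitGroup
	// ...
}

func (p *pipelineImpl) setError(err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.err = err
}

func (p *pipelineImpl) Error() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.err
}

// Wait blocks until the pipeline goroutine exits, so callers don't need
// a busy loop when there's nothing else to block on.
func (p *pipelineImpl) Wait() {
	p.wg.Wait()
}
```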
Summary
Converts `algorand-indexer`'s data pipeline to a conduit pipeline.

Test Plan

Indexer e2e tests.