Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement: graceful pipeline interrupt #97

Draft
wants to merge 37 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
b96486c
enhancement: plugin interface
May 28, 2023
bb68599
lint
May 28, 2023
543bcf5
commentary
May 28, 2023
aa9d310
tighten unit test expectations around importer.Init() *genesis return…
May 28, 2023
f80f9d4
fix plugin docs: https://developer.algorand.org/docs/get-details/cond…
May 28, 2023
8b60bc3
small edits
May 29, 2023
f75141a
Update .markdownlint.yml
tzaffi May 29, 2023
650bdb7
cherry pick changes from docs-staleness branch
May 29, 2023
88d7b8f
Update .gitignore
tzaffi May 29, 2023
274aebf
Update .gitignore
tzaffi May 29, 2023
155e5ec
do we actually need the Config() method?
May 29, 2023
0e4d628
remove Config() everywhere
Jun 8, 2023
0eae647
remove Config() everywhere
Jun 8, 2023
9a3a95c
flatten PluginMetadata interface into Plugin interface
Jun 8, 2023
eb3e4fb
all the interfaces have Init() + importer has GetGenesis()
Jun 8, 2023
da2a27a
unify Init() inside of Plugin interface
Jun 8, 2023
7b36bde
Merge remote-tracking branch 'upstream/master' into plugin-interface
Jun 8, 2023
1301f19
fix unit tests that are supposed to error
Jun 8, 2023
f9f93b9
ErrorContains not Error()
Jun 8, 2023
d2fff8e
fix algod_importer tests
Jun 8, 2023
6863d42
Merge branch 'master' into plugin-interface
Jun 9, 2023
414d08d
md lint
Jun 9, 2023
4e41a97
handle interrupt signal
Jun 9, 2023
81c1b32
add round information to logs
Jun 9, 2023
ae6768b
Update conduit/metrics/metrics.go
tzaffi Jun 9, 2023
92a503d
assert that the genesis is non-nil when no error getting it
Jun 9, 2023
948ac5b
Merge branch 'plugin-interface' into graceful-interrupt
Jun 9, 2023
80a51e3
atomically load the round as well
Jun 9, 2023
bcaa20c
per CR suggestion - more stopping signals
Jun 9, 2023
25c2478
revert carelessly introduced bug
Jun 9, 2023
b8fef8f
add in missing zero-val'd errInit and errGetGen
Jun 9, 2023
a4ffc01
Merge branch 'plugin-interface' into graceful-interrupt
Jun 9, 2023
d0a68c6
no for range needed
Jun 9, 2023
06c096c
Merge remote-tracking branch 'algorand/master' into graceful-interrupt
Jun 12, 2023
800bef7
fix bad merge
Jun 12, 2023
476faef
Update conduit/plugins/importers/algod/algod_importer.go
tzaffi Jun 12, 2023
61ec15e
Update conduit/plugins/exporters/postgresql/postgresql_exporter.go
tzaffi Jun 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"fmt"
"net/http"
"os"
"os/signal"
"path"
"runtime/pprof"
"sync"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -329,6 +331,14 @@ func (p *pipelineImpl) Init() error {
go p.startMetricsServer()
}

stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
go func() {
sig := <-stop
p.logger.Infof("Pipeline received stopping signal <%v>, stopping pipeline. p.pipelineMetadata.NextRound: %d", sig, p.pipelineMetadata.NextRound)
p.Stop()
}()
Comment on lines +334 to +340
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You get the duplicate calls to plugins being closed because Stop is called twice. The other spot is in runConduitCmdWithConfig.

Maybe the cli package is the right place to install a signal handler? We pass a context into the pipeline, when the context is cancelled maybe we should implicitly call stop (or maybe cancelling the context is Stop and we get rid of the public function)

Copy link
Contributor Author

@tzaffi tzaffi Jun 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've converted this PR into a draft and encapsulated its goals in new issue #100


return err
}

Expand All @@ -338,31 +348,39 @@ func (p *pipelineImpl) Stop() {

if p.profFile != nil {
if err := p.profFile.Close(); err != nil {
p.logger.WithError(err).Errorf("%s: could not close CPUProf file", p.profFile.Name())
p.logger.WithError(err).Errorf("Pipeline.Stop(): %s: could not close CPUProf file", p.profFile.Name())
}
pprof.StopCPUProfile()
}

if p.cfg.PIDFilePath != "" {
if err := os.Remove(p.cfg.PIDFilePath); err != nil {
p.logger.WithError(err).Errorf("%s: could not remove pid file", p.cfg.PIDFilePath)
p.logger.WithError(err).Errorf("Pipeline.Stop(): %s: could not remove pid file", p.cfg.PIDFilePath)
} else {
p.logger.Infof("Pipeline.Stop(): %s: removed pid file", p.cfg.PIDFilePath)
}
}

if err := (*p.importer).Close(); err != nil {
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", (*p.importer).Metadata().Name, err)
} else {
p.logger.Infof("Pipeline.Stop(): Importer (%s) closed without error", (*p.importer).Metadata().Name)
}

for _, processor := range p.processors {
if err := (*processor).Close(); err != nil {
// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", (*processor).Metadata().Name, err)
} else {
p.logger.Infof("Pipeline.Stop(): Processor (%s) closed without error", (*processor).Metadata().Name)
}
}

if err := (*p.exporter).Close(); err != nil {
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", (*p.exporter).Metadata().Name, err)
} else {
p.logger.Infof("Pipeline.Stop(): Exporter (%s) closed without error", (*p.exporter).Metadata().Name)
}
}

Expand Down
2 changes: 2 additions & 0 deletions conduit/plugins/exporters/postgresql/postgresql_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (exp *postgresqlExporter) Close() error {

exp.cf()
exp.wg.Wait()
exp.logger.Infof("exporter postgresql.Close() at round %d", atomic.LoadUint64(&exp.round))
return nil
}

Expand All @@ -167,6 +168,7 @@ func (exp *postgresqlExporter) Receive(exportData data.BlockData) error {
return err
}
atomic.StoreUint64(&exp.round, exportData.Round()+1)

tzaffi marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down
9 changes: 6 additions & 3 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"reflect"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -55,6 +56,7 @@ type algodImporter struct {
cancel context.CancelFunc
mode int
genesis *sdk.Genesis
round uint64
}

//go:embed sample.yaml
Expand Down Expand Up @@ -396,6 +398,7 @@ func (algodImp *algodImporter) Close() error {
if algodImp.cancel != nil {
algodImp.cancel()
}
algodImp.logger.Infof("importer algod.Close() at round %d", algodImp.round)
return nil
}

Expand Down Expand Up @@ -438,7 +441,7 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
algodImp.logger.Errorf("error getting block for round %d (attempt %d): %s", rnd, r, err.Error())
algodImp.logger.Infof("error getting block for round %d (attempt %d)", rnd, r)
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
continue
}
tmpBlk := new(models.BlockResponse)
Expand Down Expand Up @@ -468,8 +471,8 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
blk.Delta = &delta
}
}

return blk, err
atomic.StoreUint64(&algodImp.round, rnd+1)
return blk, nil
}

err = fmt.Errorf("failed to get block for round %d after %d attempts, check node configuration: %s", rnd, retries, err)
Expand Down