diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go index 66a24b3b..418d4f56 100644 --- a/conduit/pipeline/pipeline.go +++ b/conduit/pipeline/pipeline.go @@ -6,9 +6,11 @@ import ( "fmt" "net/http" "os" + "os/signal" "path" "runtime/pprof" "sync" + "syscall" "time" "github.com/prometheus/client_golang/prometheus" @@ -329,6 +331,14 @@ func (p *pipelineImpl) Init() error { go p.startMetricsServer() } + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) + go func() { + sig := <-stop + p.logger.Infof("Pipeline received stopping signal <%v>, stopping pipeline. p.pipelineMetadata.NextRound: %d", sig, p.pipelineMetadata.NextRound) + p.Stop() + }() + return err } @@ -338,31 +348,39 @@ func (p *pipelineImpl) Stop() { if p.profFile != nil { if err := p.profFile.Close(); err != nil { - p.logger.WithError(err).Errorf("%s: could not close CPUProf file", p.profFile.Name()) + p.logger.WithError(err).Errorf("Pipeline.Stop(): %s: could not close CPUProf file", p.profFile.Name()) } pprof.StopCPUProfile() } if p.cfg.PIDFilePath != "" { if err := os.Remove(p.cfg.PIDFilePath); err != nil { - p.logger.WithError(err).Errorf("%s: could not remove pid file", p.cfg.PIDFilePath) + p.logger.WithError(err).Errorf("Pipeline.Stop(): %s: could not remove pid file", p.cfg.PIDFilePath) + } else { + p.logger.Infof("Pipeline.Stop(): %s: removed pid file", p.cfg.PIDFilePath) } } if err := (*p.importer).Close(); err != nil { // Log and continue on closing the rest of the pipeline p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", (*p.importer).Metadata().Name, err) + } else { + p.logger.Infof("Pipeline.Stop(): Importer (%s) closed without error", (*p.importer).Metadata().Name) } for _, processor := range p.processors { if err := (*processor).Close(); err != nil { // Log and continue on closing the rest of the pipeline p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", (*processor).Metadata().Name, err) + } else { + p.logger.Infof("Pipeline.Stop(): Processor (%s) closed without error", (*processor).Metadata().Name) } } if err := (*p.exporter).Close(); err != nil { p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", (*p.exporter).Metadata().Name, err) + } else { + p.logger.Infof("Pipeline.Stop(): Exporter (%s) closed without error", (*p.exporter).Metadata().Name) } } diff --git a/conduit/plugins/exporters/postgresql/postgresql_exporter.go b/conduit/plugins/exporters/postgresql/postgresql_exporter.go index e46d6c82..b8ebcc2d 100644 --- a/conduit/plugins/exporters/postgresql/postgresql_exporter.go +++ b/conduit/plugins/exporters/postgresql/postgresql_exporter.go @@ -148,6 +148,7 @@ func (exp *postgresqlExporter) Close() error { exp.cf() exp.wg.Wait() + exp.logger.Infof("exporter postgresql.Close() at round %d", atomic.LoadUint64(&exp.round)) return nil } diff --git a/conduit/plugins/importers/algod/algod_importer.go b/conduit/plugins/importers/algod/algod_importer.go index 5893f826..f0e50e8a 100644 --- a/conduit/plugins/importers/algod/algod_importer.go +++ b/conduit/plugins/importers/algod/algod_importer.go @@ -11,6 +11,7 @@ import ( "reflect" "strconv" "strings" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -55,6 +56,7 @@ type algodImporter struct { cancel context.CancelFunc mode int genesis *sdk.Genesis + round uint64 } //go:embed sample.yaml @@ -396,6 +398,7 @@ func (algodImp *algodImporter) Close() error { if algodImp.cancel != nil { algodImp.cancel() } + algodImp.logger.Infof("importer algod.Close() at round %d", algodImp.round) return nil } @@ -468,8 +471,8 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) { blk.Delta = &delta } } - - return blk, err + atomic.StoreUint64(&algodImp.round, rnd+1) + return blk, nil } err = fmt.Errorf("failed to get block for round %d after %d attempts, check node configuration: %s", rnd, retries, err)