Skip to content

Commit

Permalink
Added new logger
Browse files Browse the repository at this point in the history
Resolves #1245

Adds a multi-threaded logger instance for conduit usage
  • Loading branch information
AlgoStephenAkiki committed Oct 12, 2022
1 parent 6a6fd87 commit f187867
Show file tree
Hide file tree
Showing 23 changed files with 168 additions and 96 deletions.
8 changes: 5 additions & 3 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/algorand/indexer/loggers"
"os"
"os/signal"
"path/filepath"
Expand Down Expand Up @@ -340,7 +341,8 @@ func runDaemon(daemonConfig *daemonConfig) error {
defer db.Close()
var dataError func() error
if daemonConfig.noAlgod != true {
pipeline := runConduitPipeline(ctx, availableCh, daemonConfig)
loggerManager := loggers.AdaptLogger(logger)
pipeline := runConduitPipeline(ctx, availableCh, daemonConfig, loggerManager.MakeLogger())
if pipeline != nil {
dataError = pipeline.Error
defer pipeline.Stop()
Expand Down Expand Up @@ -393,7 +395,7 @@ func makeConduitConfig(dCfg *daemonConfig) conduit.PipelineConfig {

}

func runConduitPipeline(ctx context.Context, dbAvailable chan struct{}, dCfg *daemonConfig) conduit.Pipeline {
func runConduitPipeline(ctx context.Context, dbAvailable chan struct{}, dCfg *daemonConfig, loggerMT *loggers.MT) conduit.Pipeline {
// Need to redefine exitHandler() for every go-routine
defer exitHandler()

Expand All @@ -403,7 +405,7 @@ func runConduitPipeline(ctx context.Context, dbAvailable chan struct{}, dCfg *da
var pipeline conduit.Pipeline
var err error
pcfg := makeConduitConfig(dCfg)
if pipeline, err = conduit.MakePipeline(ctx, &pcfg, logger); err != nil {
if pipeline, err = conduit.MakePipeline(ctx, &pcfg, loggerMT); err != nil {
logger.Errorf("%v", err)
panic(exit{1})
}
Expand Down
18 changes: 6 additions & 12 deletions cmd/conduit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"os"
"path/filepath"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/algorand/indexer/conduit"
"github.com/algorand/indexer/loggers"
)

import (
Expand All @@ -22,7 +22,8 @@ import (
)

var (
logger *log.Logger
loggerManager *loggers.LoggerManager
logger *loggers.MT
conduitCmd = makeConduitCmd()
initCmd = makeInitCmd()
defaultDataDirectory = "data"
Expand All @@ -31,19 +32,12 @@ var (
// init() function for main package
func init() {

loggerManager = loggers.MakeLoggerManager(os.Stdout)
// Setup logger
logger = log.New()

formatter := conduit.PluginLogFormatter{
Formatter: &log.JSONFormatter{
DisableHTMLEscape: true,
},
Type: "Conduit",
Name: "main",
}
logger = loggerManager.MakeLogger()

formatter := conduit.MakePluginLogFormatter("Conduit", "main")
logger.SetFormatter(&formatter)
logger.SetOutput(os.Stdout)

conduitCmd.AddCommand(initCmd)
}
Expand Down
4 changes: 2 additions & 2 deletions conduit/common.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package conduit

import log "github.com/sirupsen/logrus"
import "github.com/algorand/indexer/loggers"

// HandlePanic function to log panics in a common way
func HandlePanic(logger *log.Logger) {
func HandlePanic(logger *loggers.MT) {
if r := recover(); r != nil {
logger.Panicf("conduit pipeline experienced a panic: %v", r)
}
Expand Down
3 changes: 2 additions & 1 deletion conduit/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func (f PluginLogFormatter) Format(entry *log.Entry) ([]byte, error) {
return f.Formatter.Format(entry)
}

func makePluginLogFormatter(pluginType string, pluginName string) PluginLogFormatter {
// MakePluginLogFormatter creates a json formatter with plugin name and type
func MakePluginLogFormatter(pluginType string, pluginName string) PluginLogFormatter {
return PluginLogFormatter{
Formatter: &log.JSONFormatter{
DisableHTMLEscape: true,
Expand Down
9 changes: 6 additions & 3 deletions conduit/logging_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package conduit

import (
"github.com/algorand/indexer/loggers"
"os"
"testing"
"time"

Expand All @@ -14,16 +16,17 @@ func TestPluginLogFormatter_Format(t *testing.T) {
pluginType := "A Question"
pluginName := "What's in a name?"

pluginFormatter := makePluginLogFormatter(pluginType, pluginName)
pluginFormatter := MakePluginLogFormatter(pluginType, pluginName)

l := log.New()
lMgr := loggers.MakeLoggerManager(os.Stdout)
l := lMgr.MakeLogger()

entry := &log.Entry{
Time: time.Time{},
Level: log.InfoLevel,
Message: "That which we call a rose by any other name would smell just as sweet.",
Data: log.Fields{},
Logger: l,
Logger: l.Logger,
}

bytes, err := pluginFormatter.Format(entry)
Expand Down
30 changes: 16 additions & 14 deletions conduit/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package conduit
import (
"context"
"fmt"
"github.com/algorand/indexer/loggers"
"github.com/algorand/indexer/util/metrics"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (cfg *PipelineConfig) Valid() error {
}

// MakePipelineConfig creates a pipeline configuration
func MakePipelineConfig(logger *log.Logger, cfg *Config) (*PipelineConfig, error) {
func MakePipelineConfig(logger *loggers.MT, cfg *Config) (*PipelineConfig, error) {
if cfg == nil {
return nil, fmt.Errorf("MakePipelineConfig(): empty conduit config")
}
Expand Down Expand Up @@ -133,17 +134,18 @@ type pipelineImpl struct {
cf context.CancelFunc
wg sync.WaitGroup
cfg *PipelineConfig
logger *log.Logger
logger *loggers.MT
profFile *os.File
err error
mu sync.RWMutex

initProvider *data.InitProvider

importer *importers.Importer
processors []*processors.Processor
exporter *exporters.Exporter
round basics.Round
importer *importers.Importer
processors []*processors.Processor
exporter *exporters.Exporter
round basics.Round
loggerManager *loggers.LoggerManager
}

func (p *pipelineImpl) Error() error {
Expand Down Expand Up @@ -179,7 +181,7 @@ func (p *pipelineImpl) Init() error {
}

if p.cfg.PIDFilePath != "" {
err := util.CreateIndexerPidFile(p.logger, p.cfg.PIDFilePath)
err := util.CreateIndexerPidFile(p.logger.Logger, p.cfg.PIDFilePath)
if err != nil {
return err
}
Expand All @@ -188,8 +190,8 @@ func (p *pipelineImpl) Init() error {
// TODO Need to change interfaces to accept config of map[string]interface{}

// Initialize Exporter first since the pipeline derives its round from the Exporter
exporterLogger := log.New()
exporterLogger.SetFormatter(makePluginLogFormatter(plugins.Exporter, (*p.exporter).Metadata().Name()))
exporterLogger := p.loggerManager.MakeLogger()
exporterLogger.SetFormatter(MakePluginLogFormatter(plugins.Exporter, (*p.exporter).Metadata().Name()))

jsonEncode := string(json.Encode(p.cfg.Exporter.Config))
err := (*p.exporter).Init(plugins.PluginConfig(jsonEncode), exporterLogger)
Expand All @@ -200,9 +202,9 @@ func (p *pipelineImpl) Init() error {
p.logger.Infof("Initialized Exporter: %s", exporterName)

// Initialize Importer
importerLogger := log.New()
importerLogger := p.loggerManager.MakeLogger()
importerName := (*p.importer).Metadata().Name()
importerLogger.SetFormatter(makePluginLogFormatter(plugins.Importer, importerName))
importerLogger.SetFormatter(MakePluginLogFormatter(plugins.Importer, importerName))

// TODO modify/fix ?
jsonEncode = string(json.Encode(p.cfg.Importer.Config))
Expand All @@ -226,8 +228,8 @@ func (p *pipelineImpl) Init() error {
p.initProvider = &initProvider

for idx, processor := range p.processors {
processorLogger := log.New()
processorLogger.SetFormatter(makePluginLogFormatter(plugins.Processor, (*processor).Metadata().Name()))
processorLogger := p.loggerManager.MakeLogger()
processorLogger.SetFormatter(MakePluginLogFormatter(plugins.Processor, (*processor).Metadata().Name()))
jsonEncode = string(json.Encode(p.cfg.Processors[idx].Config))
err := (*processor).Init(p.ctx, *p.initProvider, plugins.PluginConfig(jsonEncode), processorLogger)
processorName := (*processor).Metadata().Name()
Expand Down Expand Up @@ -357,7 +359,7 @@ func (p *pipelineImpl) Wait() {
}

// MakePipeline creates a Pipeline
func MakePipeline(ctx context.Context, cfg *PipelineConfig, logger *log.Logger) (Pipeline, error) {
func MakePipeline(ctx context.Context, cfg *PipelineConfig, logger *loggers.MT) (Pipeline, error) {

if cfg == nil {
return nil, fmt.Errorf("MakePipeline(): pipeline config was empty")
Expand Down
29 changes: 16 additions & 13 deletions conduit/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package conduit
import (
"context"
"fmt"
"github.com/algorand/indexer/loggers"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"

log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand All @@ -22,6 +22,8 @@ import (
"github.com/algorand/indexer/processors"
)

var lMgr *loggers.LoggerManager = loggers.MakeLoggerManager(os.Stdout)

// TestPipelineConfigValidity tests the Valid() function for the PipelineConfig
func TestPipelineConfigValidity(t *testing.T) {
tests := []struct {
Expand Down Expand Up @@ -80,7 +82,7 @@ func TestPipelineConfigValidity(t *testing.T) {
// TestMakePipelineConfig tests making the pipeline configuration
func TestMakePipelineConfig(t *testing.T) {

l := log.New()
l := lMgr.MakeLogger()

_, err := MakePipelineConfig(l, nil)
assert.Equal(t, fmt.Errorf("MakePipelineConfig(): empty conduit config"), err)
Expand Down Expand Up @@ -148,7 +150,7 @@ type mockImporter struct {
returnError bool
}

func (m *mockImporter) Init(_ context.Context, _ plugins.PluginConfig, _ *log.Logger) (*bookkeeping.Genesis, error) {
func (m *mockImporter) Init(_ context.Context, _ plugins.PluginConfig, _ *loggers.MT) (*bookkeeping.Genesis, error) {
return &bookkeeping.Genesis{}, nil
}

Expand Down Expand Up @@ -178,7 +180,7 @@ type mockProcessor struct {
onCompleteError bool
}

func (m *mockProcessor) Init(_ context.Context, _ data.InitProvider, _ plugins.PluginConfig, _ *log.Logger) error {
func (m *mockProcessor) Init(_ context.Context, _ data.InitProvider, _ plugins.PluginConfig, _ *loggers.MT) error {
return nil
}

Expand Down Expand Up @@ -221,7 +223,7 @@ func (m *mockExporter) Metadata() exporters.ExporterMetadata {
}
}

func (m *mockExporter) Init(_ plugins.PluginConfig, _ *log.Logger) error {
func (m *mockExporter) Init(_ plugins.PluginConfig, _ *loggers.MT) error {
return nil
}

Expand Down Expand Up @@ -269,7 +271,7 @@ func TestPipelineRun(t *testing.T) {
ctx: ctx,
cf: cf,
cfg: &PipelineConfig{},
logger: log.New(),
logger: lMgr.MakeLogger(),
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
Expand Down Expand Up @@ -319,12 +321,13 @@ func TestPipelineCpuPidFiles(t *testing.T) {
Config: map[string]interface{}{},
},
},
logger: log.New(),
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
round: 0,
loggerManager: lMgr,
logger: lMgr.MakeLogger(),
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
exporter: &pExporter,
round: 0,
}

err := pImpl.Init()
Expand Down Expand Up @@ -375,7 +378,7 @@ func TestPipelineErrors(t *testing.T) {
ctx: ctx,
cf: cf,
cfg: &PipelineConfig{},
logger: log.New(),
logger: lMgr.MakeLogger(),
initProvider: nil,
importer: &pImporter,
processors: []*processors.Processor{&pProcessor},
Expand Down
4 changes: 2 additions & 2 deletions exporters/example/example_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/indexer/data"
"github.com/algorand/indexer/exporters"
"github.com/algorand/indexer/loggers"
"github.com/algorand/indexer/plugins"
"github.com/sirupsen/logrus"
)

// This is our exporter object. It should store all the in memory data required to run the Exporter.
Expand Down Expand Up @@ -33,7 +33,7 @@ func (exp *exampleExporter) Metadata() exporters.ExporterMetadata {
}

// Init provides the opportunity for your Exporter to initialize connections, store config variables, etc.
func (exp *exampleExporter) Init(_ plugins.PluginConfig, _ *logrus.Logger) error {
func (exp *exampleExporter) Init(_ plugins.PluginConfig, _ *loggers.MT) error {
panic("not implemented")
}

Expand Down
4 changes: 2 additions & 2 deletions exporters/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package exporters
import (
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/indexer/data"
"github.com/algorand/indexer/loggers"
"github.com/algorand/indexer/plugins"
"github.com/sirupsen/logrus"
)

// Exporter defines the interface for plugins
Expand All @@ -16,7 +16,7 @@ type Exporter interface {
// Typically used for things like initializing network connections.
// The ExporterConfig passed to Connect will contain the Unmarhsalled config file specific to this plugin.
// Should return an error if it fails--this will result in the Indexer process terminating.
Init(cfg plugins.PluginConfig, logger *logrus.Logger) error
Init(cfg plugins.PluginConfig, logger *loggers.MT) error

// Config returns the configuration options used to create an Exporter.
// Initialized during Connect, it should return nil until the Exporter has been Connected.
Expand Down
6 changes: 3 additions & 3 deletions exporters/filewriter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/algorand/indexer/loggers"
"os"
"path"

"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

"github.com/algorand/go-algorand/crypto"
Expand All @@ -25,7 +25,7 @@ type fileExporter struct {
blockMetadataFile string
blockMetadata BlockMetaData
cfg ExporterConfig
logger *logrus.Logger
logger *loggers.MT
}

var fileExporterMetadata = exporters.ExporterMetadata{
Expand Down Expand Up @@ -55,7 +55,7 @@ func (exp *fileExporter) Metadata() exporters.ExporterMetadata {
return fileExporterMetadata
}

func (exp *fileExporter) Init(cfg plugins.PluginConfig, logger *logrus.Logger) error {
func (exp *fileExporter) Init(cfg plugins.PluginConfig, logger *loggers.MT) error {
exp.logger = logger
if err := exp.unmarhshalConfig(string(cfg)); err != nil {
return fmt.Errorf("connect failure in unmarshalConfig: %w", err)
Expand Down
Loading

0 comments on commit f187867

Please sign in to comment.