Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

file-plugins: enabling messagepack format #142

Merged
merged 24 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 12 additions & 20 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (
"fmt"
"net/http"
"os"
"path"
"runtime/pprof"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
yaml "gopkg.in/yaml.v3"

sdk "github.com/algorand/go-algorand-sdk/v2/types"

Expand Down Expand Up @@ -103,29 +101,23 @@ func (p *pipelineImpl) registerPluginMetricsCallbacks() {
}
}

// makeConfig creates a plugin config from a name and config pair.
// configWithLogger creates a plugin config from a name and config pair.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why rename this? Seems unrelated to the rest of the PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to set up the integration test, I wanted to break-out the pure config generating portion to a new method pluginType.GetConfig(). With this refactoring, it's clearer that this method does 2 things:

  1. sets up the plugin's config
  2. returns a logger that inherits the pipeline's logger setup

These seemed sufficiently unrelated to me that I thought it was worth rename.

However, renaming is not at all crucial.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// It also creates a logger for the plugin and configures it using the pipeline's log settings.
func (p *pipelineImpl) makeConfig(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
configs, err := yaml.Marshal(cfg.Config)
func (p *pipelineImpl) configWithLogger(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
var dataDir string
if p.cfg.ConduitArgs != nil {
dataDir = p.cfg.ConduitArgs.ConduitDataDir
}
config, err := pluginType.GetConfig(cfg, dataDir)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig(): could not serialize config: %w", err)
return nil, plugins.PluginConfig{}, fmt.Errorf("configWithLogger(): unable to create plugin config: %w", err)
}

lgr := log.New()
lgr.SetOutput(p.logger.Out)
lgr.SetLevel(p.logger.Level)
lgr.SetFormatter(makePluginLogFormatter(string(pluginType), cfg.Name))

var config plugins.PluginConfig
config.Config = string(configs)
if p.cfg != nil && p.cfg.ConduitArgs != nil {
config.DataDir = path.Join(p.cfg.ConduitArgs.ConduitDataDir, fmt.Sprintf("%s_%s", pluginType, cfg.Name))
err = os.MkdirAll(config.DataDir, os.ModePerm)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig: unable to create plugin data directory: %w", err)
}
}

return lgr, config, nil
}

Expand Down Expand Up @@ -171,7 +163,7 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) {
var pluginOverride uint64
var pluginOverrideName string // cache this in case of error.
for _, part := range parts {
_, config, err := p.makeConfig(part.cfg, part.t)
_, config, err := p.configWithLogger(part.cfg, part.t)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -302,7 +294,7 @@ func (p *pipelineImpl) Init() error {

// Initialize Importer
{
importerLogger, pluginConfig, err := p.makeConfig(p.cfg.Importer, plugins.Importer)
importerLogger, pluginConfig, err := p.configWithLogger(p.cfg.Importer, plugins.Importer)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}
Expand Down Expand Up @@ -335,7 +327,7 @@ func (p *pipelineImpl) Init() error {
// Initialize Processors
for idx, processor := range p.processors {
ncPair := p.cfg.Processors[idx]
logger, config, err := p.makeConfig(ncPair, plugins.Processor)
logger, config, err := p.configWithLogger(ncPair, plugins.Processor)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err)
}
Expand All @@ -348,7 +340,7 @@ func (p *pipelineImpl) Init() error {

// Initialize Exporter
{
logger, config, err := p.makeConfig(p.cfg.Exporter, plugins.Exporter)
logger, config, err := p.configWithLogger(p.cfg.Exporter, plugins.Exporter)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err)
}
Expand Down
37 changes: 30 additions & 7 deletions conduit/plugins/exporters/filewriter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ import (
const (
// PluginName to use when configuring.
PluginName = "file_writer"

// FilePattern is used to name the output files.
FilePattern = "%[1]d_block.json"
FilePattern = "%[1]d_block.msgp.gz"
)

type fileExporter struct {
round uint64
cfg Config
gzip bool
format EncodingFormat
logger *logrus.Logger
}

Expand All @@ -51,20 +54,38 @@ func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider,
if exp.cfg.FilenamePattern == "" {
exp.cfg.FilenamePattern = FilePattern
}
exp.format, exp.gzip, err = ParseFilenamePattern(exp.cfg.FilenamePattern)
if err != nil {
return fmt.Errorf("Init() error: %w", err)
}

// default to the data directory if no override provided.
if exp.cfg.BlocksDir == "" {
exp.cfg.BlocksDir = cfg.DataDir
}
// create block directory
err = os.Mkdir(exp.cfg.BlocksDir, 0755)
if err != nil && errors.Is(err, os.ErrExist) {
// Ignore mkdir if the dir exists
err = nil
} else if err != nil {
if err != nil && !errors.Is(err, os.ErrExist) {
// Ignore mkdir err if the dir exists (case errors.Is(err, os.ErrExist))
return fmt.Errorf("Init() error: %w", err)
}

exp.round = uint64(initProvider.NextDBRound())
return err

// export the genesis as well in the same format
genesis := initProvider.GetGenesis()
genesisFile, err := GenesisFilename(exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Init() error: %w", err)
}

genesisPath := path.Join(exp.cfg.BlocksDir, genesisFile)
err = EncodeToFile(genesisPath, genesis, exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Init() error sending to genesisPath=%s: %w", genesisPath, err)
}

return nil
}

func (exp *fileExporter) Close() error {
Expand All @@ -87,10 +108,12 @@ func (exp *fileExporter) Receive(exportData data.BlockData) error {
}

blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf(exp.cfg.FilenamePattern, exportData.Round()))
err := EncodeJSONToFile(blockFile, exportData, true)

err := EncodeToFile(blockFile, &exportData, exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Receive(): failed to write file %s: %w", blockFile, err)
}

exp.logger.Infof("Wrote block %d to %s", exportData.Round(), blockFile)
}

Expand Down
111 changes: 69 additions & 42 deletions conduit/plugins/exporters/filewriter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

Expand All @@ -21,29 +20,45 @@ import (
sdk "github.com/algorand/go-algorand-sdk/v2/types"
)

const (
defaultEncodingFormat = MessagepackFormat
defaultIsGzip = true
)

winder marked this conversation as resolved.
Show resolved Hide resolved
var logger *logrus.Logger
var fileCons = exporters.ExporterConstructorFunc(func() exporters.Exporter {
return &fileExporter{}
})
var configTemplate = "block-dir: %s/blocks\n"
var configTemplatePrefix = "block-dir: %s/blocks\n"
var round = sdk.Round(2)

func init() {
logger, _ = test.NewNullLogger()
}

func getConfig(t *testing.T) (config, tempdir string) {
func getConfigWithoutPattern(t *testing.T) (config, tempdir string) {
tempdir = t.TempDir()
config = fmt.Sprintf(configTemplate, tempdir)
config = fmt.Sprintf(configTemplatePrefix, tempdir)
return
}

func getConfigWithPattern(t *testing.T, pattern string) (config, tempdir string) {
config, tempdir = getConfigWithoutPattern(t)
config = fmt.Sprintf("%sfilename-pattern: '%s'\n", config, pattern)
return
}

func TestDefaults(t *testing.T) {
require.Equal(t, defaultEncodingFormat, MessagepackFormat)
require.Equal(t, defaultIsGzip, true)
}

func TestExporterMetadata(t *testing.T) {
fileExp := fileCons.New()
meta := fileExp.Metadata()
assert.Equal(t, metadata.Name, meta.Name)
assert.Equal(t, metadata.Description, meta.Description)
assert.Equal(t, metadata.Deprecated, meta.Deprecated)
require.Equal(t, metadata.Name, meta.Name)
require.Equal(t, metadata.Description, meta.Description)
require.Equal(t, metadata.Deprecated, meta.Deprecated)
}

func TestExporterInitDefaults(t *testing.T) {
Expand Down Expand Up @@ -87,18 +102,18 @@ func TestExporterInitDefaults(t *testing.T) {
}

func TestExporterInit(t *testing.T) {
config, _ := getConfig(t)
config, _ := getConfigWithPattern(t, "%[1]d_block.json")
fileExp := fileCons.New()
defer fileExp.Close()

// creates a new output file
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
assert.NoError(t, err)
require.NoError(t, err)
fileExp.Close()

// can open existing file
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
assert.NoError(t, err)
require.NoError(t, err)
fileExp.Close()
}

Expand Down Expand Up @@ -155,55 +170,67 @@ func sendData(t *testing.T, fileExp exporters.Exporter, config string, numRounds
}

func TestExporterReceive(t *testing.T) {
config, tempdir := getConfig(t)
fileExp := fileCons.New()
numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
require.FileExists(t, path)

blockBytes, err := os.ReadFile(path)
require.NoError(t, err)
assert.NotContains(t, string(blockBytes), " 0: ")
patterns := []string{
"%[1]d_block.json",
"%[1]d_block.json.gz",
"%[1]d_block.msgp",
"%[1]d_block.msgp.gz",
}
for _, pattern := range patterns {
pattern := pattern
t.Run(pattern, func(t *testing.T) {
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()

var blockData data.BlockData
err = DecodeJSONFromFile(path, &blockData, true)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NoError(t, err)
require.NotNil(t, blockData.Certificate)
format, isGzip, err := ParseFilenamePattern(pattern)
require.NoError(t, err)
config, tempdir := getConfigWithPattern(t, pattern)
fileExp := fileCons.New()
numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(pattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
require.FileExists(t, path)

blockBytes, err := os.ReadFile(path)
require.NoError(t, err)
require.NotContains(t, string(blockBytes), " 0: ")

var blockData data.BlockData
err = DecodeFromFile(path, &blockData, format, isGzip)
require.NoError(t, err)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NotNil(t, blockData.Certificate)
}
})
}
}

func TestExporterClose(t *testing.T) {
config, _ := getConfig(t)
config, _ := getConfigWithoutPattern(t)
fileExp := fileCons.New()
rnd := sdk.Round(0)
fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil, nil), plugins.MakePluginConfig(config), logger)
require.NoError(t, fileExp.Close())
}

func TestPatternOverride(t *testing.T) {
config, tempdir := getConfig(t)
func TestPatternDefault(t *testing.T) {
config, tempdir := getConfigWithoutPattern(t)
fileExp := fileCons.New()

patternOverride := "PREFIX_%[1]d_block.json"
config = fmt.Sprintf("%sfilename-pattern: '%s'\n", config, patternOverride)

numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(patternOverride, i)
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
assert.FileExists(t, path)
require.FileExists(t, path)

var blockData data.BlockData
err := DecodeJSONFromFile(path, &blockData, true)
err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NoError(t, err)
require.NotNil(t, blockData.Certificate)
Expand All @@ -227,10 +254,10 @@ func TestDropCertificate(t *testing.T) {
for i := 0; i < numRounds; i++ {
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/%s", tempdir, filename)
assert.FileExists(t, path)
require.FileExists(t, path)
var blockData data.BlockData
err := DecodeJSONFromFile(path, &blockData, true)
assert.NoError(t, err)
assert.Nil(t, blockData.Certificate)
err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
require.NoError(t, err)
require.Nil(t, blockData.Certificate)
}
}
4 changes: 2 additions & 2 deletions conduit/plugins/exporters/filewriter/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ config:
# FilenamePattern is the format used to write block files. It uses go
# string formatting and should accept one number for the round.
# If the file has a '.gz' extension, blocks will be gzipped.
# Default: "%[1]d_block.json"
filename-pattern: "%[1]d_block.json"
# Default: "%[1]d_block.msgp.gz"
filename-pattern: "%[1]d_block.msgp.gz"

# DropCertificate is used to remove the vote certificate from the block data before writing files.
drop-certificate: true
Loading