Skip to content

Commit

Permalink
Conduit: filereader / filewriter plugins and compatibility changes. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Oct 26, 2022
1 parent 99a848c commit 997b8c7
Show file tree
Hide file tree
Showing 27 changed files with 686 additions and 86 deletions.
10 changes: 5 additions & 5 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,11 @@ func makeConduitConfig(dCfg *daemonConfig) conduit.PipelineConfig {
{
Name: "block_evaluator",
Config: map[string]interface{}{
"catchpoint": dCfg.catchpoint,
"indexer-data-dir": dCfg.indexerDataDir,
"algod-data-dir": dCfg.algodDataDir,
"algod-token": dCfg.algodToken,
"algod-addr": dCfg.algodAddr,
"catchpoint": dCfg.catchpoint,
"data-dir": dCfg.indexerDataDir,
"algod-data-dir": dCfg.algodDataDir,
"algod-token": dCfg.algodToken,
"algod-addr": dCfg.algodAddr,
},
},
},
Expand Down
8 changes: 4 additions & 4 deletions data/block_export_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ type InitProvider interface {
type BlockData struct {

// BlockHeader is the immutable header from the block
BlockHeader bookkeeping.BlockHeader
BlockHeader bookkeeping.BlockHeader `json:"block,omitempty"`

// Payset is the set of data the block is carrying; it can be modified as the block is processed.
Payset []transactions.SignedTxnInBlock
Payset []transactions.SignedTxnInBlock `json:"payset,omitempty"`

// Delta contains a list of account changes resulting from the block. Processor plugins may modify this data.
Delta *ledgercore.StateDelta
Delta *ledgercore.StateDelta `json:"delta,omitempty"`

// Certificate contains voting data that certifies the block. The certificate is non-deterministic: a node stops collecting votes once the voting threshold is reached.
Certificate *agreement.Certificate
Certificate *agreement.Certificate `json:"cert,omitempty"`
}

// MakeBlockDataFromValidatedBlock makes BlockData from agreement.ValidatedBlock
Expand Down
2 changes: 1 addition & 1 deletion docs/conduit/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ exporter:

## Plugin configuration

See [plugins/home.md](plugins/home.md) for details.
See [plugin list](plugins/home.md) for details.
Each plugin is identified by a `name`, and provided the `config` during initialization.
2 changes: 1 addition & 1 deletion docs/conduit/plugins/block_evaluator.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ For example, if you want to get **Mainnet** round `22212765`, you would refer to
processors:
- name: block_evaluator
config:
- indexer-data-dir: "location where the local ledger will be stored."
- data-dir: "location where the local ledger will be stored."
algod-data-dir: "local algod data directory"
algod-addr: "algod URL"
algod-token: "algod token"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ Data is written to one file per block in JSON format.
# Config
```yaml
exporter:
- name: filewriter
- name: file_writer
config:
- block-dir: "path to write block data"
# override the filename pattern.
filename-pattern: "%[1]d_block.json"
# exclude the vote certificate from the file.
drop-certificate: false
```
3 changes: 2 additions & 1 deletion docs/conduit/plugins/home.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ Each plugin is identified by a `name`, and provided the `config` during initiali
## Importers

* [algod](algod.md)
* [file_reader](file_reader.md)

## Processors
* [block_evaluator](block_evaluator.md)
* [filter_processor](filter_processor.md)
* [noop_processor](noop_processor.md)

## Exporters
* [filewriter](filewriter.md)
* [file_writer](file_writer.md)
* [postgresql](postgresql.md)
* [noop_exporter](noop_exporter.md)

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def setup(self, accumulated_config):
def resolve_config_input(self):
self.config_input = {
"catchpoint": self.catchpoint,
"indexer-data-dir": self.indexer_data_dir,
"data-dir": self.indexer_data_dir,
"algod-data-dir": self.algod_data_dir,
"algod-token": self.algod_token,
"algod-addr": self.algod_net,
Expand Down
3 changes: 2 additions & 1 deletion e2e_tests/src/e2e_indexer/e2elive.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,12 @@ def main():
psqlstring = ensure_test_db(args.connection_string, args.keep_temps)
algoddir = os.path.join(tempnet, "Primary")
aiport = args.indexer_port or random.randint(4000, 30000)
indexerdir = os.path.join(tempdir, "indexer_data_dir")
cmd = [
indexer_bin,
"daemon",
"--data-dir",
tempdir,
indexerdir,
"-P",
psqlstring,
"--dev-mode",
Expand Down
52 changes: 34 additions & 18 deletions exporters/filewriter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package filewriter

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
Expand All @@ -14,13 +13,18 @@ import (
"github.com/algorand/indexer/data"
"github.com/algorand/indexer/exporters"
"github.com/algorand/indexer/plugins"
"github.com/algorand/indexer/util"
)

const exporterName = "file_writer"
const (
exporterName = "file_writer"
// FilePattern is used to name the output files.
FilePattern = "%[1]d_block.json"
)

type fileExporter struct {
round uint64
cfg ExporterConfig
cfg Config
logger *logrus.Logger
}

Expand All @@ -44,12 +48,19 @@ func (exp *fileExporter) Metadata() exporters.ExporterMetadata {

func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error {
exp.logger = logger
if err := exp.unmarhshalConfig(string(cfg)); err != nil {
var err error
exp.cfg, err = unmarshalConfig(string(cfg))
if err != nil {
return fmt.Errorf("connect failure in unmarshalConfig: %w", err)
}
if exp.cfg.FilenamePattern == "" {
exp.cfg.FilenamePattern = FilePattern
}
// create block directory
err := os.Mkdir(exp.cfg.BlocksDir, 0755)
err = os.Mkdir(exp.cfg.BlocksDir, 0755)
if err != nil && errors.Is(err, os.ErrExist) {
// Ignore mkdir if the dir exists
err = nil
} else if err != nil {
return fmt.Errorf("Init() error: %w", err)
}
Expand All @@ -74,24 +85,29 @@ func (exp *fileExporter) Receive(exportData data.BlockData) error {
if exportData.Round() != exp.round {
return fmt.Errorf("Receive(): wrong block: received round %d, expected round %d", exportData.Round(), exp.round)
}
//write block to file
blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf("block_%d.json", exportData.Round()))
file, err := os.OpenFile(blockFile, os.O_WRONLY|os.O_CREATE, 0755)
if err != nil {
return fmt.Errorf("Receive(): error opening file %s: %w", blockFile, err)
}
defer file.Close()
err = json.NewEncoder(file).Encode(exportData)
if err != nil {
return fmt.Errorf("Receive(): error encoding exportData: %w", err)

// write block to file
{
if exp.cfg.DropCertificate {
exportData.Certificate = nil
}

blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf(exp.cfg.FilenamePattern, exportData.Round()))
err := util.EncodeToFile(blockFile, exportData, true)
if err != nil {
return fmt.Errorf("Receive(): failed to write file %s: %w", blockFile, err)
}
exp.logger.Infof("Wrote block %d to %s", exportData.Round(), blockFile)
}
exp.logger.Infof("Added block %d", exportData.Round())

exp.round++
return nil
}

func (exp *fileExporter) unmarhshalConfig(cfg string) error {
return yaml.Unmarshal([]byte(cfg), &exp.cfg)
func unmarshalConfig(cfg string) (Config, error) {
var config Config
err := yaml.Unmarshal([]byte(cfg), &config)
return config, err
}

func init() {
Expand Down
22 changes: 17 additions & 5 deletions exporters/filewriter/file_exporter_config.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
package filewriter

// ExporterConfig specific to the file exporter
type ExporterConfig struct {
// full file path to a directory
// where the block data should be stored.
// Create if directory doesn't exist
// Config holds the settings specific to the file exporter.
type Config struct {
// BlocksDir is the path to a directory where block data should be stored.
// The directory is created if it doesn't exist.
BlocksDir string `yaml:"block-dir"`
// FilenamePattern is the format used to write block files. It uses Go
// string formatting and should accept one number for the round.
// If the file has a '.gz' extension, blocks will be gzipped.
// Default: "%[1]d_block.json"
FilenamePattern string `yaml:"filename-pattern"`
// DropCertificate is used to remove the vote certificate from the block data before writing files.
DropCertificate bool `yaml:"drop-certificate"`

// TODO: compression level - Default, Fastest, Best compression, etc

// TODO: How to avoid having millions of files in a directory?
// Write batches of blocks to a single file?
// Tree of directories
}
Loading

0 comments on commit 997b8c7

Please sign in to comment.