Conduit: filereader / filewriter plugins and compatibility changes. #1253

Merged on Oct 26, 2022 (39 commits). Showing changes from 30 commits.

Commits (39):
3e9cc08 Write block and state delta to separate files. (winder, Sep 29, 2022)
189c216 Include certificate in block (winder, Sep 29, 2022)
33d3dbc Simplify. (winder, Sep 29, 2022)
1266432 Revert new config (winder, Sep 29, 2022)
f70cc78 Save genesis file. (winder, Sep 29, 2022)
e59c02a Allow providing the filename pattern that blocks are written to. (winder, Sep 30, 2022)
965f562 Make encode/decode utilities. (winder, Sep 30, 2022)
06d4a6b Fix typos (winder, Sep 30, 2022)
4026dc6 Add file reader. (winder, Sep 30, 2022)
2cb9343 No special case for retry duration of 0. (winder, Sep 30, 2022)
390da37 Option to drop the certificate before writing. (winder, Sep 30, 2022)
99f6a73 Fix test. (winder, Sep 30, 2022)
fae7294 Add gzip option to encode/decode. (winder, Sep 30, 2022)
c6ea439 Fix gzip header. (winder, Oct 1, 2022)
5a613fd Close the reader. (winder, Oct 1, 2022)
74b4736 Use the EncodeToFile function in the exporter. (winder, Oct 1, 2022)
8c7883f Allow empty delta on round 0 (winder, Oct 1, 2022)
395489a Add some logging, create localledger dir if needed. (winder, Oct 1, 2022)
e89a207 Add missing error message and some TODOs for useful configs that shou… (winder, Oct 2, 2022)
ca0f4fc This is much more performant for large blocks. (winder, Oct 3, 2022)
24d537a Add some extra timing information. (winder, Oct 3, 2022)
6922b12 unmarshalConfig change. (winder, Oct 3, 2022)
6dd292f Fix import. (winder, Oct 3, 2022)
f318282 Fix self assign. (winder, Oct 3, 2022)
0fe7bd7 Merge branch 'conduit' into will/filewriter-statedelta (winder, Oct 12, 2022)
379e949 Merge branch 'conduit' into will/filewriter-statedelta (winder, Oct 13, 2022)
de99a0d Fix merge error. (winder, Oct 13, 2022)
fdb3191 Fix make integration (winder, Oct 13, 2022)
d0e0ec1 Fix test. (winder, Oct 13, 2022)
f8ccabc Merge branch 'conduit' into will/filewriter-statedelta (winder, Oct 20, 2022)
9b584b5 Merge branch 'conduit' into will/filewriter-statedelta (winder, Oct 25, 2022)
c16e353 Merge branch 'conduit' into will/filewriter-statedelta (winder, Oct 25, 2022)
75e011b Fix block_processor parameters. (winder, Oct 25, 2022)
559f542 Fix indexer-data-dir (winder, Oct 26, 2022)
10fa5ac Temporary - extra logging. (winder, Oct 26, 2022)
107b81b Revert "Temporary - extra logging." (winder, Oct 26, 2022)
4706d90 More data dir fixes. (winder, Oct 26, 2022)
77002a6 misc (winder, Oct 26, 2022)
eb6631e Remove a TODO (winder, Oct 26, 2022)

8 changes: 4 additions & 4 deletions data/block_export_data.go
@@ -26,16 +26,16 @@ type InitProvider interface {
 type BlockData struct {

 	// BlockHeader is the immutable header from the block
-	BlockHeader bookkeeping.BlockHeader
+	BlockHeader bookkeeping.BlockHeader `json:"block,omitempty"`

 	// Payset is the set of data the block is carrying--can be modified as it is processed
-	Payset []transactions.SignedTxnInBlock
+	Payset []transactions.SignedTxnInBlock `json:"payset,omitempty"`

 	// Delta contains a list of account changes resulting from the block. Processor plugins may have modify this data.
-	Delta *ledgercore.StateDelta
+	Delta *ledgercore.StateDelta `json:"delta,omitempty"`

 	// Certificate contains voting data that certifies the block. The certificate is non deterministic, a node stops collecting votes once the voting threshold is reached.
-	Certificate *agreement.Certificate
+	Certificate *agreement.Certificate `json:"cert,omitempty"`
 }

 // MakeBlockDataFromValidatedBlock makes BlockData from agreement.ValidatedBlock
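
A minimal sketch of what the new `omitempty` tags do for a pointer field, using the standard `encoding/json` package. The `header` and `certificate` types are hypothetical stand-ins for the go-algorand types, and the encoder behind the PR's `util.EncodeToFile` is not shown in this diff, so its behavior may differ:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// header and certificate are hypothetical stand-ins for
// bookkeeping.BlockHeader and agreement.Certificate.
type header struct {
	Round uint64 `json:"rnd"`
}

type certificate struct {
	Votes int `json:"votes"`
}

// blockData copies the tag layout of two BlockData fields from the diff.
type blockData struct {
	BlockHeader header       `json:"block,omitempty"`
	Certificate *certificate `json:"cert,omitempty"`
}

// encode marshals b with encoding/json (an assumption, see the note above).
func encode(b blockData) string {
	out, err := json.Marshal(b)
	if err != nil {
		panic(err)
	}
	return string(out)
}

func main() {
	b := blockData{BlockHeader: header{Round: 7}, Certificate: &certificate{Votes: 3}}
	fmt.Println(encode(b)) // {"block":{"rnd":7},"cert":{"votes":3}}

	// A nil pointer is omitted entirely, which is what dropping the certificate relies on.
	b.Certificate = nil
	fmt.Println(encode(b)) // {"block":{"rnd":7}}
}
```

With `encoding/json`, `omitempty` drops nil pointers and empty slices but never a struct value, so in this sketch only the pointer field is affected.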
68 changes: 41 additions & 27 deletions exporters/filewriter/file_exporter.go
@@ -13,19 +13,23 @@ import (

 	"github.com/algorand/go-algorand/crypto"
 	"github.com/algorand/go-algorand/data/bookkeeping"

 	"github.com/algorand/indexer/data"
 	"github.com/algorand/indexer/exporters"
 	"github.com/algorand/indexer/plugins"
 	"github.com/algorand/indexer/util"
 )

-const exporterName = "file_writer"
+const (
+	exporterName = "file_writer"
+	// FilePattern is used to name the output files.
+	FilePattern = "%[1]d_block.json"
+)

 type fileExporter struct {
 	round uint64
 	blockMetadataFile string
 	blockMetadata BlockMetaData
-	cfg ExporterConfig
+	cfg Config
 	logger *logrus.Logger
 }

@@ -58,11 +62,16 @@ func (exp *fileExporter) Metadata() exporters.ExporterMetadata {

 func (exp *fileExporter) Init(_ context.Context, cfg plugins.PluginConfig, logger *logrus.Logger) error {
 	exp.logger = logger
-	if err := exp.unmarhshalConfig(string(cfg)); err != nil {
+	var err error
+	exp.cfg, err = unmarshalConfig(string(cfg))
+	if err != nil {
 		return fmt.Errorf("connect failure in unmarshalConfig: %w", err)
 	}
+	if exp.cfg.FilenamePattern == "" {
+		exp.cfg.FilenamePattern = FilePattern
+	}
 	// create block directory
-	err := os.Mkdir(exp.cfg.BlocksDir, 0755)
+	err = os.Mkdir(exp.cfg.BlocksDir, 0755)
 	if err != nil && errors.Is(err, os.ErrExist) {
 	} else if err != nil {
 		return fmt.Errorf("Init() error: %w", err)
@@ -113,12 +122,7 @@ func (exp *fileExporter) Config() plugins.PluginConfig {

 func (exp *fileExporter) encodeMetadataToFile() error {
 	tempFilename := fmt.Sprintf("%s.temp", exp.blockMetadataFile)
-	file, err := os.Create(tempFilename)
-	if err != nil {
-		return fmt.Errorf("encodeMetadataToFile(): failed to create temp metadata file: %w", err)
-	}
-	defer file.Close()
-	err = json.NewEncoder(file).Encode(exp.blockMetadata)
+	err := util.EncodeToFile(tempFilename, exp.blockMetadata, true)
 	if err != nil {
 		return fmt.Errorf("encodeMetadataToFile(): failed to write temp metadata: %w", err)
 	}
@@ -143,21 +147,24 @@ func (exp *fileExporter) Receive(exportData data.BlockData) error {
 	if exportData.Round() != exp.round {
 		return fmt.Errorf("Receive(): wrong block: received round %d, expected round %d", exportData.Round(), exp.round)
 	}
-	//write block to file
-	blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf("block_%d.json", exportData.Round()))
-	file, err := os.OpenFile(blockFile, os.O_WRONLY|os.O_CREATE, 0755)
-	if err != nil {
-		return fmt.Errorf("Receive(): error opening file %s: %w", blockFile, err)
-	}
-	defer file.Close()
-	err = json.NewEncoder(file).Encode(exportData)
-	if err != nil {
-		return fmt.Errorf("Receive(): error encoding exportData: %w", err)
+
+	// write block to file
+	{
+		if exp.cfg.DropCertificate {
+			exportData.Certificate = nil
+		}
+
+		blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf(exp.cfg.FilenamePattern, exportData.Round()))
+		err := util.EncodeToFile(blockFile, exportData, true)
+		if err != nil {
+			return fmt.Errorf("Receive(): failed to write file %s: %w", blockFile, err)
+		}
+		exp.logger.Infof("Wrote block %d to %s", exportData.Round(), blockFile)
 	}
-	exp.logger.Infof("Added block %d", exportData.Round())
+
 	exp.round++
 	exp.blockMetadata.NextRound = exp.round
-	err = exp.encodeMetadataToFile()
+	err := exp.encodeMetadataToFile()
 	if err != nil {
 		return fmt.Errorf("Receive() metadata encoding err %w", err)
 	}
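
This hunk replaces the hard-coded `block_%d.json` name with the configurable `FilenamePattern`, whose default is `%[1]d_block.json`. A small sketch of how such a pattern expands follows; `blockFilename` is a hypothetical helper that combines the default handling from `Init` with the `path.Join` call from `Receive`:

```go
package main

import (
	"fmt"
	"path"
)

// FilePattern is the default pattern from the diff.
const FilePattern = "%[1]d_block.json"

// blockFilename expands pattern for a round and joins it to blocksDir.
// An empty pattern falls back to FilePattern, as Init does in the diff.
func blockFilename(blocksDir, pattern string, round uint64) string {
	if pattern == "" {
		pattern = FilePattern
	}
	return path.Join(blocksDir, fmt.Sprintf(pattern, round))
}

func main() {
	fmt.Println(blockFilename("/data/blocks", "", 42))                    // /data/blocks/42_block.json
	fmt.Println(blockFilename("/data/blocks", "block_%[1]d.json.gz", 42)) // /data/blocks/block_42.json.gz
	// The explicit argument index lets a pattern use the round more than once.
	fmt.Println(blockFilename("blocks", "%[1]d/%[1]d.json", 7)) // blocks/7/7.json
}
```

A plain `%d` also works for a single use; the `%[1]d` form matters only when the pattern repeats the round.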
@@ -168,12 +175,17 @@ func (exp *fileExporter) HandleGenesis(genesis bookkeeping.Genesis) error {
 	// check genesis hash
 	gh := crypto.HashObj(genesis).String()
 	if exp.blockMetadata.GenesisHash == "" {
+		// First time initialization.
 		exp.blockMetadata.GenesisHash = gh
 		exp.blockMetadata.Network = string(genesis.Network)
-		err := exp.encodeMetadataToFile()
-		if err != nil {
+		if err := exp.encodeMetadataToFile(); err != nil {
 			return fmt.Errorf("HandleGenesis() metadata encoding err %w", err)
 		}
+
+		genesisFilename := path.Join(exp.cfg.BlocksDir, "genesis.json")
+		if err := util.EncodeToFile(genesisFilename, genesis, true); err != nil {
+			return fmt.Errorf("HandleGenesis() failed to serialize genesis file: %w", err)
+		}

Review comment from the PR author: File reader needs the genesis file.

 	} else {
 		if exp.blockMetadata.GenesisHash != gh {
 			return fmt.Errorf("HandleGenesis() genesis hash in metadata does not match expected value: actual %s, expected %s", gh, exp.blockMetadata.GenesisHash)
@@ -186,8 +198,10 @@ func (exp *fileExporter) Round() uint64 {
 	return exp.round
 }

-func (exp *fileExporter) unmarhshalConfig(cfg string) error {
-	return yaml.Unmarshal([]byte(cfg), &exp.cfg)
+func unmarshalConfig(cfg string) (Config, error) {

Review comment from the PR author: Contender for generics right here: we have the same function (with the same typo) in lots of places. I turned it into a pure function in a few places.

+	var config Config
+	err := yaml.Unmarshal([]byte(cfg), &config)
+	return config, err
 }

 func init() {
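
The review comment above floats generics as a way to share `unmarshalConfig` across plugins. One possible shape is sketched below, assuming Go 1.18 or later. To keep the example standard-library only, `encoding/json` is swapped in for the `yaml` package used in the PR, the struct tags are `json` rather than `yaml`, and `readerConfig` is a hypothetical second config type:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unmarshalConfig decodes cfg into a fresh value of any config type T.
// It is a pure function: it touches no receiver or global state.
func unmarshalConfig[T any](cfg string) (T, error) {
	var config T
	err := json.Unmarshal([]byte(cfg), &config)
	return config, err
}

// Config copies the field names of the file exporter config from the diff.
type Config struct {
	BlocksDir       string `json:"block-dir"`
	FilenamePattern string `json:"filename-pattern"`
	DropCertificate bool   `json:"drop-certificate"`
}

// readerConfig is a hypothetical config for a different plugin.
type readerConfig struct {
	BlocksDir string `json:"block-dir"`
}

func main() {
	writerCfg, err := unmarshalConfig[Config](`{"block-dir": "/tmp/blocks", "drop-certificate": true}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", writerCfg)

	readerCfg, err := unmarshalConfig[readerConfig](`{"block-dir": "/tmp/blocks"}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", readerCfg)
}
```

The type argument has to be given explicitly at the call site because `T` appears only in the return type.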
22 changes: 17 additions & 5 deletions exporters/filewriter/file_exporter_config.go
@@ -1,9 +1,21 @@
 package filewriter

-// ExporterConfig specific to the file exporter
-type ExporterConfig struct {
-	// full file path to a directory
-	// where the block data should be stored.
-	// Create if directory doesn't exist
+// Config specific to the file exporter
+type Config struct {
+	// BlocksDir is the path to a directory where block data should be stored.
+	// The directory is created if it doesn't exist.
 	BlocksDir string `yaml:"block-dir"`
+	// FilenamePattern is the format used to write block files. It uses go
+	// string formatting and should accept one number for the round.
+	// If the file has a '.gz' extension, blocks will be gzipped.
+	// Default: "%[1]d_block.json"
+	FilenamePattern string `yaml:"filename-pattern"`
+	// DropCertificate is used to remove the vote certificate from the block data before writing files.
+	DropCertificate bool `yaml:"drop-certificate"`
+
+	// TODO: compression level - Default, Fastest, Best compression, etc
+
+	// TODO: How to avoid having millions of files in a directory?
+	// Write batches of blocks to a single file?
+	// Tree of directories
 }