Skip to content

Commit

Permalink
Use codec for encoding, write metadata file more often.
Browse files Browse the repository at this point in the history
  • Loading branch information
winder committed Sep 21, 2022
1 parent 7cf43e8 commit 7e8ac4d
Showing 1 changed file with 50 additions and 16 deletions.
66 changes: 50 additions & 16 deletions exporters/filewriter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

sdkJson "github.com/algorand/go-algorand-sdk/encoding/json"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-codec/codec"

"github.com/algorand/indexer/data"
"github.com/algorand/indexer/exporters"
Expand All @@ -22,7 +24,7 @@ const exporterName = "filewriter"

type fileExporter struct {
round uint64
blockMetadataFile *os.File
blockMetadataFile string
blockMetadata BlockMetaData
cfg ExporterConfig
logger *logrus.Logger
Expand Down Expand Up @@ -68,18 +70,26 @@ func (exp *fileExporter) Init(cfg plugins.PluginConfig, logger *logrus.Logger) e
}
// initialize block metadata
file := path.Join(exp.cfg.BlocksDir, "metadata.json")
if _, err = os.Stat(file); errors.Is(err, os.ErrNotExist) {
exp.blockMetadataFile, err = os.Create(file)
exp.blockMetadataFile = file
var stat os.FileInfo
if stat, err = os.Stat(file); errors.Is(err, os.ErrNotExist) || (stat != nil && stat.Size() == 0) {
if stat != nil && stat.Size() == 0 {
// somehow it did not finish initializing
err = os.Remove(file)
if err != nil {
return fmt.Errorf("Init(): error creating file: %w", err)
}
}
err = exp.encodeMetadataToFile()
if err != nil {
return fmt.Errorf("error creating file: %w", err)
return fmt.Errorf("Init(): error creating file: %w", err)
}
exp.blockMetadata = BlockMetaData{
GenesisHash: "",
Network: "",
NextRound: exp.round,
}
} else {
exp.blockMetadataFile, err = os.OpenFile(file, os.O_WRONLY, 0775)
if err != nil {
return fmt.Errorf("error opening file: %w", err)
}
Expand All @@ -102,37 +112,57 @@ func (exp *fileExporter) Config() plugins.PluginConfig {
return plugins.PluginConfig(ret)
}

func (exp *fileExporter) Close() error {
defer exp.blockMetadataFile.Close()
exp.logger.Infof("latest round on file: %d", exp.round)
err := json.NewEncoder(exp.blockMetadataFile).Encode(exp.blockMetadata)
func (exp *fileExporter) encodeMetadataToFile() error {
tempFilename := fmt.Sprintf("%s.temp", exp.blockMetadataFile)
file, err := os.Create(tempFilename)
if err != nil {
return fmt.Errorf("encodeMetadataToFile(): failed to create temp metadata file: %w", err)
}
defer file.Close()
err = json.NewEncoder(file).Encode(exp.blockMetadata)
if err != nil {
return fmt.Errorf("Close() encoding err %w", err)
return fmt.Errorf("encodeMetadataToFile(): failed to write temp metadata: %w", err)
}

err = os.Rename(tempFilename, exp.blockMetadataFile)
if err != nil {
return fmt.Errorf("encodeMetadataToFile(): failed to replace metadata file: %w", err)
}

return nil
}

func (exp *fileExporter) Close() error {
exp.logger.Infof("latest round on file: %d", exp.round)
return nil
}

func (exp *fileExporter) Receive(exportData data.BlockData) error {
if exp.blockMetadataFile == nil {
if exp.blockMetadataFile == "" {
return fmt.Errorf("exporter not initialized")
}
if exportData.Round() != exp.round {
return fmt.Errorf("wrong block. received round %d, expected round %d", exportData.Round(), exp.round)
return fmt.Errorf("Receive(): wrong block: received round %d, expected round %d", exportData.Round(), exp.round)
}
//write block to file
blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf("block_%d.json", exportData.Round()))
file, err := os.OpenFile(blockFile, os.O_WRONLY|os.O_CREATE, 0755)
if err != nil {
return fmt.Errorf("error opening file %s, %w", blockFile, err)
return fmt.Errorf("Receive(): error opening file %s: %w", blockFile, err)
}
defer file.Close()
err = json.NewEncoder(file).Encode(exportData)
enc := codec.NewEncoder(file, sdkJson.CodecHandle)
err = enc.Encode(exportData)
if err != nil {
return fmt.Errorf("error encoding exportData in Receive(), %w", err)
return fmt.Errorf("Receive(): error encoding exportData: %w", err)
}
exp.logger.Infof("Added block %d", exportData.Round())
exp.round++
exp.blockMetadata.NextRound = exp.round
err = exp.encodeMetadataToFile()
if err != nil {
return fmt.Errorf("Receive() metadata encoding err %w", err)
}
return nil
}

Expand All @@ -142,9 +172,13 @@ func (exp *fileExporter) HandleGenesis(genesis bookkeeping.Genesis) error {
if exp.blockMetadata.GenesisHash == "" {
exp.blockMetadata.GenesisHash = gh
exp.blockMetadata.Network = string(genesis.Network)
err := exp.encodeMetadataToFile()
if err != nil {
return fmt.Errorf("HandleGenesis() metadata encoding err %w", err)
}
} else {
if exp.blockMetadata.GenesisHash != gh {
return fmt.Errorf("genesis hash in metadata does not match expected value. actual %s, expected %s ", gh, exp.blockMetadata.GenesisHash)
return fmt.Errorf("HandleGenesis() genesis hash in metadata does not match expected value: actual %s, expected %s", gh, exp.blockMetadata.GenesisHash)
}
}
return nil
Expand Down

0 comments on commit 7e8ac4d

Please sign in to comment.