Skip to content

Commit

Permalink
Conduit: Postgres exporter initial implementation (#1114)
Browse files Browse the repository at this point in the history
* Local Ledger (#1011)

* integrate block processor

* Local Ledger Deployment (#1013)

* add simple local ledger migration

* add deleted opts

* fast catchup (#1023)

* add fast catchup

* Localledger merge (#1036)

* return empty lists from fetchApplications and fetchAppLocalStates (#1010)

* Update model to converge with algod (#1005)

* New Feature: Adds Data Directory Support (#1012)

- Updates the go-algorand submodule hash to point to rel/beta
- Moves the cpu profiling file, pid file and indexer configuration file
  to be options of only the daemon sub-command
- Changes os.Exit() to be a panic with a special handler.  This is so
  that defer's are handled instead of being ignored.
- Detects auto-loading configuration files in the data directory and
  issues errors if equivalent command line arguments are supplied.
- Updates the README with instructions on how to use the auto-loading
  configuration files and the data directory.

* Update mockery version

Co-authored-by: erer1243 <[email protected]>
Co-authored-by: AlgoStephenAkiki <[email protected]>

* recovery scenario (#1024)

* handle ledger recovery scenario

* refactor create genesis block (#1026)

* refactor create genesis block

* Adds Local Ledger Readme (#1035)

* Adds Local Ledger Readme

Resolves #4109

Starts Readme docs

* Update docs/LocalLedger.md

Co-authored-by: Will Winder <[email protected]>

* Update docs/LocalLedger.md

Co-authored-by: Will Winder <[email protected]>

* Update docs/LocalLedger.md

Co-authored-by: Will Winder <[email protected]>

* Removed troubleshooting section

Co-authored-by: Will Winder <[email protected]>

* update ledger file path and migration (#1042)

* LocalLedger Refactoring + Catchpoint Service (#1049)

Part 1

    cleanup genesis file access.
    put node catchup into a function that can be swapped out with the catchup service.
    pass the indexer logger into the block processor.
    move open ledger into a util function, and move the initial state util function into a new ledger util file.
    add initial catchupservice implementation.
    move ledger init from daemon.go to constructor.
    Merge multiple read genesis functions.

Part 2

    Merge local_ledger migration package into blockprocessor.
    Rename Migration to Initialize
    Use logger in catchup service catchup

Part 3

    Update submodule and use NewWrappedLogger.
    Make util.CreateInitState private

* build: merge develop into localledger/integration (#1062)

* Ledger init status (#1058)

* Generate an error if the catchpoint is not valid for initialization. (#1075)

* Use main logger in handler and fetcher. (#1077)

* Switch from fullNode catchup to catchpoint catchup service. (#1076)

* Refactor daemon, add more tests (#1039)

Refactors daemon cmd into separate, testable pieces.

* Merge develop into localledger/integration (#1083)

* Misc Local Ledger cleanup (#1086)

* Update processor/blockprocessor/initialize.go

Co-authored-by: Zeph Grunschlag <[email protected]>

* commit

* fix function call args

* RFC-0001: Rfc 0001 impl (#1069)

Adds an Exporter interface and a noop exporter implementation with factory methods for construction

* Fix test errors

* Add/fix tests

* Add postgresql_exporter tests

* Update config loading

* Change BlockExportData to pointers

* Move and rename ExportData

* Add Empty func to BlockData

* Add comment

Co-authored-by: shiqizng <[email protected]>
Co-authored-by: [email protected] <[email protected]>
Co-authored-by: erer1243 <[email protected]>
Co-authored-by: AlgoStephenAkiki <[email protected]>
Co-authored-by: Will Winder <[email protected]>
Co-authored-by: Zeph Grunschlag <[email protected]>
  • Loading branch information
7 people authored Jul 21, 2022
1 parent 54d39d7 commit 3c8c150
Show file tree
Hide file tree
Showing 15 changed files with 488 additions and 65 deletions.
39 changes: 39 additions & 0 deletions data/block_export_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package data

import (
"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
)

// RoundProvider is the interface which all data types sent to Exporters should implement
type RoundProvider interface {
Round() uint64
Empty() bool
}

// BlockData is provided to the Exporter on each round.
type BlockData struct {
// Block is the block data written to the blockchain.
Block *bookkeeping.Block

// Delta contains a list of account changes resulting from the block. Processor plugins may have modify this data.
Delta *ledgercore.StateDelta

// Certificate contains voting data that certifies the block. The certificate is non deterministic, a node stops collecting votes once the voting threshold is reached.
Certificate *agreement.Certificate
}

// Round returns the round to which the BlockData corresponds
func (blkData BlockData) Round() uint64 {
return uint64(blkData.Block.Round())
}

// Empty returns whether the Block contains Txns. Assumes the Block is never nil
func (blkData BlockData) Empty() (bool, error) {
payset, err := blkData.Block.DecodePaysetFlat()
if err != nil {
return true, err
}
return len(payset) > 0, nil
}
43 changes: 6 additions & 37 deletions exporters/exporter.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,12 @@
package exporters

import (
"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/indexer/data"
"github.com/algorand/indexer/plugins"
"github.com/sirupsen/logrus"
)

// ExporterConfig is a generic string which can be deserialized by each individual Exporter.
type ExporterConfig string

// ExportData is the interface which all data types sent to Exporters should implement
type ExportData interface {
Round() uint64
}

// BlockExportData is provided to the Exporter on each round.
type BlockExportData struct {
// Block is the block data written to the blockchain.
Block bookkeeping.Block

// Delta contains a list of account changes resulting from the block. Processor plugins may have modify this data.
Delta ledgercore.StateDelta

// Certificate contains voting data that certifies the block. The certificate is non deterministic, a node stops collecting votes once the voting threshold is reached.
Certificate agreement.Certificate
}

// Round returns the round to which the BlockExportData corresponds
func (blkData *BlockExportData) Round() uint64 {
return uint64(blkData.Block.Round())
}

// ExporterMetadata contains fields relevant to identification and description of plugins.
type ExporterMetadata struct {
Name string
Description string
Deprecated bool
}

// Exporter defines the interface for plugins
type Exporter interface {
// Metadata associated with each Exporter.
Expand All @@ -47,11 +16,11 @@ type Exporter interface {
// Typically used for things like initializing network connections.
// The ExporterConfig passed to Connect will contain the Unmarhsalled config file specific to this plugin.
// Should return an error if it fails--this will result in the Indexer process terminating.
Connect(cfg ExporterConfig) error
Connect(cfg plugins.PluginConfig, logger *logrus.Logger) error

// Config returns the configuration options used to create an Exporter.
// Initialized during Connect, it should return nil until the Exporter has been Connected.
Config() ExporterConfig
Config() plugins.PluginConfig

// Disconnect will be called during termination of the Indexer process.
// There is no guarantee that plugin lifecycle hooks will be invoked in any specific order in relation to one another.
Expand All @@ -60,7 +29,7 @@ type Exporter interface {

// Receive is called for each block to be processed by the exporter.
// Should return an error on failure--retries are configurable.
Receive(exportData ExportData) error
Receive(exportData data.BlockData) error

// HandleGenesis is an Exporter's opportunity to do initial validation and handling of the Genesis block.
// If validation (such as a check to ensure `genesis` matches a previously stored genesis block) or handling fails,
Expand Down
7 changes: 5 additions & 2 deletions exporters/exporter_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package exporters

import (
"fmt"
"github.com/algorand/indexer/plugins"
"github.com/sirupsen/logrus"
)

// ExporterConstructor must be implemented by each Exporter.
Expand All @@ -26,14 +28,15 @@ func RegisterExporter(name string, constructor ExporterConstructor) {
// ExporterByName is used to construct an Exporter object by name.
// Returns an Exporter object, an availability channel that closes when the database
// becomes available, and an error object.
func ExporterByName(name string, cfg ExporterConfig) (Exporter, error) {
func ExporterByName(name string, dataDir string, logger *logrus.Logger) (Exporter, error) {
var constructor ExporterConstructor
var ok bool
if constructor, ok = exporterImpls[name]; !ok {
return nil, fmt.Errorf("no Exporter Constructor for %s", name)
}
val := constructor.New()
if err := val.Connect(cfg); err != nil {
cfg := plugins.LoadConfig(logger, dataDir, val.Metadata())
if err := val.Connect(cfg, logger); err != nil {
return nil, err
}
return val, nil
Expand Down
29 changes: 22 additions & 7 deletions exporters/exporter_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,36 @@ package exporters

import (
"fmt"
"github.com/algorand/indexer/plugins"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"testing"
)

var logger *logrus.Logger

func init() {
logger, _ = test.NewNullLogger()
}

type mockExporter struct {
mock.Mock
Exporter
}

func (m *mockExporter) Connect(config ExporterConfig) error {
args := m.Called(config)
func (m *mockExporter) Connect(config plugins.PluginConfig, logger *logrus.Logger) error {
args := m.Called(config, logger)
return args.Error(0)
}

func (m *mockExporter) Metadata() ExporterMetadata {
return ExporterMetadata{
ExpName: "foobar",
}
}

type mockExporterConstructor struct {
me *mockExporter
}
Expand All @@ -27,25 +42,25 @@ func (c *mockExporterConstructor) New() Exporter {

func TestExporterByNameSuccess(t *testing.T) {
me := mockExporter{}
me.On("Connect", mock.Anything).Return(nil)
me.On("Connect", mock.Anything, mock.Anything).Return(nil)
RegisterExporter("foobar", &mockExporterConstructor{&me})

exp, err := ExporterByName("foobar", "")
exp, err := ExporterByName("foobar", "", logger)
assert.NoError(t, err)
assert.Implements(t, (*Exporter)(nil), exp)
}

func TestExporterByNameNotFound(t *testing.T) {
_, err := ExporterByName("barfoo", "")
_, err := ExporterByName("barfoo", "", logger)
expectedErr := "no Exporter Constructor for barfoo"
assert.EqualError(t, err, expectedErr)
}

func TestExporterByNameConnectFailure(t *testing.T) {
me := mockExporter{}
expectedErr := fmt.Errorf("connect failure")
me.On("Connect", mock.Anything).Return(expectedErr)
me.On("Connect", mock.Anything, mock.Anything).Return(expectedErr)
RegisterExporter("baz", &mockExporterConstructor{&me})
_, err := ExporterByName("baz", "")
_, err := ExporterByName("baz", "", logger)
assert.EqualError(t, err, expectedErr.Error())
}
32 changes: 32 additions & 0 deletions exporters/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package exporters

import (
"github.com/algorand/indexer/plugins"
)

// ExporterMetadata returns fields relevant to identification and description of plugins.
type ExporterMetadata struct {
ExpName string
ExpDescription string
ExpDeprecated bool
}

// Type implements the Plugin.Type interface
func (meta ExporterMetadata) Type() plugins.PluginType {
return plugins.Exporter
}

// Name implements the Plugin.Name interface
func (meta ExporterMetadata) Name() string {
return meta.ExpName
}

// Description provides a brief description of the purpose of the Exporter
func (meta *ExporterMetadata) Description() string {
return meta.ExpDescription
}

// Deprecated is used to warn users against deprecated plugins
func (meta *ExporterMetadata) Deprecated() bool {
return meta.ExpDeprecated
}
19 changes: 11 additions & 8 deletions exporters/noop/noop_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package noop

import (
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/indexer/data"
"github.com/algorand/indexer/exporters"
"github.com/algorand/indexer/plugins"
"github.com/sirupsen/logrus"
)

// `noopExporter`s will function without ever erroring. This means they will also process out of order blocks
Expand All @@ -11,13 +14,13 @@ import (
// The `noopExporter` will maintain `Round` state according to the round of the last block it processed.
type noopExporter struct {
round uint64
cfg exporters.ExporterConfig
cfg plugins.PluginConfig
}

var noopExporterMetadata exporters.ExporterMetadata = exporters.ExporterMetadata{
Name: "noop",
Description: "noop exporter",
Deprecated: false,
ExpName: "noop",
ExpDescription: "noop exporter",
ExpDeprecated: false,
}

// Constructor is the ExporterConstructor implementation for the "noop" exporter
Expand All @@ -35,19 +38,19 @@ func (exp *noopExporter) Metadata() exporters.ExporterMetadata {
return noopExporterMetadata
}

func (exp *noopExporter) Connect(_ exporters.ExporterConfig) error {
func (exp *noopExporter) Connect(_ plugins.PluginConfig, _ *logrus.Logger) error {
return nil
}

func (exp *noopExporter) Config() exporters.ExporterConfig {
func (exp *noopExporter) Config() plugins.PluginConfig {
return exp.cfg
}

func (exp *noopExporter) Disconnect() error {
return nil
}

func (exp *noopExporter) Receive(exportData exporters.ExportData) error {
func (exp *noopExporter) Receive(exportData data.BlockData) error {
exp.round = exportData.Round() + 1
return nil
}
Expand All @@ -61,5 +64,5 @@ func (exp *noopExporter) Round() uint64 {
}

func init() {
exporters.RegisterExporter(noopExporterMetadata.Name, &Constructor{})
exporters.RegisterExporter(noopExporterMetadata.ExpName, &Constructor{})
}
23 changes: 14 additions & 9 deletions exporters/noop/noop_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package noop

import (
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/indexer/data"
"github.com/algorand/indexer/exporters"
"github.com/algorand/indexer/plugins"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"testing"
)
Expand All @@ -12,25 +15,27 @@ var nc = &Constructor{}
var ne = nc.New()

func TestExporterByName(t *testing.T) {
exporters.RegisterExporter(noopExporterMetadata.Name, nc)
ne, err := exporters.ExporterByName(noopExporterMetadata.Name, "")
logger, _ := test.NewNullLogger()
exporters.RegisterExporter(noopExporterMetadata.ExpName, nc)
ne, err := exporters.ExporterByName(noopExporterMetadata.ExpName, "", logger)
assert.NoError(t, err)
assert.Implements(t, (*exporters.Exporter)(nil), ne)
}

func TestExporterMetadata(t *testing.T) {
meta := ne.Metadata()
assert.Equal(t, noopExporterMetadata.Name, meta.Name)
assert.Equal(t, noopExporterMetadata.Description, meta.Description)
assert.Equal(t, noopExporterMetadata.Deprecated, meta.Deprecated)
assert.Equal(t, plugins.PluginType(plugins.Exporter), meta.Type())
assert.Equal(t, noopExporterMetadata.ExpName, meta.Name())
assert.Equal(t, noopExporterMetadata.ExpDescription, meta.Description())
assert.Equal(t, noopExporterMetadata.ExpDeprecated, meta.Deprecated())
}

func TestExporterConnect(t *testing.T) {
assert.NoError(t, ne.Connect(""))
assert.NoError(t, ne.Connect("", nil))
}

func TestExporterConfig(t *testing.T) {
assert.Equal(t, exporters.ExporterConfig(""), ne.Config())
assert.Equal(t, plugins.PluginConfig(""), ne.Config())
}

func TestExporterDisconnect(t *testing.T) {
Expand All @@ -42,8 +47,8 @@ func TestExporterHandleGenesis(t *testing.T) {
}

func TestExporterRoundReceive(t *testing.T) {
eData := &exporters.BlockExportData{
Block: bookkeeping.Block{
eData := data.BlockData{
Block: &bookkeeping.Block{
BlockHeader: bookkeeping.BlockHeader{
Round: 5,
},
Expand Down
Loading

0 comments on commit 3c8c150

Please sign in to comment.