Skip to content

Commit

Permalink
Merge pull request #202 from vulcanize/v1.10.15-statediff-3.0.2
Browse files Browse the repository at this point in the history
V1.10.15 statediff 3.0.2
  • Loading branch information
i-norden authored Feb 16, 2022
2 parents f78f9be + db96f3d commit 8f3a573
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 22 deletions.
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
VersionMajor = 1 // Major version component of the current release
VersionMinor = 10 // Minor version component of the current release
VersionPatch = 15 // Patch version component of the current release
VersionMeta = "statediff-3.0.1" // Version metadata to append to the version string
VersionMeta = "statediff-3.0.2" // Version metadata to append to the version string
)

// Version holds the textual version string.
Expand Down
5 changes: 3 additions & 2 deletions statediff/indexer/database/sql/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (
"math/big"
"time"

ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"

"github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash"
Expand All @@ -39,6 +37,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
Expand Down Expand Up @@ -155,9 +154,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
close(self.iplds)
}()
if p := recover(); p != nil {
log.Info("panic detected before tx submission, rolling back the tx", "panic", p)
rollback(sdi.ctx, tx)
panic(p)
} else if err != nil {
log.Info("error detected before tx submission, rolling back the tx", "error", err)
rollback(sdi.ctx, tx)
} else {
tDiff := time.Since(t)
Expand Down
16 changes: 14 additions & 2 deletions statediff/indexer/database/sql/indexer_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"os"
"testing"

"github.com/ethereum/go-ethereum/rlp"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
Expand Down Expand Up @@ -141,3 +141,15 @@ func expectTrue(t *testing.T, value bool) {
t.Fatalf("Assertion failed")
}
}

func checkTxClosure(t *testing.T, idle, inUse, open int64) {
require.Equal(t, idle, db.Stats().Idle())
require.Equal(t, inUse, db.Stats().InUse())
require.Equal(t, open, db.Stats().Open())
}

func tearDown(t *testing.T) {
sql.TearDownDB(t, db)
err := ind.Close()
require.NoError(t, err)
}
3 changes: 3 additions & 0 deletions statediff/indexer/database/sql/mainnet_tests/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ func setup(t *testing.T, testBlock *types.Block, testReceipts types.Receipts) {
}

func tearDown(t *testing.T) {
require.Equal(t, int64(0), db.Stats().Idle())
require.Equal(t, int64(0), db.Stats().InUse())
require.Equal(t, int64(0), db.Stats().Open())
sql.TearDownDB(t, db)
err = ind.Close()
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion statediff/indexer/database/sql/pgx_indexer_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ func setupLegacyPGX(t *testing.T) {
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64())
}

func TestPGXIndexerLegacy(t *testing.T) {
func TestLegacyPGXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacyPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
Expand Down
6 changes: 6 additions & 0 deletions statediff/indexer/database/sql/pgx_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
Expand Down Expand Up @@ -111,6 +112,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
// check that txs were properly indexed and published
trxs := make([]string, 0)
pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
Expand Down Expand Up @@ -237,6 +239,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)

rcts := make([]string, 0)
rctsPgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
Expand Down Expand Up @@ -294,6 +297,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)

// check receipts were properly indexed and published
rcts := make([]string, 0)
Expand Down Expand Up @@ -395,6 +399,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
// check that state nodes were properly indexed and published
stateNodes := make([]models.StateNodeModel, 0)
pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
Expand Down Expand Up @@ -484,6 +489,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
// check that storage nodes were properly indexed
storageNodes := make([]models.StorageNodeWithStateKeyModel, 0)
pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
Expand Down
7 changes: 2 additions & 5 deletions statediff/indexer/database/sql/postgres/sqlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,10 @@ func NewSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDri
if config.MaxConns > 0 {
db.SetMaxOpenConns(config.MaxConns)
}
if config.MaxIdle > 0 {
db.SetMaxIdleConns(config.MaxIdle)
}
if config.MaxConnLifetime > 0 {
lifetime := config.MaxConnLifetime
db.SetConnMaxLifetime(lifetime)
db.SetConnMaxLifetime(config.MaxConnLifetime)
}
db.SetMaxIdleConns(config.MaxIdle)
driver := &SQLXDriver{ctx: ctx, db: db, nodeInfo: node}
if err := driver.createNode(); err != nil {
return &SQLXDriver{}, ErrUnableToSetNode(err)
Expand Down
4 changes: 3 additions & 1 deletion statediff/indexer/database/sql/postgres/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (

// SetupSQLXDB is used to setup a sqlx db for tests
func SetupSQLXDB() (sql.Database, error) {
driver, err := NewSQLXDriver(context.Background(), DefaultConfig, node.Info{})
conf := DefaultConfig
conf.MaxIdle = 0
driver, err := NewSQLXDriver(context.Background(), conf, node.Info{})
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion statediff/indexer/database/sql/sqlx_indexer_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ func setupLegacySQLX(t *testing.T) {
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64())
}

func TestSQLXIndexerLegacy(t *testing.T) {
func TestLegacySQLXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacySQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
Expand Down
13 changes: 6 additions & 7 deletions statediff/indexer/database/sql/sqlx_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,11 @@ func setupSQLX(t *testing.T) {
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64())
}

func tearDown(t *testing.T) {
sql.TearDownDB(t, db)
if err := ind.Close(); err != nil {
t.Fatal(err)
}
}

func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
Expand Down Expand Up @@ -114,6 +108,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
// check that txs were properly indexed and published
trxs := make([]string, 0)
pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
Expand Down Expand Up @@ -240,6 +235,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)

rcts := make([]string, 0)
rctsPgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
Expand Down Expand Up @@ -295,6 +291,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)

// check receipts were properly indexed and published
rcts := make([]string, 0)
Expand Down Expand Up @@ -395,6 +392,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
// check that state nodes were properly indexed and published
stateNodes := make([]models.StateNodeModel, 0)
pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
Expand Down Expand Up @@ -484,6 +482,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
// check that storage nodes were properly indexed
storageNodes := make([]models.StorageNodeWithStateKeyModel, 0)
pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
Expand Down
6 changes: 4 additions & 2 deletions statediff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"

ind "github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
types2 "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/trie"
)

const (
Expand Down Expand Up @@ -705,6 +704,9 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
err = sds.writeStateDiff(block, parentRoot, params)
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
// Retry only when the deadlock is detected.
if i != sds.maxRetry {
log.Info("dead lock detected while writing statediff", "err", err, "retry number", i)
}
continue
}
break
Expand Down

0 comments on commit 8f3a573

Please sign in to comment.