Skip to content

Commit

Permalink
Merge pull request #183 from vulcanize/port_retry
Browse files Browse the repository at this point in the history
port retry on deadlock detection feature
  • Loading branch information
i-norden authored Dec 29, 2021
2 parents 68f2390 + 488309b commit 801a25e
Showing 1 changed file with 28 additions and 6 deletions.
34 changes: 28 additions & 6 deletions statediff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"math/big"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -46,8 +47,12 @@ import (
types2 "github.com/ethereum/go-ethereum/statediff/types"
)

const chainEventChanSize = 20000
const genesisBlockNumber = 0
const (
chainEventChanSize = 20000
genesisBlockNumber = 0
defaultRetryLimit = 3 // default retry limit once deadlock is detected.
deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html
)

var writeLoopParams = Params{
IntermediateStateNodes: true,
Expand Down Expand Up @@ -122,6 +127,8 @@ type Service struct {
enableWriteLoop bool
// Size of the worker pool
numWorkers uint
// Number of retry for aborted transactions due to deadlock.
maxRetry uint
}

// BlockCache caches the last block for safe access from different service loops
Expand Down Expand Up @@ -174,6 +181,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
indexer: indexer,
enableWriteLoop: params.EnableWriteLoop,
numWorkers: workers,
maxRetry: defaultRetryLimit,
}
stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs())
Expand Down Expand Up @@ -266,7 +274,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
// For genesis block we need to return the entire state trie hence we diff it with an empty trie.
log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId)
err := sds.writeStateDiff(currBlock, common.Hash{}, writeLoopParams)
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams)
if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height",
genesisBlockNumber, "error", err.Error(), "worker", workerId)
Expand Down Expand Up @@ -295,7 +303,7 @@ func (sds *Service) writeLoopWorker(params workerParams) {
}

log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams)
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams)
if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
continue
Expand Down Expand Up @@ -632,7 +640,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
parentRoot = parentBlock.Root()
}
return sds.writeStateDiff(currentBlock, parentRoot, params)
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
}

// WriteStateDiffFor writes a state diff for the specific blockhash directly to the database
Expand All @@ -645,7 +653,7 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
parentBlock := sds.BlockChain.GetBlockByHash(currentBlock.ParentHash())
parentRoot = parentBlock.Root()
}
return sds.writeStateDiff(currentBlock, parentRoot, params)
return sds.writeStateDiffWithRetry(currentBlock, parentRoot, params)
}

// Writes a state diff from the current block, parent state root, and provided params
Expand Down Expand Up @@ -689,3 +697,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
}
return nil
}

// Wrapper function on writeStateDiff to retry when the deadlock is detected.
func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot common.Hash, params Params) error {
var err error
for i := uint(0); i < sds.maxRetry; i++ {
err = sds.writeStateDiff(block, parentRoot, params)
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
// Retry only when the deadlock is detected.
continue
}
break
}
return err
}

0 comments on commit 801a25e

Please sign in to comment.