Skip to content

Commit

Permalink
chore: Add proper termination upon critical error (ethereum-optimism#103
Browse files Browse the repository at this point in the history
)
gitferry authored Sep 21, 2023
1 parent bbe499d commit b288764
Showing 13 changed files with 418 additions and 203 deletions.
31 changes: 31 additions & 0 deletions clientcontroller/babylon.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path"
"strings"
"sync"
"time"

@@ -209,6 +210,7 @@ func (bc *BabylonController) reliablySendMsgs(msgs []sdk.Msg) (*provider.Relayer
return retry.Unrecoverable(krErr)
}
if sendMsgErr != nil {
sendMsgErr = ConvertErrType(sendMsgErr)
if IsUnrecoverable(sendMsgErr) {
bc.logger.WithFields(logrus.Fields{
"error": sendMsgErr,
@@ -241,6 +243,7 @@ func (bc *BabylonController) reliablySendMsgs(msgs []sdk.Msg) (*provider.Relayer
wg.Wait()

if callbackErr != nil {
callbackErr = ConvertErrType(callbackErr)
if IsExpected(callbackErr) {
return nil, nil
}
@@ -891,3 +894,31 @@ func (bc *BabylonController) Close() error {

return bc.provider.RPCClient.Stop()
}

func ConvertErrType(err error) error {
if err == nil {
return nil
}
switch {
case strings.Contains(err.Error(), btcstakingtypes.ErrBTCValAlreadySlashed.Error()):
return types.ErrValidatorSlashed
case strings.Contains(err.Error(), finalitytypes.ErrBlockNotFound.Error()):
return types.ErrBlockNotFound
case strings.Contains(err.Error(), finalitytypes.ErrInvalidFinalitySig.Error()):
return types.ErrInvalidFinalitySig
case strings.Contains(err.Error(), finalitytypes.ErrHeightTooHigh.Error()):
return types.ErrHeightTooHigh
case strings.Contains(err.Error(), finalitytypes.ErrNoPubRandYet.Error()):
return types.ErrNoPubRandYet
case strings.Contains(err.Error(), finalitytypes.ErrPubRandNotFound.Error()):
return types.ErrPubRandNotFound
case strings.Contains(err.Error(), finalitytypes.ErrTooFewPubRand.Error()):
return types.ErrTooFewPubRand
case strings.Contains(err.Error(), finalitytypes.ErrInvalidPubRand.Error()):
return types.ErrInvalidPubRand
case strings.Contains(err.Error(), finalitytypes.ErrDuplicatedFinalitySig.Error()):
return types.ErrDuplicatedFinalitySig
default:
return err
}
}
33 changes: 17 additions & 16 deletions clientcontroller/retry_utils.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package clientcontroller

import (
"strings"
"errors"
"time"

errorsmod "cosmossdk.io/errors"
"github.com/avast/retry-go/v4"
ftypes "github.com/babylonchain/babylon/x/finality/types"

"github.com/babylonchain/btc-validator/types"
)

// Variables used for retries
@@ -18,38 +18,39 @@ var (
)

// these errors are considered unrecoverable because these indicate
// something critical in the validator program or the Babylon server
var unrecoverableErrors = []*errorsmod.Error{
ftypes.ErrBlockNotFound,
ftypes.ErrInvalidFinalitySig,
ftypes.ErrHeightTooHigh,
ftypes.ErrInvalidPubRand,
ftypes.ErrNoPubRandYet,
ftypes.ErrPubRandNotFound,
ftypes.ErrTooFewPubRand,
// something critical in the validator program or the consumer chain
var unrecoverableErrors = []error{
types.ErrBlockNotFound,
types.ErrInvalidFinalitySig,
types.ErrHeightTooHigh,
types.ErrInvalidPubRand,
types.ErrNoPubRandYet,
types.ErrPubRandNotFound,
types.ErrTooFewPubRand,
types.ErrValidatorSlashed,
}

// IsUnrecoverable returns true when the error is in the unrecoverableErrors list
func IsUnrecoverable(err error) bool {
for _, e := range unrecoverableErrors {
if strings.Contains(err.Error(), e.Error()) {
if errors.Is(err, e) {
return true
}
}

return false
}

var expectedErrors = []*errorsmod.Error{
var expectedErrors = []error{
// if due to some low-level reason (e.g., network), we submit duplicated finality sig,
// we should just ignore the error
ftypes.ErrDuplicatedFinalitySig,
types.ErrDuplicatedFinalitySig,
}

// IsExpected returns true when the error is in the expectedErrors list
func IsExpected(err error) bool {
for _, e := range expectedErrors {
if strings.Contains(err.Error(), e.Error()) {
if errors.Is(err, e) {
return true
}
}
16 changes: 12 additions & 4 deletions itest/e2e_test.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/babylonchain/btc-validator/clientcontroller"
"github.com/babylonchain/btc-validator/proto"
"github.com/babylonchain/btc-validator/service"
"github.com/babylonchain/btc-validator/types"
"github.com/babylonchain/btc-validator/val"
@@ -108,8 +109,7 @@ func TestValidatorLifeCycle(t *testing.T) {
require.True(t, dels[0].BabylonPk.Equals(delData.DelegatorBabylonKey))

// check there's a block finalized
finalizedBlocks := tm.WaitForNFinalizedBlocks(t, 1)
t.Logf("the latest finalized block is at %v", finalizedBlocks[0].Height)
_ = tm.WaitForNFinalizedBlocks(t, 1)
}

// TestMultipleValidators tests starting with multiple validators
@@ -163,6 +163,7 @@ func TestMultipleValidators(t *testing.T) {

func TestJurySigSubmission(t *testing.T) {
tm := StartManagerWithValidator(t, 1, true)
// changing the mode because we need to ensure the validator is also stopped when the test is finished
defer tm.Stop(t)
app := tm.Va
valIns := app.ListValidatorInstances()[0]
@@ -172,6 +173,8 @@ func TestJurySigSubmission(t *testing.T) {

dels := tm.WaitForValNActiveDels(t, valIns.GetBtcPkBIP340(), 1)
require.True(t, dels[0].BabylonPk.Equals(delData.DelegatorBabylonKey))
err := valIns.Stop()
require.NoError(t, err)
}

// TestDoubleSigning tests the attack scenario where the validator
@@ -203,7 +206,6 @@ func TestDoubleSigning(t *testing.T) {

// check there's a block finalized
finalizedBlocks := tm.WaitForNFinalizedBlocks(t, 1)
t.Logf("the latest finalized block is at %v", finalizedBlocks[0].Height)

// attack: manually submit a finality vote over a conflicting block
// to trigger the extraction of validator's private key
@@ -218,6 +220,13 @@ func TestDoubleSigning(t *testing.T) {
localKey, err := getBtcPrivKey(app.GetKeyring(), val.KeyName(valIns.GetStoreValidator().KeyName))
require.NoError(t, err)
require.True(t, localKey.Key.Equals(&extractedKey.Key) || localKey.Key.Negate().Equals(&extractedKey.Key))

// try to submit another signature and should get error due to being slashed already
_, _, err = valIns.TestSubmitFinalitySignatureAndExtractPrivKey(b)
require.ErrorIs(t, err, types.ErrValidatorSlashed)

tm.WaitForValStopped(t, valIns.GetBabylonPk())
require.Equal(t, proto.ValidatorStatus_SLASHED, valIns.GetStatus())
}

func getBtcPrivKey(kr keyring.Keyring, keyName val.KeyName) (*btcec.PrivateKey, error) {
@@ -261,7 +270,6 @@ func TestFastSync(t *testing.T) {

// check there's a block finalized
finalizedBlocks := tm.WaitForNFinalizedBlocks(t, 1)
t.Logf("the latest finalized block is at %v", finalizedBlocks[0].Height)

n := 3
// stop the validator for a few blocks then restart to trigger the fast sync
32 changes: 29 additions & 3 deletions itest/test_manager.go
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ import (
)

var (
eventuallyWaitTimeOut = 30 * time.Second
eventuallyWaitTimeOut = 1 * time.Minute
eventuallyPollTime = 500 * time.Millisecond
)

@@ -78,6 +78,8 @@ func StartManager(t *testing.T, isJury bool) *TestManager {
cfg := defaultValidatorConfig(bh.GetNodeDataDir(), testDir, isJury)

bc, err := clientcontroller.NewBabylonController(bh.GetNodeDataDir(), cfg.BabylonConfig, logger)
// making sure the fee is sufficient
cfg.BabylonConfig.GasAdjustment = 1.5
require.NoError(t, err)

valApp, err := service.NewValidatorAppFromConfig(cfg, logger, bc)
@@ -120,22 +122,28 @@ func StartManagerWithValidator(t *testing.T, n int, isJury bool) *TestManager {
require.NoError(t, err)
err = app.StartHandlingValidator(bbnPk)
require.NoError(t, err)
valIns, err := app.GetValidatorInstance(bbnPk)
require.NoError(t, err)
require.True(t, valIns.IsRunning())
require.NoError(t, err)
}

require.Equal(t, n, len(app.ListValidatorInstances()))

t.Logf("the test manager is running with %v validators", n)

return tm
}

func (tm *TestManager) Stop(t *testing.T) {
err := tm.Va.Stop()
require.NoError(t, err)
err = tm.BabylonHandler.Stop()
require.NoError(t, err)
err = os.RemoveAll(tm.Config.DatabaseConfig.Path)
require.NoError(t, err)
err = os.RemoveAll(tm.Config.BabylonConfig.KeyDirectory)
require.NoError(t, err)
err = tm.BabylonHandler.Stop()
require.NoError(t, err)
}

func (tm *TestManager) WaitForValRegistered(t *testing.T, bbnPk *secp256k1.PubKey) {
@@ -146,6 +154,8 @@ func (tm *TestManager) WaitForValRegistered(t *testing.T, bbnPk *secp256k1.PubKe
}
return len(queriedValidators) == 1 && queriedValidators[0].BabylonPk.Equals(bbnPk)
}, eventuallyWaitTimeOut, eventuallyPollTime)

t.Logf("the validator is successfully registered")
}

func (tm *TestManager) WaitForValPubRandCommitted(t *testing.T, valIns *service.ValidatorInstance) {
@@ -156,6 +166,8 @@ func (tm *TestManager) WaitForValPubRandCommitted(t *testing.T, valIns *service.
}
return int(tm.Config.NumPubRand) == len(randPairs)
}, eventuallyWaitTimeOut, eventuallyPollTime)

t.Logf("public randomness is successfully committed")
}

func (tm *TestManager) WaitForNPendingDels(t *testing.T, n int) []*bstypes.BTCDelegation {
@@ -171,6 +183,8 @@ func (tm *TestManager) WaitForNPendingDels(t *testing.T, n int) []*bstypes.BTCDe
return len(dels) == n
}, eventuallyWaitTimeOut, eventuallyPollTime)

t.Logf("delegations are pending")

return dels
}

@@ -188,6 +202,8 @@ func (tm *TestManager) WaitForValNActiveDels(t *testing.T, btcPk *bbntypes.BIP34
return len(dels) == n && CheckDelsStatus(dels, currentBtcTip.Height, params.FinalizationTimeoutBlocks, bstypes.BTCDelegationStatus_ACTIVE)
}, eventuallyWaitTimeOut, eventuallyPollTime)

t.Logf("the delegation is active, validators should start voting")

return dels
}

@@ -232,14 +248,24 @@ func (tm *TestManager) WaitForNFinalizedBlocks(t *testing.T, n int) []*types.Blo
require.Eventually(t, func() bool {
blocks, err = tm.BabylonClient.QueryLatestFinalizedBlocks(uint64(n))
if err != nil {
t.Logf("failed to get the latest finalized block: %s", err.Error())
return false
}
return len(blocks) == n
}, eventuallyWaitTimeOut, eventuallyPollTime)

t.Logf("the block is finalized at %v", blocks[0].Height)

return blocks
}

func (tm *TestManager) WaitForValStopped(t *testing.T, bbnPk *secp256k1.PubKey) {
require.Eventually(t, func() bool {
_, err := tm.Va.GetValidatorInstance(bbnPk)
return err != nil
}, eventuallyWaitTimeOut, eventuallyPollTime)
}

func (tm *TestManager) StopAndRestartValidatorAfterNBlocks(t *testing.T, n int, valIns *service.ValidatorInstance) {
headerBeforeStop, err := tm.BabylonClient.QueryBestHeader()
require.NoError(t, err)
82 changes: 44 additions & 38 deletions proto/validators.pb.go
4 changes: 3 additions & 1 deletion proto/validators.proto
Original file line number Diff line number Diff line change
@@ -179,6 +179,8 @@ enum ValidatorStatus {
REGISTERED = 1 [(gogoproto.enumvalue_customname) = "REGISTERED"];
// ACTIVE defines a validator that is delegated to vote
ACTIVE = 2 [(gogoproto.enumvalue_customname) = "ACTIVE"];
// INACTIVE defines a validator whose delegations are reduced to zero
// INACTIVE defines a validator whose delegations are reduced to zero but not slashed
INACTIVE = 3 [(gogoproto.enumvalue_customname) = "INACTIVE"];
// SLASHED defines a validator that has been slashed
SLASHED = 4 [(gogoproto.enumvalue_customname) = "SLASHED"];
}
48 changes: 19 additions & 29 deletions service/app.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package service

import (
"encoding/hex"
"fmt"
"sync"

@@ -71,13 +70,18 @@ func NewValidatorAppFromConfig(
}
}

vm, err := NewValidatorManager(valStore, config, kr, cc, logger)
if err != nil {
return nil, fmt.Errorf("failed to create validator manager: %w", err)
}

return &ValidatorApp{
cc: cc,
vs: valStore,
kr: kr,
config: config,
logger: logger,
validatorManager: NewValidatorManager(),
validatorManager: vm,
quit: make(chan struct{}),
sentQuit: make(chan struct{}),
eventQuit: make(chan struct{}),
@@ -172,23 +176,11 @@ func (app *ValidatorApp) RegisterValidator(keyName string) (*RegisterValidatorRe
// StartHandlingValidator starts a validator instance with the given Babylon public key
// Note: this should be called right after the validator is registered
func (app *ValidatorApp) StartHandlingValidator(bbnPk *secp256k1.PubKey) error {
return app.validatorManager.addValidatorInstance(bbnPk, app.config, app.vs, app.kr, app.cc, app.logger)
return app.validatorManager.addValidatorInstance(bbnPk)
}

func (app *ValidatorApp) StartHandlingValidators() error {
storedValidators, err := app.vs.ListRegisteredValidators()
if err != nil {
return err
}

for _, v := range storedValidators {
err = app.StartHandlingValidator(v.GetBabylonPK())
if err != nil {
return err
}
}

return nil
return app.validatorManager.Start()
}

// AddJurySignature adds a Jury signature on the given Bitcoin delegation and submits it to Babylon
@@ -405,10 +397,12 @@ func (app *ValidatorApp) Stop() error {
close(app.quit)
app.wg.Wait()

app.logger.Debug("Stopping validators")
if err := app.validatorManager.stop(); err != nil {
stopErr = err
return
if !app.IsJury() {
app.logger.Debug("Stopping validators")
if err := app.validatorManager.Stop(); err != nil {
stopErr = err
return
}
}

app.logger.Debug("Sent to Babylon loop stopped")
@@ -484,17 +478,13 @@ func (app *ValidatorApp) handleCreateValidatorRequest(req *createValidatorReques
return nil, fmt.Errorf("failed to save validator: %w", err)
}

btcPubKey := validator.MustGetBTCPK()
babylonPubKey := validator.GetBabylonPK()

app.logger.Info("successfully created validator")
app.logger.WithFields(logrus.Fields{
"btc_pub_key": hex.EncodeToString(btcPubKey.SerializeCompressed()),
"babylon_pub_key": hex.EncodeToString(babylonPubKey.Key),
}).Debug("created validator")
"btc_pub_key": validator.MustGetBIP340BTCPK().MarshalHex(),
"babylon_pub_key": validator.GetBabylonPkHexString(),
}).Debug("successfully created a validator")

return &createValidatorResponse{
BtcValidatorPk: *btcPubKey,
BabylonValidatorPk: *babylonPubKey,
BtcValidatorPk: *validator.MustGetBTCPK(),
BabylonValidatorPk: *validator.GetBabylonPK(),
}, nil
}
13 changes: 6 additions & 7 deletions service/event_loop.go
Original file line number Diff line number Diff line change
@@ -95,7 +95,7 @@ func (app *ValidatorApp) eventLoop() {
// we always check if the validator is in the DB before sending the registration request
app.logger.WithFields(logrus.Fields{
"bbn_pk": ev.bbnPubKey,
}).Fatal("Registred validator not found in DB")
}).Fatal("registered validator not found in DB")
}

// change the status of the validator to registered
@@ -158,12 +158,11 @@ func (app *ValidatorApp) handleSentToBabylonLoop() {
continue
}

if res != nil {
app.logger.WithFields(logrus.Fields{
"bbnPk": hex.EncodeToString(req.bbnPubKey.Key),
"txHash": res.TxHash,
}).Info("successfully registered validator on babylon")
}
app.logger.WithFields(logrus.Fields{
"bbnPk": hex.EncodeToString(req.bbnPubKey.Key),
"btcPubKey": req.btcPubKey.MarshalHex(),
"txHash": res.TxHash,
}).Info("successfully registered validator on babylon")

app.validatorRegisteredEventChan <- &validatorRegisteredEvent{
bbnPubKey: req.bbnPubKey,
4 changes: 1 addition & 3 deletions service/fastsync.go
Original file line number Diff line number Diff line change
@@ -86,9 +86,7 @@ func (v *ValidatorInstance) FastSync(startHeight, endHeight uint64) (*FastSyncRe
}).Debug("the validator is catching up by sending finality signatures in a batch")
}

if err := v.SetLastProcessedHeight(endHeight); err != nil {
return nil, err
}
v.MustSetLastProcessedHeight(endHeight)

return &FastSyncResult{
Responses: responses,
198 changes: 107 additions & 91 deletions service/validator_instance.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package service

import (
"encoding/hex"
"errors"
"fmt"
"strings"
"sync"
@@ -28,8 +29,6 @@ import (
"github.com/babylonchain/btc-validator/valcfg"
)

const instanceTerminatingMsg = "terminating the instance due to critical error"

type state struct {
v *proto.StoreValidator
s *val.ValidatorStore
@@ -48,6 +47,7 @@ type ValidatorInstance struct {
poller *ChainPoller

laggingTargetChan chan *types.BlockInfo
criticalErrChan chan<- *CriticalError

isStarted *atomic.Bool
inSync *atomic.Bool
@@ -65,6 +65,7 @@ func NewValidatorInstance(
s *val.ValidatorStore,
kr keyring.Keyring,
cc clientcontroller.ClientController,
errChan chan<- *CriticalError,
logger *logrus.Logger,
) (*ValidatorInstance, error) {
v, err := s.GetStoreValidator(bbnPk.Key)
@@ -90,13 +91,14 @@ func NewValidatorInstance(
v: v,
s: s,
},
cfg: cfg,
logger: logger,
isStarted: atomic.NewBool(false),
inSync: atomic.NewBool(false),
isLagging: atomic.NewBool(false),
kc: kc,
cc: cc,
cfg: cfg,
logger: logger,
isStarted: atomic.NewBool(false),
inSync: atomic.NewBool(false),
isLagging: atomic.NewBool(false),
criticalErrChan: errChan,
kc: kc,
cc: cc,
}, nil
}

@@ -105,19 +107,19 @@ func (v *ValidatorInstance) GetStoreValidator() *proto.StoreValidator {
}

func (v *ValidatorInstance) GetBabylonPk() *secp256k1.PubKey {
return v.bbnPk
return v.state.v.GetBabylonPK()
}

func (v *ValidatorInstance) GetBabylonPkHex() string {
return hex.EncodeToString(v.bbnPk.Key)
return hex.EncodeToString(v.state.v.GetBabylonPk())
}

func (v *ValidatorInstance) GetBtcPkBIP340() *bbntypes.BIP340PubKey {
return v.btcPk
return v.state.v.MustGetBIP340BTCPK()
}

func (v *ValidatorInstance) MustGetBtcPk() *btcec.PublicKey {
return v.btcPk.MustToBTCPK()
return v.state.v.MustGetBTCPK()
}

// Exposed mostly for testing purposes
@@ -126,7 +128,7 @@ func (v *ValidatorInstance) BtcPrivKey() (*btcec.PrivateKey, error) {
}

func (v *ValidatorInstance) GetBtcPkHex() string {
return v.btcPk.MarshalHex()
return v.GetBtcPkBIP340().MarshalHex()
}

func (v *ValidatorInstance) GetStatus() proto.ValidatorStatus {
@@ -174,27 +176,77 @@ func (v *ValidatorInstance) SetStatus(s proto.ValidatorStatus) error {
return v.state.s.UpdateValidator(v.state.v)
}

func (v *ValidatorInstance) MustSetStatus(s proto.ValidatorStatus) {
if err := v.SetStatus(s); err != nil {
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
"status": s.String(),
}).Fatal("failed to set validator status")
}
}

func (v *ValidatorInstance) SetLastVotedHeight(height uint64) error {
v.state.v.LastVotedHeight = height
return v.state.s.UpdateValidator(v.state.v)
}

func (v *ValidatorInstance) MustSetLastVotedHeight(height uint64) {
if err := v.SetLastVotedHeight(height); err != nil {
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
"height": height,
}).Fatal("failed to set last voted height")
}
}

func (v *ValidatorInstance) SetLastProcessedHeight(height uint64) error {
v.state.v.LastProcessedHeight = height
return v.state.s.UpdateValidator(v.state.v)
}

func (v *ValidatorInstance) MustSetLastProcessedHeight(height uint64) {
if err := v.SetLastProcessedHeight(height); err != nil {
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
"height": height,
}).Fatal("failed to set last processed height")
}
}

func (v *ValidatorInstance) SetLastCommittedHeight(height uint64) error {
v.state.v.LastCommittedHeight = height
return v.state.s.UpdateValidator(v.state.v)
}

func (v *ValidatorInstance) MustSetLastCommittedHeight(height uint64) {
if err := v.SetLastCommittedHeight(height); err != nil {
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
"height": height,
}).Fatal("failed to set last committed height")
}
}

func (v *ValidatorInstance) updateStateAfterFinalitySigSubmission(height uint64) error {
v.state.v.LastProcessedHeight = height
v.state.v.LastVotedHeight = height
return v.state.s.UpdateValidator(v.state.v)
}

func (v *ValidatorInstance) MustUpdateStateAfterFinalitySigSubmission(height uint64) {
if err := v.updateStateAfterFinalitySigSubmission(height); err != nil {
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
"height": height,
}).Fatal("failed to update state after finality sig submission")
}
}

func (v *ValidatorInstance) Start() error {
if v.isStarted.Swap(true) {
return fmt.Errorf("the validator instance %s is already started", v.GetBtcPkHex())
@@ -273,6 +325,10 @@ func (v *ValidatorInstance) Stop() error {
return nil
}

func (v *ValidatorInstance) IsRunning() bool {
return v.isStarted.Load()
}

func (v *ValidatorInstance) signUnbondingTransactions(
privKey *btcec.PrivateKey,
toSign []*bstypes.BTCDelegation) ([]unbondingTxSigData, error) {
@@ -456,39 +512,21 @@ func (v *ValidatorInstance) finalitySigSubmissionLoop() {
// use the copy of the block to avoid the impact to other receivers
nextBlock := *b
should, err := v.shouldSubmitFinalitySignature(&nextBlock)

if err != nil {
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
"block_height": nextBlock.Height,
}).Warnf(instanceTerminatingMsg)
return
v.reportCriticalErr(err)
continue
}

if !should {
if err := v.SetLastProcessedHeight(nextBlock.Height); err != nil {
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
"block_height": nextBlock.Height,
}).Warnf(instanceTerminatingMsg)
return
}
v.MustSetLastProcessedHeight(nextBlock.Height)
continue
}

res, err := v.retrySubmitFinalitySignatureUntilBlockFinalized(&nextBlock)
if err != nil {
if strings.Contains(err.Error(), bstypes.ErrBTCValAlreadySlashed.Error()) {
_ = v.SetStatus(proto.ValidatorStatus_INACTIVE)
v.logger.Infof("the validator %s is slashed, terminating the instance", v.GetBtcPkHex())
} else {
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
"block_height": nextBlock.Height,
}).Warnf(instanceTerminatingMsg)
}
// TODO: we should stop the whole validator instance other than merely close the for loop
return
v.reportCriticalErr(err)
continue
}
if res != nil {
v.logger.WithFields(logrus.Fields{
@@ -501,15 +539,14 @@ func (v *ValidatorInstance) finalitySigSubmissionLoop() {
res, err := v.tryFastSync(targetBlock)
v.isLagging.Store(false)
if err != nil {
if strings.Contains(err.Error(), bstypes.ErrBTCValAlreadySlashed.Error()) {
_ = v.SetStatus(proto.ValidatorStatus_INACTIVE)
v.logger.Infof("the validator %s is slashed, terminating the instance", v.GetBtcPkHex())
return
if errors.Is(err, types.ErrValidatorSlashed) {
v.reportCriticalErr(err)
continue
}
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
}).Error("failed to sync up")
}).Error("failed to sync up, will try again later")
continue
}
// response might be nil if sync is not needed
@@ -542,25 +579,13 @@ func (v *ValidatorInstance) randomnessCommitmentLoop() {
case <-commitRandTicker.C:
tipBlock, err := v.getLatestBlockWithRetry()
if err != nil {
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
}).Warnf(instanceTerminatingMsg)
return
v.reportCriticalErr(err)
continue
}
txRes, err := v.retryCommitPubRandUntilBlockFinalized(tipBlock)
if err != nil {
if strings.Contains(err.Error(), bstypes.ErrBTCValAlreadySlashed.Error()) {
_ = v.SetStatus(proto.ValidatorStatus_INACTIVE)
v.logger.Infof("the validator %s is slashed, terminating the instance", v.GetBtcPkHex())
} else {
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
"block_height": tipBlock,
}).Warnf(instanceTerminatingMsg)
}
return
v.reportCriticalErr(err)
continue
}
if txRes != nil {
v.logger.WithFields(logrus.Fields{
@@ -602,7 +627,6 @@ func (v *ValidatorInstance) checkLaggingLoop() {
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
}).Error("failed to get the latest block of the consumer chain")
// TODO should terminate the validator instance from the outside
continue
}

@@ -698,9 +722,9 @@ func (v *ValidatorInstance) shouldSubmitFinalitySignature(b *types.BlockInfo) (b
if err != nil {
return false, err
}
if err = v.updateStatusWithPower(power); err != nil {
return false, err
}

v.updateStatusWithPower(power)

if power == 0 {
v.logger.WithFields(logrus.Fields{
"btc_pk_hex": v.GetBtcPkHex(),
@@ -712,6 +736,14 @@ func (v *ValidatorInstance) shouldSubmitFinalitySignature(b *types.BlockInfo) (b
return true, nil
}

func (v *ValidatorInstance) reportCriticalErr(err error) {
v.criticalErrChan <- &CriticalError{
err: err,
valBtcPk: v.GetBtcPkBIP340(),
bbnPk: v.GetBabylonPk(),
}
}

// checkLagging returns true if the lasted voted height is behind by a configured gap
func (v *ValidatorInstance) checkLagging(currentBlock *types.BlockInfo) bool {
return currentBlock.Height >= v.GetLastProcessedHeight()+v.cfg.FastSyncGap
@@ -782,6 +814,7 @@ func (v *ValidatorInstance) retryCommitPubRandUntilBlockFinalized(targetBlock *t
return nil, err
}
v.logger.WithFields(logrus.Fields{
"btc_val_pk": v.GetBtcPkHex(),
"currFailures": failedCycles,
"target_block_height": targetBlock.Height,
"error": err,
@@ -900,36 +933,25 @@ func (v *ValidatorInstance) CommitPubRand(tipBlock *types.BlockInfo) (*provider.
}

newLastCommittedHeight := startHeight + uint64(len(pubRandList)-1)
if err := v.SetLastCommittedHeight(newLastCommittedHeight); err != nil {
v.logger.WithFields(logrus.Fields{
"err": err,
"btc_pk_hex": v.GetBtcPkHex(),
}).Fatal("err while saving last committed height to DB")
}

v.MustSetLastCommittedHeight(newLastCommittedHeight)

return res, nil
}

func (v *ValidatorInstance) updateStatusWithPower(power uint64) error {
func (v *ValidatorInstance) updateStatusWithPower(power uint64) {
if power == 0 {
if v.GetStatus() == proto.ValidatorStatus_ACTIVE {
// the validator is slashed or unbonded from the consumer chain
if err := v.SetStatus(proto.ValidatorStatus_INACTIVE); err != nil {
return fmt.Errorf("cannot set the validator status: %w", err)
}
v.MustSetStatus(proto.ValidatorStatus_INACTIVE)
}

return nil
return
}

// update the status
if v.GetStatus() == proto.ValidatorStatus_REGISTERED || v.GetStatus() == proto.ValidatorStatus_INACTIVE {
if err := v.SetStatus(proto.ValidatorStatus_ACTIVE); err != nil {
return fmt.Errorf("cannot set the validator status: %w", err)
}
v.MustSetStatus(proto.ValidatorStatus_ACTIVE)
}

return nil
}

// SubmitFinalitySignature builds and sends a finality signature over the given block to the consumer chain
@@ -946,9 +968,7 @@ func (v *ValidatorInstance) SubmitFinalitySignature(b *types.BlockInfo) (*provid
}

// update DB
if err := v.updateStateAfterFinalitySigSubmission(b.Height); err != nil {
return nil, fmt.Errorf("failed to update state in DB after finality sig submission: %w", err)
}
v.MustUpdateStateAfterFinalitySigSubmission(b.Height)

return res, nil
}
@@ -977,9 +997,7 @@ func (v *ValidatorInstance) SubmitBatchFinalitySignatures(blocks []*types.BlockI

// update DB
highBlock := blocks[len(blocks)-1]
if err := v.updateStateAfterFinalitySigSubmission(highBlock.Height); err != nil {
return nil, fmt.Errorf("failed to update state in DB after submission: %w", err)
}
v.MustUpdateStateAfterFinalitySigSubmission(highBlock.Height)

return res, nil
}
@@ -1028,9 +1046,7 @@ func (v *ValidatorInstance) TestSubmitFinalitySignatureAndExtractPrivKey(b *type
if power == 0 {
if v.GetStatus() == proto.ValidatorStatus_ACTIVE {
// the validator is slashed or unbonded from the consumer chain
if err := v.SetStatus(proto.ValidatorStatus_INACTIVE); err != nil {
return nil, nil, fmt.Errorf("cannot set the validator status: %w", err)
}
v.MustSetStatus(proto.ValidatorStatus_INACTIVE)
}
v.logger.WithFields(logrus.Fields{
"btc_pk_hex": btcPk.MarshalHex(),
140 changes: 130 additions & 10 deletions service/validator_manager.go
Original file line number Diff line number Diff line change
@@ -2,39 +2,145 @@ package service

import (
"encoding/hex"
"errors"
"fmt"
"sync"

bbntypes "github.com/babylonchain/babylon/types"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
"github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1"
"github.com/sirupsen/logrus"
"go.uber.org/atomic"

"github.com/babylonchain/btc-validator/clientcontroller"
"github.com/babylonchain/btc-validator/proto"
"github.com/babylonchain/btc-validator/types"
"github.com/babylonchain/btc-validator/val"
"github.com/babylonchain/btc-validator/valcfg"
)

const instanceTerminatingMsg = "terminating the validator instance due to critical error"

type CriticalError struct {
err error
// TODO use validator BTC key as the unique id of
// the validator; currently, the storage is keyed
// the babylon public key
valBtcPk *bbntypes.BIP340PubKey
bbnPk *secp256k1.PubKey
}

type ValidatorManager struct {
isStarted *atomic.Bool

mu sync.Mutex
// validator instances map keyed by the hex string of the Babylon public key
wg sync.WaitGroup

// running validator instances map keyed by the hex string of the BTC public key
vals map[string]*ValidatorInstance

// needed for initiating validator instances
vs *val.ValidatorStore
config *valcfg.Config
kr keyring.Keyring
cc clientcontroller.ClientController
logger *logrus.Logger

criticalErrChan chan *CriticalError

quit chan struct{}
}

func NewValidatorManager() *ValidatorManager {
func NewValidatorManager(vs *val.ValidatorStore,
config *valcfg.Config,
kr keyring.Keyring,
cc clientcontroller.ClientController,
logger *logrus.Logger,
) (*ValidatorManager, error) {
return &ValidatorManager{
vals: make(map[string]*ValidatorInstance),
vals: make(map[string]*ValidatorInstance),
criticalErrChan: make(chan *CriticalError),
isStarted: atomic.NewBool(false),
vs: vs,
config: config,
kr: kr,
cc: cc,
logger: logger,
quit: make(chan struct{}),
}, nil
}

// monitorCriticalErr takes actions when it receives critical errors from a validator instance
// if the validator is slashed, it will be terminated and the program keeps running in case
// new validators join
// otherwise, the program will panic
func (vm *ValidatorManager) monitorCriticalErr() {
defer vm.wg.Done()

var criticalErr *CriticalError
for {
select {
case criticalErr = <-vm.criticalErrChan:
vi, err := vm.getValidatorInstance(criticalErr.bbnPk)
if err != nil {
panic(fmt.Errorf("failed to get the validator instance: %w", err))
}
if errors.Is(criticalErr.err, types.ErrValidatorSlashed) {
vi.MustSetStatus(proto.ValidatorStatus_SLASHED)
if err := vm.removeValidatorInstance(vi.GetBabylonPk()); err != nil {
panic(fmt.Errorf("failed to terminate a slashed validator %s: %w", vi.GetBtcPkHex(), err))
}
continue
}
vi.logger.WithFields(logrus.Fields{
"err": criticalErr.err,
"btc_pk_hex": vi.GetBtcPkHex(),
}).Fatal(instanceTerminatingMsg)
case <-vm.quit:
return
}
}
}

func (vm *ValidatorManager) stop() error {
func (vm *ValidatorManager) Start() error {
if vm.isStarted.Swap(true) {
return fmt.Errorf("the validator manager is already started")
}

storedValidators, err := vm.vs.ListRegisteredValidators()
if err != nil {
return err
}

vm.wg.Add(1)
go vm.monitorCriticalErr()

for _, v := range storedValidators {
if err := vm.addValidatorInstance(v.GetBabylonPK()); err != nil {
return err
}
}

return nil
}

func (vm *ValidatorManager) Stop() error {
if !vm.isStarted.Swap(false) {
return fmt.Errorf("the validator manager has already stopped")
}

var stopErr error

for _, v := range vm.vals {
if err := v.Stop(); err != nil {
stopErr = err
break
}
}

close(vm.quit)
vm.wg.Wait()

return stopErr
}

@@ -63,14 +169,28 @@ func (vm *ValidatorManager) getValidatorInstance(babylonPk *secp256k1.PubKey) (*
return v, nil
}

func (vm *ValidatorManager) removeValidatorInstance(babylonPk *secp256k1.PubKey) error {
vm.mu.Lock()
defer vm.mu.Unlock()

keyHex := hex.EncodeToString(babylonPk.Key)
v, exists := vm.vals[keyHex]
if !exists {
return fmt.Errorf("cannot find the validator instance with PK: %s", keyHex)
}
if v.IsRunning() {
if err := v.Stop(); err != nil {
return fmt.Errorf("failed to stop the validator instance %s", keyHex)
}
}

delete(vm.vals, keyHex)
return nil
}

// addValidatorInstance creates a validator instance, starts it and adds it into the validator manager
func (vm *ValidatorManager) addValidatorInstance(
pk *secp256k1.PubKey,
config *valcfg.Config,
valStore *val.ValidatorStore,
kr keyring.Keyring,
cc clientcontroller.ClientController,
logger *logrus.Logger,
) error {
vm.mu.Lock()
defer vm.mu.Unlock()
@@ -80,7 +200,7 @@ func (vm *ValidatorManager) addValidatorInstance(
return fmt.Errorf("validator instance already exists")
}

valIns, err := NewValidatorInstance(pk, config, valStore, kr, cc, logger)
valIns, err := NewValidatorInstance(pk, vm.config, vm.vs, vm.kr, vm.cc, vm.criticalErrChan, vm.logger)
if err != nil {
return fmt.Errorf("failed to create validator %s instance: %w", pkHex, err)
}
17 changes: 17 additions & 0 deletions types/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package types

import (
"errors"
)

var (
ErrValidatorSlashed = errors.New("the validator has been slashed")
ErrTooFewPubRand = errors.New("the request contains too few public randomness")
ErrBlockNotFound = errors.New("the block is not found")
ErrHeightTooHigh = errors.New("the chain has not reached the given height yet")
ErrInvalidPubRand = errors.New("the public randomness list is invalid")
ErrNoPubRandYet = errors.New("the BTC validator has not committed any public randomness yet")
ErrPubRandNotFound = errors.New("public randomness is not found")
ErrInvalidFinalitySig = errors.New("finality signature is not valid")
ErrDuplicatedFinalitySig = errors.New("the finality signature has been casted before")
)
3 changes: 2 additions & 1 deletion val/valstore.go
Original file line number Diff line number Diff line change
@@ -191,6 +191,7 @@ func (vs *ValidatorStore) ListValidators() ([]*proto.StoreValidator, error) {
}

// ListRegisteredValidators returns a list of validators whose status is more than CREATED
// but less than SLASHED
func (vs *ValidatorStore) ListRegisteredValidators() ([]*proto.StoreValidator, error) {
k := vs.getValidatorListKey()
valsBytes, err := vs.s.List(k)
@@ -205,7 +206,7 @@ func (vs *ValidatorStore) ListRegisteredValidators() ([]*proto.StoreValidator, e
if err != nil {
panic(fmt.Errorf("failed to unmarshal validator from the database: %w", err))
}
if val.Status >= proto.ValidatorStatus_CREATED {
if val.Status > proto.ValidatorStatus_CREATED && val.Status < proto.ValidatorStatus_SLASHED {
valsList = append(valsList, val)
}
}

0 comments on commit b288764

Please sign in to comment.