feat: stop chain block production if node unhealthy event is emitted (#319)

Co-authored-by: Michael Tsitrin <[email protected]>
omritoptix and mtsitrin authored May 21, 2023
1 parent 8b006de commit 7b017fc
Showing 8 changed files with 475 additions and 91 deletions.
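
The core of this change is a buffered bool channel, shouldProduceBlocksCh: the node health event callback writes the latest health status into it, and the block production loop selects on it, spinning in an inner loop (and therefore not producing blocks) until a healthy event arrives. Below is a minimal, self-contained Go sketch of that gating pattern; the names and structure are illustrative only and are not copied from the repository.

package main

import (
	"context"
	"fmt"
	"time"
)

// producerLoop "produces a block" on every tick, but pauses whenever false
// arrives on the gate channel and resumes on the next true value.
func producerLoop(ctx context.Context, blockTime time.Duration, gate <-chan bool) {
	ticker := time.NewTicker(blockTime)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			fmt.Println("produced block")
		case healthy := <-gate:
			// Stay in this inner loop, ignoring ticks, until a healthy event arrives.
			for !healthy {
				fmt.Println("stopped block production")
				healthy = <-gate
			}
			fmt.Println("resumed block production")
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	gate := make(chan bool, 1) // buffered so publishers never block
	go producerLoop(ctx, 200*time.Millisecond, gate)

	time.Sleep(time.Second)
	gate <- false // simulate an unhealthy node event
	time.Sleep(time.Second)
	gate <- true // simulate recovery
	time.Sleep(time.Second)
}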
117 changes: 65 additions & 52 deletions block/manager.go
@@ -11,7 +11,9 @@ import (
"github.com/avast/retry-go"
cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
abciconv "github.com/dymensionxyz/dymint/conv/abci"
"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/utils"
"github.com/libp2p/go-libp2p-core/crypto"
abci "github.com/tendermint/tendermint/abci/types"
tmcrypto "github.com/tendermint/tendermint/crypto"
@@ -78,6 +80,8 @@ type Manager struct {
batchRetryCancel context.CancelFunc
batchRetryMu sync.RWMutex

shouldProduceBlocksCh chan bool

syncTarget uint64
isSyncedCond sync.Cond

@@ -128,6 +132,9 @@ func NewManager(
logger.Info("WARNING: using default DA batch size bytes limit", "BlockBatchSizeBytes", config.DefaultNodeConfig.BlockBatchSizeBytes)
conf.BlockBatchSizeBytes = config.DefaultNodeConfig.BlockBatchSizeBytes
}
if conf.BlockTime == 0 {
panic("Block production time must be a positive number")
}

exec := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, genesis.ChainID, mempool, proxyApp, eventBus, logger)

@@ -176,16 +183,33 @@ func NewManager(
settlementClient: settlementClient,
retriever: dalc.(da.BatchRetriever),
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
syncTargetDiode: diodes.NewOneToOne(1, nil),
syncCache: make(map[uint64]*types.Block),
isSyncedCond: *sync.NewCond(new(sync.Mutex)),
batchInProcess: batchInProcess,
logger: logger,
syncTargetDiode: diodes.NewOneToOne(1, nil),
syncCache: make(map[uint64]*types.Block),
isSyncedCond: *sync.NewCond(new(sync.Mutex)),
batchInProcess: batchInProcess,
shouldProduceBlocksCh: make(chan bool, 1),
logger: logger,
}

return agg, nil
}

// Start starts the block manager.
func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
m.logger.Info("Starting the block manager")
if isAggregator {
m.logger.Info("Starting in aggregator mode")
// TODO(omritoptix): change to private methods
go m.ProduceBlockLoop(ctx)
}
// TODO(omritoptix): change to private methods
go m.RetriveLoop(ctx)
go m.SyncTargetLoop(ctx)
m.EventListener(ctx)

return nil
}

func getAddress(key crypto.PrivKey) ([]byte, error) {
rawKey, err := key.GetPublic().Raw()
if err != nil {
@@ -194,6 +218,30 @@ func getAddress(key crypto.PrivKey) ([]byte, error) {
return tmcrypto.AddressHash(rawKey), nil
}

// EventListener registers events to callbacks.
func (m *Manager) EventListener(ctx context.Context) {
go utils.SubscribeAndHandleEvents(ctx, m.pubsub, "nodeHealthStatusHandler", events.EventQueryHealthStatus, m.healthStatusEventCallback, m.logger)
go utils.SubscribeAndHandleEvents(ctx, m.pubsub, "ApplyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.applyBlockCallback, m.logger, 100)

}

func (m *Manager) healthStatusEventCallback(event pubsub.Message) {
eventData := event.Data().(*events.EventDataHealthStatus)
m.logger.Info("Received health status event", "eventData", eventData)
m.shouldProduceBlocksCh <- eventData.Healthy
}

func (m *Manager) applyBlockCallback(event pubsub.Message) {
m.logger.Debug("Received new block event", "eventData", event.Data())
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
err := m.applyBlock(context.Background(), &block, &commit, blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("Failed to apply block", "err", err)
}
}
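
Both callbacks above are registered through utils.SubscribeAndHandleEvents, whose implementation is not included in the hunks rendered on this page. Judging from the call sites, it wraps the standard Tendermint pubsub Subscribe/Out/Cancelled flow; the following is only a plausible sketch with an inferred signature, not the actual utils code.

package utils

import (
	"context"

	"github.com/tendermint/tendermint/libs/log"
	tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
)

// SubscribeAndHandleEvents subscribes clientID to eventQuery on the given pubsub
// server and invokes callback for every received message until the context or
// the subscription is cancelled. Hypothetical sketch; the real helper may differ.
func SubscribeAndHandleEvents(ctx context.Context, pubsubServer *tmpubsub.Server, clientID string,
	eventQuery tmpubsub.Query, callback func(event tmpubsub.Message), logger log.Logger, outCapacity ...int) {
	subscription, err := pubsubServer.Subscribe(ctx, clientID, eventQuery, outCapacity...)
	if err != nil {
		logger.Error("failed to subscribe to events", "clientID", clientID, "err", err)
		panic(err)
	}
	for {
		select {
		case event := <-subscription.Out():
			callback(event)
		case <-subscription.Cancelled():
			logger.Info("subscription cancelled", "clientID", clientID)
			return
		case <-ctx.Done():
			return
		}
	}
}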

// SetDALC is used to set DataAvailabilityLayerClient used by Manager.
// TODO(omritoptix): Remove this from here as it's only being used for tests.
func (m *Manager) SetDALC(dalc da.DataAvailabilityLayerClient) {
@@ -264,33 +312,24 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
if err != nil {
m.logger.Error("failed to wait for sync", "err", err)
}
// If we get blockTime of 0 we'll just run publishBlock in a loop
// vs waiting for ticks
produceBlockCh := make(chan bool, 1)
ticker := &time.Ticker{}
if m.conf.BlockTime == 0 {
produceBlockCh <- true
} else {
ticker = time.NewTicker(m.conf.BlockTime)
defer ticker.Stop()
}
// The func to invoke upon block publish
produceBlockLoop := func() {
err := m.produceBlock(ctx)
if err != nil {
m.logger.Error("error while producing block", "error", err)
}
}

ticker := time.NewTicker(m.conf.BlockTime)

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
produceBlockLoop()
case <-produceBlockCh:
for {
produceBlockLoop()
err := m.produceBlock(ctx)
if err != nil {
m.logger.Error("error while producing block", "error", err)
}
case shouldProduceBlocks := <-m.shouldProduceBlocksCh:
for !shouldProduceBlocks {
m.logger.Info("Stopped block production")
shouldProduceBlocks = <-m.shouldProduceBlocksCh
}
m.logger.Info("Resumed Block production")
}

}
@@ -400,32 +439,6 @@ func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) {
}
}

// ApplyBlockLoop is responsible for applying blocks retrieved from pubsub server.
func (m *Manager) ApplyBlockLoop(ctx context.Context) {
subscription, err := m.pubsub.Subscribe(ctx, "ApplyBlockLoop", p2p.EventQueryNewNewGossipedBlock, 100)
if err != nil {
m.logger.Error("failed to subscribe to gossiped blocked events")
panic(err)
}
for {
select {
case blockEvent := <-subscription.Out():
m.logger.Debug("Received new block event", "eventData", blockEvent.Data())
eventData := blockEvent.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
err := m.applyBlock(ctx, &block, &commit, blockMetaData{source: gossipedBlock})
if err != nil {
continue
}
case <-ctx.Done():
return
case <-subscription.Cancelled():
m.logger.Info("Subscription for gossied blocked events canceled")
}
}
}

// applyBlock applies the block to the store and the abci app.
// steps: save block -> execute block with app -> update state -> commit block to app -> update store height and state hash.
// As the entire process can't be atomic we need to make sure the following condition apply before
84 changes: 80 additions & 4 deletions block/manager_test.go
@@ -5,6 +5,7 @@ import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"sync/atomic"
"testing"
"time"
@@ -13,6 +14,7 @@ import (

"github.com/dymensionxyz/dymint/log/test"
mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1"
"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/testutil"
@@ -107,7 +109,8 @@ func TestInitialState(t *testing.T) {
t.Run(c.name, func(t *testing.T) {

dalc := getMockDALC(100*time.Second, logger)
agg, err := NewManager(key, conf, c.genesis, c.store, nil, proxyApp, dalc, settlementlc, nil, pubsubServer, p2pClient, logger)
agg, err := NewManager(key, conf, c.genesis, c.store, nil, proxyApp, dalc, settlementlc,
nil, pubsubServer, p2pClient, logger)
assert.NoError(err)
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.lastState.ChainID)
@@ -156,8 +159,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
t.Log("Sync the manager")
ctx, cancel = context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
go manager.RetriveLoop(ctx)
go manager.ApplyBlockLoop(ctx)
go manager.Start(ctx, false)
select {
case <-ctx.Done():
assert.Greater(t, manager.store.Height(), lastStoreHeight)
@@ -269,6 +271,79 @@ func TestProducePendingBlock(t *testing.T) {
assert.Equal(t, block.Header.Hash(), *(*[32]byte)(manager.lastState.LastBlockID.Hash))
}

// TestBlockProductionNodeHealth tests the different scenarios of block production when the node's health status toggles.
// The test does the following:
// 1. Send healthy event and validate blocks are produced
// 2. Send unhealthy event and validate blocks are not produced
// 3. Send another unhealthy event and validate blocks are still not produced
// 4. Send healthy event and validate blocks are produced
func TestBlockProductionNodeHealth(t *testing.T) {
require := require.New(t)
assert := assert.New(t)
// Setup app
app := testutil.GetAppMock()
// Create proxy app
clientCreator := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(clientCreator)
err := proxyApp.Start()
require.NoError(err)
// Init manager
manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)

cases := []struct {
name string
healthStatusEvent map[string][]string
healthStatusEventData interface{}
shouldProduceBlocks bool
}{
{
name: "HealthyEventBlocksProduced",
healthStatusEvent: map[string][]string{events.EventNodeTypeKey: {events.EventHealthStatus}},
healthStatusEventData: &events.EventDataHealthStatus{Healthy: true, Error: nil},
shouldProduceBlocks: true,
},
{
name: "UnhealthyEventBlocksNotProduced",
healthStatusEvent: map[string][]string{events.EventNodeTypeKey: {events.EventHealthStatus}},
healthStatusEventData: &events.EventDataHealthStatus{Healthy: false, Error: errors.New("Unhealthy")},
shouldProduceBlocks: false,
},
{
name: "UnhealthyEventBlocksStillNotProduced",
healthStatusEvent: map[string][]string{events.EventNodeTypeKey: {events.EventHealthStatus}},
healthStatusEventData: &events.EventDataHealthStatus{Healthy: false, Error: errors.New("Unhealthy")},
shouldProduceBlocks: false,
},
{
name: "HealthyEventBlocksProduced",
healthStatusEvent: map[string][]string{events.EventNodeTypeKey: {events.EventHealthStatus}},
healthStatusEventData: &events.EventDataHealthStatus{Healthy: true, Error: nil},
shouldProduceBlocks: true,
},
}
// Start the manager
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = manager.Start(ctx, true)
require.NoError(err)
time.Sleep(100 * time.Millisecond)

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
manager.pubsub.PublishWithEvents(context.Background(), c.healthStatusEventData, c.healthStatusEvent)
time.Sleep(500 * time.Millisecond)
blockHeight := manager.store.Height()
time.Sleep(500 * time.Millisecond)
if c.shouldProduceBlocks {
assert.Greater(manager.store.Height(), blockHeight)
} else {
assert.Equal(blockHeight, manager.store.Height())
}
})
}
}

// Test that in case we fail after the proxy app commit, next time we won't commit again to the proxy app
// and only update the store height and app hash. This test does the following:
// 1. Produce first block successfully
@@ -560,7 +635,8 @@ func getManager(conf config.BlockManagerConfig, settlementlc settlement.LayerCli
return nil, err
}

manager, err := NewManager(proposerKey, conf, genesis, managerStore, mp, proxyApp, dalc, settlementlc, nil, pubsubServer, p2pClient, logger)
manager, err := NewManager(proposerKey, conf, genesis, managerStore, mp, proxyApp, dalc, settlementlc, nil,
pubsubServer, p2pClient, logger)
if err != nil {
return nil, err
}
37 changes: 37 additions & 0 deletions da/events.go
@@ -0,0 +1,37 @@
package da

import (
"fmt"

tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
)

// Define the event type keys
const (
// EventTypeKey is a reserved composite key for event name.
EventTypeKey = "da.event"
)

// Define the event types
const (
EventDAHealthStatus = "DAHealthStatus"
)

// EventDataDAHealthStatus defines the structure of the event data for the EventDAHealthStatus event
type EventDataDAHealthStatus struct {
// Healthy is true if the da layer is healthy
Healthy bool
// Error is the error that was encountered in case of a health check failure
Error error
}

// Define queries
var (
EventQueryDAHealthStatus = QueryForEvent(EventDAHealthStatus)
)

// QueryForEvent returns a query for the given event.
func QueryForEvent(eventType string) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType))
}
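
None of the hunks rendered on this page publish or consume these DA events yet; presumably that happens in one of the changed files not shown here. Purely as an illustration, publishing such an event through a shared Tendermint pubsub server would mirror the PublishWithEvents pattern used in the manager test above (ctx and pubsubServer are assumed to be in scope):

// Illustrative only: emitting a DA health status event.
healthEvent := map[string][]string{EventTypeKey: {EventDAHealthStatus}}
eventData := &EventDataDAHealthStatus{Healthy: false, Error: fmt.Errorf("da layer unreachable")}
if err := pubsubServer.PublishWithEvents(ctx, eventData, healthEvent); err != nil {
	// a publish failure here would normally be logged by the caller
}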
37 changes: 37 additions & 0 deletions node/events/node.go
@@ -0,0 +1,37 @@
package events

import (
"fmt"

tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
)

// Define the event type keys
const (
// EventNodeTypeKey is a reserved composite key for the event name.
EventNodeTypeKey = "node.event"
)

// Define the event types
const (
EventHealthStatus = "HealthStatus"
)

// EventDataHealthStatus defines the structure of the event data for the EventHealthStatus
type EventDataHealthStatus struct {
// Healthy is true if the base layers are healthy
Healthy bool
// Error is the error that was encountered in case of a health check failure
Error error
}

// Define queries
var (
EventQueryHealthStatus = QueryForEvent(EventHealthStatus)
)

// QueryForEvent returns a query for the given event.
func QueryForEvent(eventType string) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventNodeTypeKey, eventType))
}
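
For reference, EventQueryHealthStatus resolves to the query string node.event='HealthStatus', which is what the manager's EventListener subscribes to through the utils helper. A direct subscription against the pubsub server would look roughly like this (illustrative snippet; pubsubServer and ctx are assumed to be in scope):

// Illustrative only: consuming node health status events directly.
subscription, err := pubsubServer.Subscribe(ctx, "healthMonitor", EventQueryHealthStatus)
if err != nil {
	panic(err)
}
for msg := range subscription.Out() {
	status := msg.Data().(*EventDataHealthStatus)
	fmt.Println("node healthy:", status.Healthy)
}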