Skip to content

Commit

Permalink
fix(rpc): panic and publish health event only on create batch error (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zale144 authored Jul 9, 2024
1 parent 7cb2843 commit b09c75a
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 73 deletions.
25 changes: 19 additions & 6 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ import (
"sync/atomic"

"code.cloudfoundry.org/go-diodes"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"golang.org/x/sync/errgroup"

"github.com/dymensionxyz/dymint/store"
uevent "github.com/dymensionxyz/dymint/utils/event"
"github.com/dymensionxyz/gerr-cosmos/gerrc"

"github.com/dymensionxyz/dymint/p2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/tendermint/tendermint/libs/pubsub"
tmtypes "github.com/tendermint/tendermint/types"

"github.com/dymensionxyz/dymint/p2p"

"github.com/tendermint/tendermint/proxy"

"github.com/dymensionxyz/dymint/config"
Expand Down Expand Up @@ -159,14 +162,24 @@ func (m *Manager) Start(ctx context.Context) error {
return fmt.Errorf("sync block manager: %w", err)
}

eg, ctx := errgroup.WithContext(ctx)

if isSequencer {
// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
eg.Go(func() error {
return m.SubmitLoop(ctx)
})
eg.Go(func() error {
return m.ProduceBlockLoop(ctx)
})
} else {
go m.RetrieveLoop(ctx)
go m.SyncToTargetHeightLoop(ctx)
eg.Go(func() error {
return m.RetrieveLoop(ctx)
})
eg.Go(func() error {
return m.SyncToTargetHeightLoop(ctx)
})
}
return nil
}
Expand Down
25 changes: 18 additions & 7 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,29 @@ import (
"fmt"
"time"

"github.com/dymensionxyz/dymint/store"
"github.com/dymensionxyz/gerr-cosmos/gerrc"

"github.com/dymensionxyz/dymint/types"
"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/store"
uevent "github.com/dymensionxyz/dymint/utils/event"

tmed25519 "github.com/tendermint/tendermint/crypto/ed25519"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
tmtypes "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"

"github.com/dymensionxyz/dymint/types"
)

// ProduceBlockLoop calls ProduceAndGossipBlock in a loop as long as we're synced.
func (m *Manager) ProduceBlockLoop(ctx context.Context) {
func (m *Manager) ProduceBlockLoop(ctx context.Context) (err error) {
m.logger.Info("Started block producer loop.")

ticker := time.NewTicker(m.Conf.BlockTime)
defer ticker.Stop()
defer func() {
ticker.Stop()
m.logger.Info("Stopped block producer loop.")
}()

var nextEmptyBlock time.Time
firstBlock := true
Expand All @@ -31,12 +38,15 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:

// if empty blocks are configured to be enabled, and one is scheduled...
produceEmptyBlock := firstBlock || 0 == m.Conf.MaxIdleTime || nextEmptyBlock.Before(time.Now())
firstBlock = false

block, commit, err := m.ProduceAndGossipBlock(ctx, produceEmptyBlock)
var (
block *types.Block
commit *types.Commit
)
block, commit, err = m.ProduceAndGossipBlock(ctx, produceEmptyBlock)
if errors.Is(err, context.Canceled) {
m.logger.Error("Produce and gossip: context canceled.", "error", err)
return
Expand All @@ -46,7 +56,8 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
}
if errors.Is(err, ErrNonRecoverable) {
m.logger.Error("Produce and gossip: non-recoverable.", "error", err) // TODO: flush? or don't log at all?
panic(fmt.Errorf("produce and gossip block: %w", err))
uevent.MustPublish(ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
return
}
if err != nil {
m.logger.Error("Produce and gossip: uncategorized, assuming recoverable.", "error", err)
Expand Down
9 changes: 5 additions & 4 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// RetrieveLoop listens for new target sync heights and then syncs the chain by
// fetching batches from the settlement layer and then fetching the actual blocks
// from the DA.
func (m *Manager) RetrieveLoop(ctx context.Context) {
func (m *Manager) RetrieveLoop(ctx context.Context) (err error) {
m.logger.Info("Started retrieve loop.")
p := diodes.NewPoller(m.targetSyncHeight, diodes.WithPollingContext(ctx))

Expand All @@ -21,9 +21,10 @@ func (m *Manager) RetrieveLoop(ctx context.Context) {
if targetHeight == nil {
return
}
err := m.syncToTargetHeight(*(*uint64)(targetHeight))
if err != nil {
panic(fmt.Errorf("sync until target: %w", err))

if err = m.syncToTargetHeight(*(*uint64)(targetHeight)); err != nil {
err = fmt.Errorf("sync until target: %w", err)
return
}
}
}
Expand Down
15 changes: 10 additions & 5 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ import (
"fmt"
"time"

"github.com/dymensionxyz/gerr-cosmos/gerrc"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/types"
uevent "github.com/dymensionxyz/dymint/utils/event"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
)

// SubmitLoop is the main loop for submitting blocks to the DA and SL layers.
// It submits a batch when either
// 1) It accumulates enough block data, so it's necessary to submit a batch to avoid exceeding the max size
// 2) Enough time passed since the last submitted batch, so it's necessary to submit a batch to avoid exceeding the max time
func (m *Manager) SubmitLoop(ctx context.Context) {
func (m *Manager) SubmitLoop(ctx context.Context) (err error) {
maxTime := time.NewTicker(m.Conf.BatchSubmitMaxTime)
defer maxTime.Stop()

Expand All @@ -26,6 +27,8 @@ func (m *Manager) SubmitLoop(ctx context.Context) {

// defer func to clear the channels to release blocked goroutines on shutdown
defer func() {
m.logger.Info("Stopped submit loop.")

for {
select {
case <-m.producedSizeCh:
Expand All @@ -45,17 +48,19 @@ func (m *Manager) SubmitLoop(ctx context.Context) {
}

/*
Note: since we dont explicitly coordinate changes to the accumulated size with actual batch creation
Note: since we don't explicitly coordinate changes to the accumulated size with actual batch creation
we don't have a guarantee that the accumulated size is the same as the actual batch size that will be made.
But the batch creation step will also check the size is OK, so it's not a problem.
*/
m.AccumulatedBatchSize.Store(0)

// modular submission methods have their own retry mechanism.
// if an error is returned, we assume it's unrecoverable.
err := m.HandleSubmissionTrigger()
err = m.HandleSubmissionTrigger()
if err != nil {
panic(fmt.Errorf("handle submission trigger: %w", err))
m.logger.Error("Error submitting batch", "error", err)
uevent.MustPublish(ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
return
}
maxTime.Reset(m.Conf.BatchSubmitMaxTime)
}
Expand Down
53 changes: 29 additions & 24 deletions block/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
cosmosed25519 "github.com/cosmos/cosmos-sdk/crypto/keys/ed25519"
"github.com/libp2p/go-libp2p/core/crypto"

"github.com/dymensionxyz/dymint/block"
"github.com/dymensionxyz/dymint/config"
slmocks "github.com/dymensionxyz/dymint/mocks/github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/testutil"
Expand Down Expand Up @@ -264,35 +265,39 @@ func TestSubmissionByBatchSize(t *testing.T) {
require.Equal(manager.AccumulatedBatchSize.Load(), uint64(0))
assert.Equal(manager.State.Height(), uint64(0))

var wg sync.WaitGroup
wg.Add(2) // Add 2 because we have 2 goroutines
submissionByBatchSize(manager, assert, c.expectedSubmission)
}
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
func submissionByBatchSize(manager *block.Manager, assert *assert.Assertions, expectedSubmission bool) {
var wg sync.WaitGroup
wg.Add(2) // Add 2 because we have 2 goroutines

go func() {
manager.ProduceBlockLoop(ctx)
wg.Done() // Decrease counter when this goroutine finishes
}()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

go func() {
manager.ProduceBlockLoop(ctx)
wg.Done() // Decrease counter when this goroutine finishes
}()

go func() {
assert.Zero(manager.LastSubmittedHeight.Load())
manager.SubmitLoop(ctx)
wg.Done() // Decrease counter when this goroutine finishes
}()
go func() {
assert.Zero(manager.LastSubmittedHeight.Load())
manager.SubmitLoop(ctx)
wg.Done() // Decrease counter when this goroutine finishes
}()

// wait for block to be produced but not for submission threshold
time.Sleep(200 * time.Millisecond)
// assert block produced but nothing submitted yet
assert.Greater(manager.State.Height(), uint64(0))
assert.Greater(manager.AccumulatedBatchSize.Load(), uint64(0))
// wait for block to be produced but not for submission threshold
time.Sleep(200 * time.Millisecond)
// assert block produced but nothing submitted yet
assert.Greater(manager.State.Height(), uint64(0))
assert.Greater(manager.AccumulatedBatchSize.Load(), uint64(0))

wg.Wait() // Wait for all goroutines to finish
wg.Wait() // Wait for all goroutines to finish

if c.expectedSubmission {
assert.Positive(manager.LastSubmittedHeight.Load())
} else {
assert.Zero(manager.LastSubmittedHeight.Load())
}
if expectedSubmission {
assert.Positive(manager.LastSubmittedHeight.Load())
} else {
assert.Zero(manager.LastSubmittedHeight.Load())
}
}
10 changes: 7 additions & 3 deletions block/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@ package block
import (
"context"

"github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/types"

"code.cloudfoundry.org/go-diodes"

"github.com/dymensionxyz/dymint/settlement"
)

// SyncToTargetHeightLoop gets real time updates about settlement batch submissions and sends the latest height downstream
// to be retrieved by another process which will pull the data.
func (m *Manager) SyncToTargetHeightLoop(ctx context.Context) {
func (m *Manager) SyncToTargetHeightLoop(ctx context.Context) (err error) {
m.logger.Info("Started sync target loop")
subscription, err := m.Pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted)
var subscription *pubsub.Subscription
subscription, err = m.Pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted)
if err != nil {
m.logger.Error("subscribe to state update events", "error", err)
panic(err)
return
}

for {
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/dgraph-io/badger/v3 v3.2103.3
github.com/dymensionxyz/cosmosclient v0.4.2-beta
github.com/dymensionxyz/dymension/v3 v3.1.0-rc03.0.20240411195658-f7cd96f53b56
github.com/dymensionxyz/gerr-cosmos v0.1.2
github.com/dymensionxyz/gerr-cosmos v1.0.0
github.com/go-kit/kit v0.12.0
github.com/gofrs/uuid v4.3.0+incompatible
github.com/gogo/protobuf v1.3.3
Expand Down Expand Up @@ -62,7 +62,7 @@ require (
github.com/celestiaorg/rsmt2d v0.11.0 // indirect
github.com/cometbft/cometbft v0.37.2
github.com/cometbft/cometbft-db v0.11.0 // indirect
github.com/cosmos/cosmos-proto v1.0.0-beta.3 // indirect
github.com/cosmos/cosmos-proto v1.0.0-beta.3
github.com/cosmos/gogoproto v1.4.11 // indirect
github.com/creachadair/taskgroup v0.3.2 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
Expand Down Expand Up @@ -231,7 +231,7 @@ require (
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sync v0.6.0
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand All @@ -255,7 +255,7 @@ require (
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/cosmos/ibc-go/v6 v6.2.1 // indirect
github.com/danwt/gerr v0.1.5 // indirect
github.com/danwt/gerr v1.0.0 // indirect
github.com/decred/dcrd/dcrec/edwards v1.0.0 // indirect
github.com/evmos/evmos/v12 v12.1.6 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ github.com/creachadair/taskgroup v0.3.2/go.mod h1:wieWwecHVzsidg2CsUnFinW1faVN4+
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/danwt/gerr v0.1.5 h1:5s3G3cnftZG7Rq0+k+taMVteTmwKU66/g47kH6hY1js=
github.com/danwt/gerr v0.1.5/go.mod h1:tIj6P8ZPBLAbr64HdWqKHGUXbWJT6HenD08Fn98oAnE=
github.com/danwt/gerr v1.0.0 h1:v3Do0h1r+uctQQVYJfOTCo8uigp8oIaY4OL/wUU8LzI=
github.com/danwt/gerr v1.0.0/go.mod h1:tIj6P8ZPBLAbr64HdWqKHGUXbWJT6HenD08Fn98oAnE=
github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -310,8 +310,8 @@ github.com/dymensionxyz/dymension/v3 v3.1.0-rc03.0.20240411195658-f7cd96f53b56 h
github.com/dymensionxyz/dymension/v3 v3.1.0-rc03.0.20240411195658-f7cd96f53b56/go.mod h1:3Pfrr8j/BR9ztNKztGfC5PqDiO6CcrzMLCJtFtPEVW4=
github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3 h1:vmAdUGUc4rTIiO3Phezr7vGq+0uPDVKSA4WAe8+yl6w=
github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3/go.mod h1:LfPv2O1HXMgETpka81Pg3nXy+U/7urq8dn85ZnSXK5Y=
github.com/dymensionxyz/gerr-cosmos v0.1.2 h1:4NiB9psF6swnWTCDYnHgHKtVEaRHuuNRuqvPGDw1BI8=
github.com/dymensionxyz/gerr-cosmos v0.1.2/go.mod h1:tXIhx3WdryAnYRISC3Weh+2xeXwaf1l4Yb1zjDUsT7k=
github.com/dymensionxyz/gerr-cosmos v1.0.0 h1:oi91rgOkpJWr41oX9JOyjvvBnhGY54tj513x8VlDAEc=
github.com/dymensionxyz/gerr-cosmos v1.0.0/go.mod h1:n+0olxPogzWqFKba45mCpvrHLGmeS8W9UZjggHnWk6c=
github.com/dymensionxyz/go-libp2p-pubsub v0.0.0-20240513081713-3ecd83c19ea2 h1:5FMEOpX5OuoRfwwjjA+LxRJXoDT0fFvg8/rlat7z8bE=
github.com/dymensionxyz/go-libp2p-pubsub v0.0.0-20240513081713-3ecd83c19ea2/go.mod h1:1OxbaT/pFRO5h+Dpze8hdHQ63R0ke55XTs6b6NwLLkw=
github.com/dymensionxyz/rpc v1.3.1 h1:7EXWIobaBes5zldRvTIg7TmNsEKjicrWA/OjCc0NaGs=
Expand Down
Loading

0 comments on commit b09c75a

Please sign in to comment.