Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(submission): fix counting and time #969

Merged
merged 78 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 73 commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
7e84310
log error from manager err group
danwt Jul 18, 2024
2fb1e65
make produce loop more readable
danwt Jul 18, 2024
552b62b
tidy ups in produce loop
danwt Jul 18, 2024
b104705
renames
danwt Jul 18, 2024
40382f5
move log statement
danwt Jul 18, 2024
291cf7d
pre remove start height and end height fields
danwt Jul 18, 2024
a714bdb
make batch start and end height into functions, fix builds but not tests
danwt Jul 18, 2024
c4d1c4f
delete grey avail test
danwt Jul 18, 2024
6c15052
fixes tests
danwt Jul 18, 2024
a8ea801
tests pass, pre remove unneeded nil out
danwt Jul 18, 2024
fb59aa8
cleanup
danwt Jul 18, 2024
8594afc
pre fix the pattern
danwt Jul 18, 2024
e01f45d
lfg
danwt Jul 18, 2024
82c4c80
start to add the allow production channel
danwt Jul 18, 2024
acb7921
CreateNextBatchToSubmit is no longer a method
danwt Jul 18, 2024
b89ce10
todo
danwt Jul 18, 2024
3230c3f
refactor without functional change
danwt Jul 18, 2024
f070b6a
try
danwt Jul 18, 2024
b91ead0
change sign
danwt Jul 18, 2024
4a0eea1
working on tests
danwt Jul 18, 2024
fe0baae
gonna move the event to the production side
danwt Jul 18, 2024
142063b
move the production stop logs to the production thread
danwt Jul 18, 2024
b3e98a4
need to check go channel behavior
danwt Jul 18, 2024
9a4be77
tests pass
danwt Jul 18, 2024
f508d77
fix most tests but have failure in TestCreateNextDABatchWithBytesLimit
danwt Jul 18, 2024
9b5b982
fix log statements
danwt Jul 18, 2024
5111094
tweaks
danwt Jul 18, 2024
45deb44
cp
zale144 Jul 18, 2024
d3e27a2
Merge branch 'main' into danwt/963-fix-production-loop-0001
danwt Jul 18, 2024
8b2967b
add proper handling
danwt Jul 19, 2024
5bdad3a
need to test this thing properly
danwt Jul 19, 2024
f1a3ff3
lets really get this working
danwt Jul 19, 2024
c4fa01c
factor out a unit testable core
danwt Jul 19, 2024
71ecf91
gonna change it a bit
danwt Jul 19, 2024
824b77b
working on it
danwt Jul 19, 2024
b07e21d
gonna change to uint64
danwt Jul 19, 2024
0068ee4
need to finish off the test
danwt Jul 19, 2024
81e6d26
gonna check pruning
danwt Jul 19, 2024
0c3d95a
cp
danwt Jul 19, 2024
fb61910
bump test
danwt Jul 19, 2024
f150a85
why tho
danwt Jul 19, 2024
ca4e437
wow
danwt Jul 19, 2024
4d28ad9
cp
danwt Jul 19, 2024
599c6ac
why tho
danwt Jul 19, 2024
3d8773b
can I change teh deisgn a bit
danwt Jul 19, 2024
12fd608
lfg
danwt Jul 19, 2024
bc1bf46
can I write a rapid test
danwt Jul 19, 2024
e448914
gonna make the skew a hard invariant
danwt Jul 19, 2024
3fc4740
can I get ride of the counter waker
danwt Jul 19, 2024
cc47c39
gonna remove panic
danwt Jul 19, 2024
edffef1
lets see
danwt Jul 19, 2024
05fa2b0
gonna split into two
danwt Jul 19, 2024
ae0f825
gonna wrap it up
danwt Jul 19, 2024
8322173
going to do a long test without the bonus ticker line
danwt Jul 19, 2024
520febe
long test
danwt Jul 19, 2024
b3c2fee
cp
danwt Jul 19, 2024
5021a7f
test some requirements
danwt Jul 19, 2024
4cf9fce
remove pending bytes re
danwt Jul 19, 2024
9b1bd07
why so many produced
danwt Jul 19, 2024
151f0d8
need to refine the submission test
danwt Jul 19, 2024
7e9eb6d
test is ok for now
danwt Jul 19, 2024
a64c728
lets do timer
danwt Jul 19, 2024
4eb84ea
adds the time check
danwt Jul 19, 2024
cae267d
delets the time check todo
danwt Jul 19, 2024
0244c29
just need metrics
danwt Jul 19, 2024
af80f99
adds metrics
danwt Jul 19, 2024
3c0c9f4
rename estimate method name
danwt Jul 22, 2024
fb9ec8e
better naming
danwt Jul 22, 2024
dc0aa8d
make cond more readable
danwt Jul 22, 2024
5c75955
more style suggestion
danwt Jul 22, 2024
d23cba5
make method a manager method
danwt Jul 22, 2024
8509b1b
bump last
danwt Jul 22, 2024
dd6dafa
make sub utils panic if deadline exceeded
danwt Jul 22, 2024
e984d29
add ticker commment
danwt Jul 22, 2024
bf63cf5
trying to get rid of chan size 1 constraint
danwt Jul 22, 2024
0d8fe37
use chan without size 1
danwt Jul 22, 2024
9742595
fix test
danwt Jul 22, 2024
01a58df
move comment
danwt Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 11 additions & 24 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@
DAClient da.DataAvailabilityLayerClient
SLClient settlement.ClientI

/*
Production
*/
producedSizeC chan uint64 // for the producer to report the size of the block+commit it produced

/*
Submission
*/
Expand Down Expand Up @@ -114,7 +109,6 @@
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
targetSyncHeight: diodes.NewOneToOne(1, nil),
producedSizeC: make(chan uint64),
logger: logger,
blockCache: make(map[uint64]CachedBlock),
}
Expand Down Expand Up @@ -163,11 +157,14 @@
if isSequencer {
// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()

bytesProducedC := make(chan int, 1)
danwt marked this conversation as resolved.
Show resolved Hide resolved
bytesProducedC <- m.GetUnsubmittedBytes()
eg.Go(func() error {
return m.SubmitLoop(ctx)
return m.SubmitLoop(ctx, bytesProducedC)
})
eg.Go(func() error {
return m.ProduceBlockLoop(ctx)
return m.ProduceBlockLoop(ctx, bytesProducedC)
})
} else {
eg.Go(func() error {
Expand All @@ -177,6 +174,12 @@
return m.SyncToTargetHeightLoop(ctx)
})
}

go func() {
err := eg.Wait()
m.logger.Info("Block manager err group finished.", "err", err)
}()
Comment on lines +180 to +183

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism

return nil
}

Expand Down Expand Up @@ -220,19 +223,3 @@
m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load())
return nil
}

func (m *Manager) MustLoadBlock(h uint64) *types.Block {
ret, err := m.Store.LoadBlock(h)
if err != nil {
panic(fmt.Errorf("store load block: height: %d: %w", h, err))
}
return ret
}

func (m *Manager) MustLoadCommit(h uint64) *types.Commit {
ret, err := m.Store.LoadCommit(h)
if err != nil {
panic(fmt.Errorf("store load commit: height: %d: %w", h, err))
}
return ret
}
30 changes: 15 additions & 15 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
assert.Equal(t, daResultSubmitBatch.Code, da.StatusSuccess)
err = manager.SLClient.SubmitBatch(batch, manager.DAClient.GetClientType(), &daResultSubmitBatch)
require.NoError(t, err)
nextBatchStartHeight = batch.EndHeight + 1
nextBatchStartHeight = batch.EndHeight() + 1
// Wait until daHeight is updated
time.Sleep(time.Millisecond * 500)
}
Expand All @@ -148,9 +148,9 @@
assert.NoError(t, err)
}()
<-ctx.Done()
assert.Equal(t, batch.EndHeight, manager.LastSubmittedHeight.Load())
assert.Equal(t, batch.EndHeight(), manager.LastSubmittedHeight.Load())
// validate that we produced blocks
assert.Greater(t, manager.State.Height(), batch.EndHeight)
assert.Greater(t, manager.State.Height(), batch.EndHeight())
}

func TestRetrieveDaBatchesFailed(t *testing.T) {
Expand Down Expand Up @@ -319,7 +319,7 @@
require.NoError(err)
// Init manager
managerConfig := testutil.GetManagerConfig()
managerConfig.BlockBatchMaxSizeBytes = batchLimitBytes // enough for 2 block, not enough for 10 blocks
managerConfig.BatchMaxSizeBytes = batchLimitBytes // enough for 2 block, not enough for 10 blocks
manager, err := testutil.GetManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)

Expand Down Expand Up @@ -354,23 +354,23 @@
// Call createNextDABatch function
startHeight := manager.NextHeightToSubmit()
endHeight := startHeight + uint64(tc.blocksToProduce) - 1
batch, err := manager.CreateNextBatchToSubmit(startHeight, endHeight)
batch, err := block.CreateBatch(manager.Store, manager.Conf.BatchMaxSizeBytes, startHeight, endHeight)

Check failure on line 357 in block/manager_test.go

View workflow job for this annotation

GitHub Actions / build

undefined: block.CreateBatch
assert.NoError(err)

assert.Equal(batch.StartHeight, startHeight)
assert.LessOrEqual(batch.ToProto().Size(), int(managerConfig.BlockBatchMaxSizeBytes))
assert.Equal(batch.StartHeight(), startHeight)
assert.LessOrEqual(batch.SizeBytes(), int(managerConfig.BatchMaxSizeBytes))

if !tc.expectedToBeTruncated {
assert.Equal(batch.EndHeight, endHeight)
assert.Equal(batch.EndHeight(), endHeight)
} else {
assert.Equal(batch.EndHeight, batch.StartHeight+uint64(len(batch.Blocks))-1)
assert.Less(batch.EndHeight, endHeight)
assert.Equal(batch.EndHeight(), batch.StartHeight()+batch.NumBlocks()-1)
assert.Less(batch.EndHeight(), endHeight)

// validate next added block to batch would have been actually too big
// First relax the byte limit so we could proudce larger batch
manager.Conf.BlockBatchMaxSizeBytes = 10 * manager.Conf.BlockBatchMaxSizeBytes
newBatch, err := manager.CreateNextBatchToSubmit(startHeight, batch.EndHeight+1)
assert.Greater(newBatch.ToProto().Size(), batchLimitBytes)
// First relax the byte limit so we could produce larger batch
manager.Conf.BatchMaxSizeBytes = 10 * manager.Conf.BatchMaxSizeBytes
newBatch, err := block.CreateBatch(manager.Store, manager.Conf.BatchMaxSizeBytes, startHeight, batch.EndHeight()+1)

Check failure on line 372 in block/manager_test.go

View workflow job for this annotation

GitHub Actions / build

undefined: block.CreateBatch
assert.Greater(newBatch.SizeBytes(), batchLimitBytes)

assert.NoError(err)
}
Expand Down Expand Up @@ -431,7 +431,7 @@
t.Run(c.name, func(t *testing.T) {
app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: commitHash[:]}).Once()
app.On("Info", mock.Anything).Return(abci.ResponseInfo{
LastBlockHeight: int64(batch.EndHeight),
LastBlockHeight: int64(batch.EndHeight()),
LastBlockAppHash: commitHash[:],
})
err := manager.ProcessNextDABatch(c.daMetaData)
Expand Down
53 changes: 30 additions & 23 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
)

// ProduceBlockLoop is calling publishBlock in a loop as long as we're synced.
func (m *Manager) ProduceBlockLoop(ctx context.Context) (err error) {
// A signal will be sent to the bytesProduced channel for each block produced
// In this way it's possible to pause block production by not consuming the channel
func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) error {
m.logger.Info("Started block producer loop.")

ticker := time.NewTicker(m.Conf.BlockTime)
Expand All @@ -36,51 +38,56 @@
for {
select {
case <-ctx.Done():
return
return nil
case <-ticker.C:
// if empty blocks are configured to be enabled, and one is scheduled...
produceEmptyBlock := firstBlock || 0 == m.Conf.MaxIdleTime || nextEmptyBlock.Before(time.Now())
firstBlock = false

var (
block *types.Block
commit *types.Commit
)
block, commit, err = m.ProduceAndGossipBlock(ctx, produceEmptyBlock)
block, commit, err := m.ProduceAndGossipBlock(ctx, produceEmptyBlock)
if errors.Is(err, context.Canceled) {
m.logger.Error("Produce and gossip: context canceled.", "error", err)
return
return nil
}
if errors.Is(err, types.ErrSkippedEmptyBlock) {
if errors.Is(err, types.ErrEmptyBlock) { // occurs if the block was empty but we don't want to produce one
continue
}
if errors.Is(err, ErrNonRecoverable) {
m.logger.Error("Produce and gossip: non-recoverable.", "error", err) // TODO: flush? or don't log at all?
uevent.MustPublish(ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
return
return err
}
if err != nil {
m.logger.Error("Produce and gossip: uncategorized, assuming recoverable.", "error", err)
continue
}

// If IBC transactions are present, set proof required to true
// This will set a shorter timer for the next block
// currently we set it for all txs as we don't have a way to determine if an IBC tx is present (https://github.com/dymensionxyz/dymint/issues/709)
nextEmptyBlock = time.Now().Add(m.Conf.MaxIdleTime)
if 0 < len(block.Data.Txs) {
// the block wasn't empty so we want to make sure we don't wait too long before producing another one, in order to facilitate proofs for ibc
// TODO: optimize to only do this if IBC transactions are present (https://github.com/dymensionxyz/dymint/issues/709)
nextEmptyBlock = time.Now().Add(m.Conf.MaxProofTime)
} else {
m.logger.Info("Produced empty block.")
}

// Send the size to the accumulated size channel
// This will block in case the submitter is too slow and it's buffer is full
size := uint64(block.ToProto().Size()) + uint64(commit.ToProto().Size())
pause := len(bytesProducedC) == cap(bytesProducedC) // here we assume cap is at least 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think something like this would be more concise and easier to understand:

select {
	case <-ctx.Done():
		return nil
	case bytesProducedC <- block.SizeBytes() + commit.SizeBytes():
	default:
		evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)}
		uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
		m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission. Pausing block production until a signal is consumed.")

		bytesProducedC <- block.SizeBytes() + commit.SizeBytes()
		evt = &events.DataHealthStatus{Error: nil}
		uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
		m.logger.Info("Resumed block production.")
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks wrong because you are not waiting on context on the second bytesProducedC <- block.SizeBytes() + commit.SizeBytes() line

if pause {
evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission." +
"Pausing block production until a signal is consumed.")
}

select {
case <-ctx.Done():
return
case m.producedSizeC <- size:
return nil
case bytesProducedC <- block.SizeBytes() + commit.SizeBytes():
}

if pause {
evt := &events.DataHealthStatus{Error: nil}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Info("Resumed block production.")
}
}
}
Expand Down Expand Up @@ -139,10 +146,10 @@
return nil, nil, fmt.Errorf("load block: height: %d: %w: %w", newHeight, err, ErrNonRecoverable)
} else {
// limit to the max block data, so we don't create a block that is too big to fit in a batch
maxBlockDataSize := uint64(float64(m.Conf.BlockBatchMaxSizeBytes) * types.MaxBlockSizeAdjustment)
maxBlockDataSize := uint64(float64(m.Conf.BatchMaxSizeBytes) * types.MaxBlockSizeAdjustment)

Check notice

Code scanning / CodeQL

Floating point arithmetic Note

Floating point arithmetic operations are not associative and a possible source of non-determinism
block = m.Executor.CreateBlock(newHeight, lastCommit, lastHeaderHash, m.State, maxBlockDataSize)
if !allowEmpty && len(block.Data.Txs) == 0 {
return nil, nil, fmt.Errorf("%w: %w", types.ErrSkippedEmptyBlock, ErrRecoverable)
return nil, nil, fmt.Errorf("%w: %w", types.ErrEmptyBlock, ErrRecoverable)
}

abciHeaderPb := types.ToABCIHeaderPB(&block.Header)
Expand Down Expand Up @@ -201,8 +208,8 @@
v := vote.ToProto()
// convert libp2p key to tm key
// TODO: move to types
raw_key, _ := m.LocalKey.Raw()
tmprivkey := tmed25519.PrivKey(raw_key)
rawKey, _ := m.LocalKey.Raw()
tmprivkey := tmed25519.PrivKey(rawKey)
tmprivkey.PubKey().Bytes()
// Create a mock validator to sign the vote
tmvalidator := tmtypes.NewMockPVWithParams(tmprivkey, false, false)
Expand Down
29 changes: 13 additions & 16 deletions block/production_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/dymensionxyz/dymint/mempool"
mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1"
"github.com/dymensionxyz/dymint/node/events"
uchannel "github.com/dymensionxyz/dymint/utils/channel"
uevent "github.com/dymensionxyz/dymint/utils/event"
tmcfg "github.com/tendermint/tendermint/config"

Expand Down Expand Up @@ -57,13 +58,11 @@ func TestCreateEmptyBlocksEnableDisable(t *testing.T) {

mCtx, cancel := context.WithTimeout(context.Background(), runTime)
defer cancel()
go manager.ProduceBlockLoop(mCtx)
go managerWithEmptyBlocks.ProduceBlockLoop(mCtx)

buf1 := make(chan struct{}, 100) // dummy to avoid unhealthy event
buf2 := make(chan struct{}, 100) // dummy to avoid unhealthy event
go manager.AccumulatedDataLoop(mCtx, buf1)
go managerWithEmptyBlocks.AccumulatedDataLoop(mCtx, buf2)
bytesProduced1 := make(chan int)
bytesProduced2 := make(chan int)
go manager.ProduceBlockLoop(mCtx, bytesProduced1)
go managerWithEmptyBlocks.ProduceBlockLoop(mCtx, bytesProduced2)
uchannel.DrainForever(bytesProduced1, bytesProduced2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is draining needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to stop the producer being blocked

<-mCtx.Done()

require.Greater(manager.State.Height(), initialHeight)
Expand Down Expand Up @@ -143,7 +142,9 @@ func TestCreateEmptyBlocksNew(t *testing.T) {

mCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go manager.ProduceBlockLoop(mCtx)
bytesProduced := make(chan int)
go manager.ProduceBlockLoop(mCtx, bytesProduced)
uchannel.DrainForever(bytesProduced)

<-time.Tick(1 * time.Second)
err = mpool.CheckTx([]byte{1, 2, 3, 4}, nil, mempool.TxInfo{})
Expand Down Expand Up @@ -182,7 +183,7 @@ func TestStopBlockProduction(t *testing.T) {
require := require.New(t)

managerConfig := testutil.GetManagerConfig()
managerConfig.BlockBatchMaxSizeBytes = 1000 // small batch size to fill up quickly
managerConfig.BatchMaxSizeBytes = 1000 // small batch size to fill up quickly
manager, err := testutil.GetManager(managerConfig, nil, nil, 1, 1, 0, nil, nil)
require.NoError(err)

Expand All @@ -201,14 +202,10 @@ func TestStopBlockProduction(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

go func() {
manager.ProduceBlockLoop(ctx)
wg.Done() // Decrease counter when this goroutine finishes
}()
bytesProducedC := make(chan int)

toSubmit := make(chan struct{})
go func() {
manager.AccumulatedDataLoop(ctx, toSubmit)
manager.ProduceBlockLoop(ctx, bytesProducedC)
wg.Done() // Decrease counter when this goroutine finishes
}()

Expand All @@ -232,7 +229,7 @@ func TestStopBlockProduction(t *testing.T) {
assert.Equal(stoppedHeight, manager.State.Height())

// consume the signal
<-toSubmit
<-bytesProducedC

// check for health status event and block production to continue
select {
Expand Down
2 changes: 1 addition & 1 deletion block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (e *Executor) UpdateMempoolAfterInitChain(s *types.State) {
e.mempool.SetPostCheckFn(mempool.PostCheckMaxGas(s.ConsensusParams.Block.MaxGas))
}

// Update state from Commit response
// UpdateStateAfterCommit using commit response
func (e *Executor) UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResponses, appHash []byte, height uint64, valSet *tmtypes.ValidatorSet) {
copy(s.AppHash[:], appHash[:])
copy(s.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())
Expand Down
Loading
Loading