From 3578a098b588f16ab7e8d07ad6f53432144e8ca6 Mon Sep 17 00:00:00 2001
From: tnasu
Date: Thu, 14 Apr 2022 17:04:02 +0900
Subject: [PATCH] Improve checking tx in txsMap

---
 mempool/clist_mempool.go             |   4 +
 mempool/clist_mempool_system_test.go | 209 +++++++++++++++++++++++++++
 mempool/errors.go                    |   4 +-
 3 files changed, 216 insertions(+), 1 deletion(-)
 create mode 100644 mempool/clist_mempool_system_test.go

diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go
index 3fd21ba8f..2fcf3d416 100644
--- a/mempool/clist_mempool.go
+++ b/mempool/clist_mempool.go
@@ -297,6 +297,10 @@ func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func
 
 // CONTRACT: `caller` should held `mem.updateMtx.RLock()`
 func (mem *CListMempool) prepareCheckTx(tx types.Tx, txInfo TxInfo) error {
+	if _, ok := mem.txsMap.Load(TxKey(tx)); ok {
+		return ErrTxInMap
+	}
+
 	txSize := len(tx)
 
 	if err := mem.isFull(txSize); err != nil {
diff --git a/mempool/clist_mempool_system_test.go b/mempool/clist_mempool_system_test.go
new file mode 100644
index 000000000..07c2f20bf
--- /dev/null
+++ b/mempool/clist_mempool_system_test.go
@@ -0,0 +1,209 @@
+package mempool
+
+import (
+	"context"
+	"math/rand"
+	"os"
+	"strconv"
+	"strings"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/line/ostracon/abci/example/counter"
+	abci "github.com/line/ostracon/abci/types"
+	"github.com/line/ostracon/config"
+	"github.com/line/ostracon/libs/log"
+	"github.com/line/ostracon/proxy"
+	"github.com/line/ostracon/types"
+	"github.com/stretchr/testify/require"
+)
+
+func setupCListMempool(ctx context.Context, t testing.TB,
+	height int64, size, cacheSize int) *CListMempool {
+	t.Helper()
+
+	var cancel context.CancelFunc
+	_, cancel = context.WithCancel(ctx)
+
+	cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|"))
+	cfg.Mempool = config.DefaultMempoolConfig()
+	logLevel, _ := log.AllowLevel("info")
+	logger := log.NewFilter(log.NewOCLogger(log.NewSyncWriter(os.Stdout)), logLevel)
+
+	appConn := proxy.NewAppConns(proxy.NewLocalClientCreator(counter.NewApplication(false)))
+	require.NoError(t, appConn.Start())
+
+	t.Cleanup(func() {
+		os.RemoveAll(cfg.RootDir)
+		cancel()
+		appConn.Stop() // nolint: errcheck // ignore
+	})
+
+	if size > -1 {
+		cfg.Mempool.Size = size
+	}
+	if cacheSize > -1 {
+		cfg.Mempool.CacheSize = cacheSize
+	}
+	mem := NewCListMempool(cfg.Mempool, appConn.Mempool(), height)
+	mem.SetLogger(logger)
+	return mem
+}
+
+func TestCListMempool_SystemTestWithCacheSizeDefault(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	mem := setupCListMempool(ctx, t, 1, -1, -1) // size=5000, cacheSize=10000
+	recvTxCnt := &receiveTxCounter{}
+	stop := make(chan struct{}, 1)
+	go gossipRoutine(ctx, t, mem, recvTxCnt, stop)
+	makeBlocksAndCommits(ctx, t, mem)
+	close(stop)
+	expected := int64(0)
+	actual := recvTxCnt.threadSafeCopy()
+	require.NotEqual(t, expected, actual.sent, "actual", actual.sent)
+	require.NotEqual(t, expected, actual.success, "actual", actual.success)
+	require.NotEqual(t, expected, actual.failInMap, "actual", actual.failInMap)
+	require.NotEqual(t, expected, actual.failInCache, "actual", actual.failInCache)
+	require.Equal(t, expected, actual.failTooLarge, "actual", actual.failTooLarge)
+	require.NotEqual(t, expected, actual.failIsFull, "actual", actual.failIsFull)
+	require.Equal(t, expected, actual.failPreCheck, "actual", actual.failPreCheck)
+	require.Equal(t, expected, actual.abciFail, "actual", actual.abciFail)
+}
+
+func createProposalBlockAndDeliverTxs(
+	mem *CListMempool, height int64) (*types.Block, []*abci.ResponseDeliverTx) {
+	// mempool.lock/unlock in ReapMaxBytesMaxGasMaxTxs
+	txs := mem.ReapMaxBytesMaxGasMaxTxs(mem.config.MaxTxsBytes, 0, int64(mem.config.Size))
+	block := types.MakeBlock(height, txs, nil, nil)
+	deliverTxResponses := make([]*abci.ResponseDeliverTx, len(block.Txs))
+	for i, tx := range block.Txs {
+		deliverTxResponses[i] = &abci.ResponseDeliverTx{
+			Code: abci.CodeTypeOK,
+			Data: tx,
+		}
+	}
+	return block, deliverTxResponses
+}
+
+func commitBlock(ctx context.Context, t testing.TB,
+	mem *CListMempool, block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx) {
+	mem.Lock()
+	defer mem.Unlock()
+	err := mem.Update(block, deliverTxResponses, nil)
+	require.NoError(t, err)
+}
+
+func receiveTx(ctx context.Context, t testing.TB, mem *CListMempool, tx []byte, receiveTxCounter *receiveTxCounter) {
+	atomic.AddInt64(&receiveTxCounter.sent, 1)
+	txInfo := TxInfo{}
+	// mempool.lock/unlock in CheckTxAsync
+	mem.CheckTxAsync(tx, txInfo,
+		func(err error) {
+			if err != nil {
+				switch err {
+				case ErrTxInCache:
+					atomic.AddInt64(&receiveTxCounter.failInCache, 1)
+				case ErrTxInMap:
+					atomic.AddInt64(&receiveTxCounter.failInMap, 1)
+				}
+				switch err.(type) {
+				case ErrTxTooLarge:
+					atomic.AddInt64(&receiveTxCounter.failTooLarge, 1)
+				case ErrMempoolIsFull:
+					atomic.AddInt64(&receiveTxCounter.failIsFull, 1)
+				case ErrPreCheck:
+					atomic.AddInt64(&receiveTxCounter.failPreCheck, 1)
+				}
+			}
+		},
+		func(res *abci.Response) {
+			resCheckTx := res.GetCheckTx()
+			if resCheckTx.Code != abci.CodeTypeOK && len(resCheckTx.Log) != 0 {
+				atomic.AddInt64(&receiveTxCounter.abciFail, 1)
+			} else {
+				atomic.AddInt64(&receiveTxCounter.success, 1)
+			}
+		})
+}
+
+type receiveTxCounter struct {
+	sent         int64
+	success      int64
+	failInMap    int64
+	failInCache  int64
+	failTooLarge int64
+	failIsFull   int64
+	failPreCheck int64
+	abciFail     int64
+}
+
+func (r *receiveTxCounter) threadSafeCopy() receiveTxCounter {
+	return receiveTxCounter{
+		sent:         atomic.LoadInt64(&r.sent),
+		success:      atomic.LoadInt64(&r.success),
+		failInMap:    atomic.LoadInt64(&r.failInMap),
+		failInCache:  atomic.LoadInt64(&r.failInCache),
+		failTooLarge: atomic.LoadInt64(&r.failTooLarge),
+		failIsFull:   atomic.LoadInt64(&r.failIsFull),
+		failPreCheck: atomic.LoadInt64(&r.failPreCheck),
+		abciFail:     atomic.LoadInt64(&r.abciFail),
+	}
+}
+
+func gossipRoutine(ctx context.Context, t testing.TB, mem *CListMempool,
+	receiveTxCounter *receiveTxCounter, stop chan struct{}) {
+	for i := 0; i < nodeNum; i++ {
+		select {
+		case <-stop:
+			return
+		default:
+			go receiveRoutine(ctx, t, mem, receiveTxCounter, stop)
+		}
+	}
+}
+
+func receiveRoutine(ctx context.Context, t testing.TB, mem *CListMempool,
+	receiveTxCounter *receiveTxCounter, stop chan struct{}) {
+	for {
+		select {
+		case <-stop:
+			return
+		default:
+			tx := []byte(strconv.Itoa(rand.Intn(mem.config.CacheSize * 2)))
+			// mempool.lock/unlock in CheckTxAsync
+			receiveTx(ctx, t, mem, tx, receiveTxCounter)
+			if receiveTxCounter.sent%2000 == 0 {
+				time.Sleep(time.Second) // for avoiding mempool full
+			}
+		}
+	}
+}
+
+func makeBlocksAndCommits(ctx context.Context, t testing.TB, mem *CListMempool) {
+	for i := 0; i < blockNum; i++ {
+		block, deliverTxResponses := createProposalBlockAndDeliverTxs(mem, int64(i+1))
+		time.Sleep(randQuadraticCurveInterval(deliveredTimeMin, deliveredTimeMax, deliveredTimeRadix))
+		commitBlock(ctx, t, mem, block, deliverTxResponses)
+		time.Sleep(randQuadraticCurveInterval(blockIntervalMin, blockIntervalMax, blockIntervalRadix))
+	}
+}
+
+const (
+	nodeNum            = 1
+	blockNum           = 10
+	blockIntervalMin   = 1.0 // second
+	blockIntervalMax   = 1.0 // second
+	blockIntervalRadix = 0.1
+	deliveredTimeMin   = 2.0  // second
+	deliveredTimeMax   = 10.0 // second
+	deliveredTimeRadix = 0.1
+)
+
+func randQuadraticCurveInterval(min, max, radix float64) time.Duration {
+	rand.Seed(time.Now().UnixNano())
+	x := rand.Float64()*(max-min) + min
+	y := (x * x) * radix
+	return time.Duration(y*1000) * time.Millisecond
+}
diff --git a/mempool/errors.go b/mempool/errors.go
index 00f288ac8..be520742e 100644
--- a/mempool/errors.go
+++ b/mempool/errors.go
@@ -6,7 +6,9 @@ import (
 )
 
 var (
-	// ErrTxInCache is returned to the client if we saw tx earlier
+	// ErrTxInMap is returned to the client if we saw tx earlier in txsMap
+	ErrTxInMap = errors.New("tx already exists in txsMap")
+	// ErrTxInCache is returned to the client if we saw tx earlier in cache
 	ErrTxInCache = errors.New("tx already exists in cache")
 )
 
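
Reviewer note, not part of the patch: with this change a duplicate transaction can now be rejected with either ErrTxInMap (the tx is still tracked in the mempool's txsMap) or ErrTxInCache (the tx was seen recently and is still in the cache), so callers that treat duplicates as benign should handle both sentinels, exactly as receiveTx does above. The sketch below shows one way a caller of CheckTxAsync might classify a rejection; it uses only identifiers that already exist in this package, and the helper name classifyCheckTxErr is hypothetical, not part of the patch.

// classifyCheckTxErr is a hypothetical helper that mirrors the error handling
// in receiveTx above. It reports whether a CheckTx error means the tx is a
// benign duplicate (already known to this node) and whether it was rejected.
func classifyCheckTxErr(err error) (duplicate, rejected bool) {
	switch err {
	case nil:
		return false, false // accepted into the mempool
	case ErrTxInMap, ErrTxInCache:
		// Already in txsMap or in the cache: safe to drop silently,
		// without penalizing the peer that gossiped it.
		return true, true
	}
	switch err.(type) {
	case ErrTxTooLarge, ErrMempoolIsFull, ErrPreCheck:
		// Genuine rejections the caller may want to log, report, or retry later.
		return false, true
	}
	return false, true
}

In the system test, the same distinction feeds the failInMap and failInCache counters, and the require.NotEqual assertions on both confirm that the new txsMap check and the existing cache check are each exercised under gossip load.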