diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go
index 3fd21ba8f..2bf1bd3eb 100644
--- a/mempool/clist_mempool.go
+++ b/mempool/clist_mempool.go
@@ -297,6 +297,10 @@ func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func
 
 // CONTRACT: `caller` should held `mem.updateMtx.RLock()`
 func (mem *CListMempool) prepareCheckTx(tx types.Tx, txInfo TxInfo) error {
+	if _, ok := mem.txsMap.Load(TxKey(tx)); ok {
+		return ErrTxInMap
+	}
+
 	txSize := len(tx)
 
 	if err := mem.isFull(txSize); err != nil {
@@ -631,6 +635,10 @@ func (mem *CListMempool) ReapMaxBytesMaxGasMaxTxs(maxBytes, maxGas, maxTxs int64
 	protoTxs := tmproto.Data{}
 	for e := mem.txs.Front(); e != nil && len(txs) < int(maxTxs); e = e.Next() {
 		memTx := e.Value.(*mempoolTx)
+		if _, ok := mem.txsMap.Load(TxKey(memTx.tx)); !ok {
+			mem.logger.Error(fmt.Sprintf("already removed on ReapMaxBytesMaxGasMaxTxs: tx=%s, height=%d", memTx.tx, mem.height))
+			continue
+		}
 		protoTxs.Txs = append(protoTxs.Txs, memTx.tx)
 
 		// Check total size requirement
@@ -750,9 +758,14 @@ func (mem *CListMempool) recheckTxs() {
 
 	// Push txs to proxyAppConn
 	// NOTE: globalCb may be called concurrently.
 	for e := mem.txs.Front(); e != nil; e = e.Next() {
+		memTx := e.Value.(*mempoolTx)
+		if _, ok := mem.txsMap.Load(TxKey(memTx.tx)); !ok {
+			mem.logger.Error(fmt.Sprintf("already removed on recheckTxs: tx=%s, height=%d", memTx.tx, mem.height))
+			continue
+		}
+
 		wg.Add(1)
-		memTx := e.Value.(*mempoolTx)
 		req := abci.RequestCheckTx{
 			Tx:   memTx.tx,
 			Type: abci.CheckTxType_Recheck,
diff --git a/mempool/clist_mempool_system_test.go b/mempool/clist_mempool_system_test.go
new file mode 100644
index 000000000..dfe2fe9e7
--- /dev/null
+++ b/mempool/clist_mempool_system_test.go
@@ -0,0 +1,195 @@
+package mempool
+
+import (
+	"context"
+	"math/rand"
+	"os"
+	"strconv"
+	"strings"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/line/ostracon/abci/example/counter"
+	abci "github.com/line/ostracon/abci/types"
+	"github.com/line/ostracon/config"
+	"github.com/line/ostracon/libs/log"
+	"github.com/line/ostracon/proxy"
+	"github.com/line/ostracon/types"
+	"github.com/stretchr/testify/require"
+)
+
+func setupCListMempool(ctx context.Context, t testing.TB,
+	height int64, size, cacheSize int) *CListMempool {
+	t.Helper()
+
+	var cancel context.CancelFunc
+	_, cancel = context.WithCancel(ctx)
+
+	cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|"))
+	cfg.Mempool = config.DefaultMempoolConfig()
+	logLevel, _ := log.AllowLevel("info")
+	logger := log.NewFilter(log.NewOCLogger(log.NewSyncWriter(os.Stdout)), logLevel)
+
+	appConn := proxy.NewAppConns(proxy.NewLocalClientCreator(counter.NewApplication(false)))
+	require.NoError(t, appConn.Start())
+
+	t.Cleanup(func() {
+		os.RemoveAll(cfg.RootDir)
+		cancel()
+		appConn.Stop() // nolint: errcheck // ignore
+	})
+
+	if size > -1 {
+		cfg.Mempool.Size = size
+	}
+	if cacheSize > -1 {
+		cfg.Mempool.CacheSize = cacheSize
+	}
+	mem := NewCListMempool(cfg.Mempool, appConn.Mempool(), height)
+	mem.SetLogger(logger)
+	return mem
+}
+
+func TestCListMempool_SystemTestWithCacheSizeDefault(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	mem := setupCListMempool(ctx, t, 1, -1, -1) // size=5000, cacheSize=10000
+	recvTxCnt := &receiveTxCounter{}
+	stop := make(chan struct{}, 1)
+	go gossipRoutine(ctx, t, mem, recvTxCnt, stop)
+	makeBlocksAndCommits(ctx, t, mem)
+	close(stop)
+	expected := int64(0)
+	require.NotEqual(t, expected, recvTxCnt.sent, recvTxCnt.sent)
+	require.NotEqual(t, expected, recvTxCnt.success, recvTxCnt.success)
+	require.NotEqual(t, expected, recvTxCnt.failInMap, recvTxCnt.failInMap)
+	require.NotEqual(t, expected, recvTxCnt.failInCache, recvTxCnt.failInCache)
+	require.Equal(t, expected, recvTxCnt.failTooLarge, recvTxCnt.failTooLarge)
+	require.NotEqual(t, expected, recvTxCnt.failIsFull, recvTxCnt.failIsFull)
+	require.Equal(t, expected, recvTxCnt.failPreCheck, recvTxCnt.failPreCheck)
+	require.Equal(t, expected, recvTxCnt.abciFail, recvTxCnt.abciFail)
+}
+
+func createProposalBlockAndDeliverTxs(
+	mem *CListMempool, height int64) (*types.Block, []*abci.ResponseDeliverTx) {
+	// mempool.lock/unlock in ReapMaxBytesMaxGasMaxTxs
+	txs := mem.ReapMaxBytesMaxGasMaxTxs(mem.config.MaxTxsBytes, 0, int64(mem.config.Size))
+	block := types.MakeBlock(height, txs, nil, nil)
+	deliverTxResponses := make([]*abci.ResponseDeliverTx, len(block.Txs))
+	for i, tx := range block.Txs {
+		deliverTxResponses[i] = &abci.ResponseDeliverTx{
+			Code: abci.CodeTypeOK,
+			Data: tx,
+		}
+	}
+	return block, deliverTxResponses
+}
+
+func commitBlock(ctx context.Context, t testing.TB,
+	mem *CListMempool, block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx) {
+	mem.Lock()
+	defer mem.Unlock()
+	err := mem.Update(block, deliverTxResponses, nil)
+	require.NoError(t, err)
+}
+
+func receiveTx(ctx context.Context, t testing.TB, mem *CListMempool, tx []byte, receiveTxCounter *receiveTxCounter) {
+	atomic.AddInt64(&receiveTxCounter.sent, 1)
+	txInfo := TxInfo{}
+	// mempool.lock/unlock in CheckTxAsync
+	mem.CheckTxAsync(tx, txInfo,
+		func(err error) {
+			if err != nil {
+				switch err {
+				case ErrTxInCache:
+					atomic.AddInt64(&receiveTxCounter.failInCache, 1)
+				case ErrTxInMap:
+					atomic.AddInt64(&receiveTxCounter.failInMap, 1)
+				}
+				switch err.(type) {
+				case ErrTxTooLarge:
+					atomic.AddInt64(&receiveTxCounter.failTooLarge, 1)
+				case ErrMempoolIsFull:
+					atomic.AddInt64(&receiveTxCounter.failIsFull, 1)
+				case ErrPreCheck:
+					atomic.AddInt64(&receiveTxCounter.failPreCheck, 1)
+				}
+			}
+		},
+		func(res *abci.Response) {
+			resCheckTx := res.GetCheckTx()
+			if resCheckTx.Code != abci.CodeTypeOK && len(resCheckTx.Log) != 0 {
+				atomic.AddInt64(&receiveTxCounter.abciFail, 1)
+			} else {
+				atomic.AddInt64(&receiveTxCounter.success, 1)
+			}
+		})
+}
+
+type receiveTxCounter struct {
+	sent         int64
+	success      int64
+	failInMap    int64
+	failInCache  int64
+	failTooLarge int64
+	failIsFull   int64
+	failPreCheck int64
+	abciFail     int64
+}
+
+func gossipRoutine(ctx context.Context, t testing.TB, mem *CListMempool,
+	receiveTxCounter *receiveTxCounter, stop chan struct{}) {
+	for i := 0; i < nodeNum; i++ {
+		select {
+		case <-stop:
+			return
+		default:
+			go receiveRoutine(ctx, t, mem, receiveTxCounter, stop)
+		}
+	}
+}
+
+func receiveRoutine(ctx context.Context, t testing.TB, mem *CListMempool,
+	receiveTxCounter *receiveTxCounter, stop chan struct{}) {
+	for {
+		select {
+		case <-stop:
+			return
+		default:
+			tx := []byte(strconv.Itoa(rand.Intn(mem.config.CacheSize * 2)))
+			// mempool.lock/unlock in CheckTxAsync
+			receiveTx(ctx, t, mem, tx, receiveTxCounter)
+			if receiveTxCounter.sent%2000 == 0 {
+				time.Sleep(time.Second) // for avoiding mempool full
+			}
+		}
+	}
+}
+
+func makeBlocksAndCommits(ctx context.Context, t testing.TB, mem *CListMempool) {
+	for i := 0; i < blockNum; i++ {
+		block, deliverTxResponses := createProposalBlockAndDeliverTxs(mem, int64(i+1))
+		time.Sleep(randQuadraticCurveInterval(deliveredTimeMin, deliveredTimeMax, deliveredTimeRadix))
+		commitBlock(ctx, t, mem, block, deliverTxResponses)
+		time.Sleep(randQuadraticCurveInterval(blockIntervalMin, blockIntervalMax, blockIntervalRadix))
+	}
+}
+
+const (
+	nodeNum            = 1
+	blockNum           = 15
+	blockIntervalMin   = 1.0 // second
+	blockIntervalMax   = 1.0 // second
+	blockIntervalRadix = 0.1
+	deliveredTimeMin   = 2.0  // second
+	deliveredTimeMax   = 10.0 // second
+	deliveredTimeRadix = 0.1
+)
+
+func randQuadraticCurveInterval(min, max, radix float64) time.Duration {
+	rand.Seed(time.Now().UnixNano())
+	x := rand.Float64()*(max-min) + min
+	y := (x * x) * radix
+	return time.Duration(y*1000) * time.Millisecond
+}
diff --git a/mempool/errors.go b/mempool/errors.go
index 00f288ac8..be520742e 100644
--- a/mempool/errors.go
+++ b/mempool/errors.go
@@ -6,7 +6,9 @@ import (
 )
 
 var (
-	// ErrTxInCache is returned to the client if we saw tx earlier
+	// ErrTxInMap is returned to the client if we saw tx earlier in txsMap
+	ErrTxInMap = errors.New("tx already exists in txsMap")
+	// ErrTxInCache is returned to the client if we saw tx earlier in cache
	ErrTxInCache = errors.New("tx already exists in cache")
 )
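
For reference, below is a minimal, self-contained sketch of the duplicate-detection pattern the clist_mempool.go hunks rely on: a sync.Map keyed by the tx hash is consulted before accepting a tx (prepareCheckTx) and before proposing or rechecking one (ReapMaxBytesMaxGasMaxTxs, recheckTxs). This is an illustration, not the Ostracon code: the txStore type and txKey helper are hypothetical, and the real mempool's locking (updateMtx) and CList bookkeeping are elided.

package main

import (
	"crypto/sha256"
	"errors"
	"fmt"
	"sync"
)

// errTxInMap mirrors the new ErrTxInMap sentinel.
var errTxInMap = errors.New("tx already exists in txsMap")

// txStore is a hypothetical stand-in for CListMempool's txsMap.
type txStore struct {
	txsMap sync.Map // [sha256.Size]byte -> []byte
}

// txKey is an illustrative hash-based key, analogous in spirit to TxKey.
func txKey(tx []byte) [sha256.Size]byte { return sha256.Sum256(tx) }

// add rejects a tx whose key is already tracked, like the new guard at the
// top of prepareCheckTx, and otherwise records it.
func (s *txStore) add(tx []byte) error {
	if _, ok := s.txsMap.Load(txKey(tx)); ok {
		return errTxInMap
	}
	s.txsMap.Store(txKey(tx), tx)
	return nil
}

// reap skips candidates whose key has already been removed from the map,
// mirroring the "already removed on ..." skip added to the reap/recheck loops.
func (s *txStore) reap(candidates [][]byte) [][]byte {
	var kept [][]byte
	for _, tx := range candidates {
		if _, ok := s.txsMap.Load(txKey(tx)); !ok {
			continue // removed concurrently; do not propose it again
		}
		kept = append(kept, tx)
	}
	return kept
}

func main() {
	s := &txStore{}
	tx := []byte("tx1")
	fmt.Println(s.add(tx))                 // <nil>
	fmt.Println(s.add(tx))                 // tx already exists in txsMap
	s.txsMap.Delete(txKey(tx))             // simulate removal during Update
	fmt.Println(len(s.reap([][]byte{tx}))) // 0: skipped, it is gone from the map
}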