Skip to content

Commit

Permalink
Improve checking tx in txsMap
Browse files Browse the repository at this point in the history
  • Loading branch information
tnasu committed Apr 14, 2022
1 parent a2c183f commit 39c17e9
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 2 deletions.
15 changes: 14 additions & 1 deletion mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func

// CONTRACT: `caller` should held `mem.updateMtx.RLock()`
func (mem *CListMempool) prepareCheckTx(tx types.Tx, txInfo TxInfo) error {
if _, ok := mem.txsMap.Load(TxKey(tx)); ok {
return ErrTxInMap
}

txSize := len(tx)

if err := mem.isFull(txSize); err != nil {
Expand Down Expand Up @@ -631,6 +635,10 @@ func (mem *CListMempool) ReapMaxBytesMaxGasMaxTxs(maxBytes, maxGas, maxTxs int64
protoTxs := tmproto.Data{}
for e := mem.txs.Front(); e != nil && len(txs) < int(maxTxs); e = e.Next() {
memTx := e.Value.(*mempoolTx)
if _, ok := mem.txsMap.Load(TxKey(memTx.tx)); !ok {
mem.logger.Error(fmt.Sprintf("already removed on ReapMaxBytesMaxGasMaxTxs: tx=%s, height=%d", memTx.tx, mem.height))
continue
}

protoTxs.Txs = append(protoTxs.Txs, memTx.tx)
// Check total size requirement
Expand Down Expand Up @@ -750,9 +758,14 @@ func (mem *CListMempool) recheckTxs() {
// Push txs to proxyAppConn
// NOTE: globalCb may be called concurrently.
for e := mem.txs.Front(); e != nil; e = e.Next() {
memTx := e.Value.(*mempoolTx)
if _, ok := mem.txsMap.Load(TxKey(memTx.tx)); !ok {
mem.logger.Error(fmt.Sprintf("already removed on recheckTxs: tx=%s, height=%d", memTx.tx, mem.height))
continue
}

wg.Add(1)

memTx := e.Value.(*mempoolTx)
req := abci.RequestCheckTx{
Tx: memTx.tx,
Type: abci.CheckTxType_Recheck,
Expand Down
195 changes: 195 additions & 0 deletions mempool/clist_mempool_system_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package mempool

import (
"context"
"math/rand"
"os"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/line/ostracon/abci/example/counter"
abci "github.com/line/ostracon/abci/types"
"github.com/line/ostracon/config"
"github.com/line/ostracon/libs/log"
"github.com/line/ostracon/proxy"
"github.com/line/ostracon/types"
"github.com/stretchr/testify/require"
)

func setupCListMempool(ctx context.Context, t testing.TB,
height int64, size, cacheSize int) *CListMempool {
t.Helper()

var cancel context.CancelFunc
_, cancel = context.WithCancel(ctx)

cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|"))
cfg.Mempool = config.DefaultMempoolConfig()
logLevel, _ := log.AllowLevel("info")
logger := log.NewFilter(log.NewOCLogger(log.NewSyncWriter(os.Stdout)), logLevel)

appConn := proxy.NewAppConns(proxy.NewLocalClientCreator(counter.NewApplication(false)))
require.NoError(t, appConn.Start())

t.Cleanup(func() {
os.RemoveAll(cfg.RootDir)
cancel()
appConn.Stop() // nolint: errcheck // ignore
})

if size > -1 {
cfg.Mempool.Size = size
}
if cacheSize > -1 {
cfg.Mempool.CacheSize = cacheSize
}
mem := NewCListMempool(cfg.Mempool, appConn.Mempool(), height)
mem.SetLogger(logger)
return mem
}

func TestCListMempool_SystemTestWithCacheSizeDefault(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mem := setupCListMempool(ctx, t, 1, -1, -1) // size=5000, cacheSize=10000
recvTxCnt := &receiveTxCounter{}
stop := make(chan struct{}, 1)
go gossipRoutine(ctx, t, mem, recvTxCnt, stop)
makeBlocksAndCommits(ctx, t, mem)
close(stop)
expected := int64(0)
require.NotEqual(t, expected, recvTxCnt.sent, recvTxCnt.sent)
require.NotEqual(t, expected, recvTxCnt.success, recvTxCnt.success)
require.NotEqual(t, expected, recvTxCnt.failInMap, recvTxCnt.failInMap)
require.NotEqual(t, expected, recvTxCnt.failInCache, recvTxCnt.failInCache)
require.Equal(t, expected, recvTxCnt.failTooLarge, recvTxCnt.failTooLarge)
require.NotEqual(t, expected, recvTxCnt.failIsFull, recvTxCnt.failIsFull)
require.Equal(t, expected, recvTxCnt.failPreCheck, recvTxCnt.failPreCheck)
require.Equal(t, expected, recvTxCnt.abciFail, recvTxCnt.abciFail)
}

func createProposalBlockAndDeliverTxs(
mem *CListMempool, height int64) (*types.Block, []*abci.ResponseDeliverTx) {
// mempool.lock/unlock in ReapMaxBytesMaxGasMaxTxs
txs := mem.ReapMaxBytesMaxGasMaxTxs(mem.config.MaxTxsBytes, 0, int64(mem.config.Size))
block := types.MakeBlock(height, txs, nil, nil)
deliverTxResponses := make([]*abci.ResponseDeliverTx, len(block.Txs))
for i, tx := range block.Txs {
deliverTxResponses[i] = &abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
Data: tx,
}
}
return block, deliverTxResponses
}

func commitBlock(ctx context.Context, t testing.TB,
mem *CListMempool, block *types.Block, deliverTxResponses []*abci.ResponseDeliverTx) {
mem.Lock()
defer mem.Unlock()
err := mem.Update(block, deliverTxResponses, nil)
require.NoError(t, err)
}

func receiveTx(ctx context.Context, t testing.TB, mem *CListMempool, tx []byte, receiveTxCounter *receiveTxCounter) {
atomic.AddInt64(&receiveTxCounter.sent, 1)
txInfo := TxInfo{}
// mempool.lock/unlock in CheckTxAsync
mem.CheckTxAsync(tx, txInfo,
func(err error) {
if err != nil {
switch err {
case ErrTxInCache:
atomic.AddInt64(&receiveTxCounter.failInCache, 1)
case ErrTxInMap:
atomic.AddInt64(&receiveTxCounter.failInMap, 1)
}
switch err.(type) {
case ErrTxTooLarge:
atomic.AddInt64(&receiveTxCounter.failTooLarge, 1)
case ErrMempoolIsFull:
atomic.AddInt64(&receiveTxCounter.failIsFull, 1)
case ErrPreCheck:
atomic.AddInt64(&receiveTxCounter.failPreCheck, 1)
}
}
},
func(res *abci.Response) {
resCheckTx := res.GetCheckTx()
if resCheckTx.Code != abci.CodeTypeOK && len(resCheckTx.Log) != 0 {
atomic.AddInt64(&receiveTxCounter.abciFail, 1)
} else {
atomic.AddInt64(&receiveTxCounter.success, 1)
}
})
}

type receiveTxCounter struct {
sent int64
success int64
failInMap int64
failInCache int64
failTooLarge int64
failIsFull int64
failPreCheck int64
abciFail int64
}

func gossipRoutine(ctx context.Context, t testing.TB, mem *CListMempool,
receiveTxCounter *receiveTxCounter, stop chan struct{}) {
for i := 0; i < nodeNum; i++ {
select {
case <-stop:
return
default:
go receiveRoutine(ctx, t, mem, receiveTxCounter, stop)
}
}
}

func receiveRoutine(ctx context.Context, t testing.TB, mem *CListMempool,
receiveTxCounter *receiveTxCounter, stop chan struct{}) {
for {
select {
case <-stop:
return
default:
tx := []byte(strconv.Itoa(rand.Intn(mem.config.CacheSize * 2)))
// mempool.lock/unlock in CheckTxAsync
receiveTx(ctx, t, mem, tx, receiveTxCounter)
if receiveTxCounter.sent%2000 == 0 {
time.Sleep(time.Second) // for avoiding mempool full
}
}
}
}

func makeBlocksAndCommits(ctx context.Context, t testing.TB, mem *CListMempool) {
for i := 0; i < blockNum; i++ {
block, deliverTxResponses := createProposalBlockAndDeliverTxs(mem, int64(i+1))
time.Sleep(randQuadraticCurveInterval(deliveredTimeMin, deliveredTimeMax, deliveredTimeRadix))
commitBlock(ctx, t, mem, block, deliverTxResponses)
time.Sleep(randQuadraticCurveInterval(blockIntervalMin, blockIntervalMax, blockIntervalRadix))
}
}

const (
nodeNum = 1
blockNum = 15
blockIntervalMin = 1.0 // second
blockIntervalMax = 1.0 // second
blockIntervalRadix = 0.1
deliveredTimeMin = 2.0 // second
deliveredTimeMax = 10.0 // second
deliveredTimeRadix = 0.1
)

func randQuadraticCurveInterval(min, max, radix float64) time.Duration {
rand.Seed(time.Now().UnixNano())
x := rand.Float64()*(max-min) + min
y := (x * x) * radix
return time.Duration(y*1000) * time.Millisecond
}
4 changes: 3 additions & 1 deletion mempool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
)

var (
// ErrTxInCache is returned to the client if we saw tx earlier
// ErrTxInMap is returned to the client if we saw tx earlier in txsMap
ErrTxInMap = errors.New("tx already exists in txsMap")
// ErrTxInCache is returned to the client if we saw tx earlier in cache
ErrTxInCache = errors.New("tx already exists in cache")
)

Expand Down

0 comments on commit 39c17e9

Please sign in to comment.