Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

draft for test: concurrent recheckTx #165

Closed
wants to merge 10 commits into from
21 changes: 14 additions & 7 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,19 @@ func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
)
}

func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
res := app.Application.CheckTx(req)
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
)
func (app *localClient) CheckTxAsync(params types.RequestCheckTx) *ReqRes {
req := types.ToRequestCheckTx(params)
reqRes := NewReqRes(req)

app.Application.CheckTxAsync(params, func(r types.ResponseCheckTx) {
res := types.ToResponseCheckTx(r)
app.Callback(req, res)
reqRes.Response = res
reqRes.SetDone()
reqRes.Done()
})

return reqRes
}

func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
Expand Down Expand Up @@ -201,7 +208,7 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon
}

func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
res := app.Application.CheckTx(req)
res := app.Application.CheckTxSync(req)
return &res, nil
}

Expand Down
10 changes: 9 additions & 1 deletion abci/example/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
return types.ResponseDeliverTx{Code: code.CodeTypeOK}
}

func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.checkTx(req)
}

func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx {
if app.serial {
if len(req.Tx) > 8 {
return types.ResponseCheckTx{
Expand Down
10 changes: 9 additions & 1 deletion abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events}
}

func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.checkTx(req)
}

func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx {
return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}
}

Expand Down
8 changes: 6 additions & 2 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ func (app *PersistentKVStoreApplication) DeliverTx(req types.RequestDeliverTx) t
return app.app.DeliverTx(req)
}

func (app *PersistentKVStoreApplication) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
return app.app.CheckTx(req)
func (app *PersistentKVStoreApplication) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.app.CheckTxSync(req)
}

func (app *PersistentKVStoreApplication) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
app.app.CheckTxAsync(req, callback)
}

func (app *PersistentKVStoreApplication) BeginRecheckTx(req types.RequestBeginRecheckTx) types.ResponseBeginRecheckTx {
Expand Down
13 changes: 10 additions & 3 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
context "golang.org/x/net/context"
)

type CheckTxCallback func(ResponseCheckTx)

// Application is an interface that enables any finite, deterministic state machine
// to be driven by a blockchain-based replication engine via the ABCI.
// All methods take a RequestXxx argument and return a ResponseXxx argument,
Expand All @@ -15,7 +17,8 @@ type Application interface {
Query(RequestQuery) ResponseQuery // Query for state

// Mempool Connection
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTxSync(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTxAsync(RequestCheckTx, CheckTxCallback) // Asynchronously validate a tx for the mempool
BeginRecheckTx(RequestBeginRecheckTx) ResponseBeginRecheckTx // Signals the beginning of rechecking
EndRecheckTx(RequestEndRecheckTx) ResponseEndRecheckTx // Signals the end of rechecking

Expand Down Expand Up @@ -51,10 +54,14 @@ func (BaseApplication) DeliverTx(req RequestDeliverTx) ResponseDeliverTx {
return ResponseDeliverTx{Code: CodeTypeOK}
}

func (BaseApplication) CheckTx(req RequestCheckTx) ResponseCheckTx {
func (BaseApplication) CheckTxSync(req RequestCheckTx) ResponseCheckTx {
return ResponseCheckTx{Code: CodeTypeOK}
}

func (BaseApplication) CheckTxAsync(req RequestCheckTx, callback CheckTxCallback) {
callback(ResponseCheckTx{Code: CodeTypeOK})
}

func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx {
return ResponseBeginRecheckTx{Code: CodeTypeOK}
}
Expand Down Expand Up @@ -114,7 +121,7 @@ func (app *GRPCApplication) DeliverTx(ctx context.Context, req *RequestDeliverTx
}

func (app *GRPCApplication) CheckTx(ctx context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) {
res := app.app.CheckTx(*req)
res := app.app.CheckTxSync(*req)
return &res, nil
}

Expand Down
6 changes: 5 additions & 1 deletion blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,14 @@ func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
return abci.ResponseDeliverTx{Events: []abci.Event{}}
}

func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *testApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
return abci.ResponseCheckTx{}
}

func (app *testApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
callback(abci.ResponseCheckTx{})
}

func (app *testApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{}
}
Expand Down
16 changes: 12 additions & 4 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func deliverTxsRange(cs *State, start, end int) {
for i := start; i < end; i++ {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil, mempl.TxInfo{})
_, err := assertMempool(cs.txNotifier).CheckTxSync(txBytes, mempl.TxInfo{})
if err != nil {
panic(fmt.Sprintf("Error after CheckTx: %v", err))
}
Expand Down Expand Up @@ -161,13 +161,13 @@ func TestMempoolRmBadTx(t *testing.T) {
// Try to send the tx through the mempool.
// CheckTx should not err, but the app should return a bad abci code
// and the tx should get removed from the pool
err := assertMempool(cs.txNotifier).CheckTx(txBytes, func(r *abci.Response) {
err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) {
if r.GetCheckTx().Code != code.CodeTypeBadNonce {
t.Errorf("expected checktx to return bad nonce, got %v", r)
return
}
checkTxRespCh <- struct{}{}
}, mempl.TxInfo{})
})
if err != nil {
t.Errorf("error after CheckTx: %v", err)
return
Expand Down Expand Up @@ -233,7 +233,15 @@ func (app *CounterApplication) DeliverTx(req abci.RequestDeliverTx) abci.Respons
return abci.ResponseDeliverTx{Code: code.CodeTypeOK}
}

func (app *CounterApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *CounterApplication) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
return app.checkTx(req)
}

func (app *CounterApplication) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *CounterApplication) checkTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
txValue := txAsUint64(req.Tx)
app.mempoolTxCountMtx.Lock()
defer app.mempoolTxCountMtx.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)

// send a tx
if err := assertMempool(css[3].txNotifier).CheckTx([]byte{1, 2, 3}, nil, mempl.TxInfo{}); err != nil {
if _, err := assertMempool(css[3].txNotifier).CheckTxSync([]byte{1, 2, 3}, mempl.TxInfo{}); err != nil {
t.Error(err)
}

Expand Down Expand Up @@ -543,7 +543,7 @@ func waitForAndValidateBlock(
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
for _, tx := range txs {
err := assertMempool(css[j].txNotifier).CheckTx(tx, nil, mempl.TxInfo{})
_, err := assertMempool(css[j].txNotifier).CheckTxSync(tx, mempl.TxInfo{})
assert.Nil(t, err)
}
}, css)
Expand Down
14 changes: 7 additions & 7 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func sendTxs(ctx context.Context, cs *State) {
return
default:
tx := []byte{byte(i)}
assertMempool(cs.txNotifier).CheckTx(tx, nil, mempl.TxInfo{})
assertMempool(cs.txNotifier).CheckTxSync(tx, mempl.TxInfo{})
i++
}
}
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
require.NoError(t, err)
valPubKey1ABCI := types.TM2PB.PubKey(newValidatorPubKey1)
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx1, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ := css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts := propBlock.MakePartSet(partSize)
Expand All @@ -376,7 +376,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
require.NoError(t, err)
updatePubKey1ABCI := types.TM2PB.PubKey(updateValidatorPubKey1)
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(updateValidatorTx1, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand All @@ -402,13 +402,13 @@ func TestSimulateValidatorsChange(t *testing.T) {
require.NoError(t, err)
newVal2ABCI := types.TM2PB.PubKey(newValidatorPubKey2)
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx2, mempl.TxInfo{})
assert.Nil(t, err)
newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey()
require.NoError(t, err)
newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3)
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx3, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
ensureNewProposal(proposalCh, height, round)

removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx2, mempl.TxInfo{})
assert.Nil(t, err)

rs = css[0].GetRoundState()
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
height++
incrementHeight(vss...)
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx3, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand Down
37 changes: 34 additions & 3 deletions mempool/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,46 @@ func BenchmarkReap(b *testing.B) {
for i := 0; i < size; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTx(tx, nil, TxInfo{})
mempool.CheckTxSync(tx, TxInfo{})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
mempool.ReapMaxBytesMaxGas(100000000, 10000000)
}
}

func BenchmarkCheckTx(b *testing.B) {
func BenchmarkReapWithCheckTxAsync(b *testing.B) {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()

size := 10000
for i := 0; i < size; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTxAsync(tx, TxInfo{}, nil)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
mempool.ReapMaxBytesMaxGas(100000000, 10000000)
}
}

func BenchmarkCheckTxSync(b *testing.B) {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()

for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTxSync(tx, TxInfo{})
}
}

func BenchmarkCheckTxAsync(b *testing.B) {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
Expand All @@ -35,7 +66,7 @@ func BenchmarkCheckTx(b *testing.B) {
for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTx(tx, nil, TxInfo{})
mempool.CheckTxAsync(tx, TxInfo{}, nil)
}
}

Expand Down
4 changes: 2 additions & 2 deletions mempool/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestCacheAfterUpdate(t *testing.T) {
for tcIndex, tc := range tests {
for i := 0; i < tc.numTxsToCreate; i++ {
tx := types.Tx{byte(i)}
err := mempool.CheckTx(tx, nil, TxInfo{})
_, err := mempool.CheckTxSync(tx, TxInfo{})
require.NoError(t, err)
}

Expand All @@ -71,7 +71,7 @@ func TestCacheAfterUpdate(t *testing.T) {

for _, v := range tc.reAddIndices {
tx := types.Tx{byte(v)}
_ = mempool.CheckTx(tx, nil, TxInfo{})
_, _ = mempool.CheckTxSync(tx, TxInfo{})
}

cache := mempool.cache.(*mapTxCache)
Expand Down
Loading