Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrent recheckTx #52

Merged
merged 11 commits into from
Jan 26, 2021
31 changes: 26 additions & 5 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,20 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
// internal CheckTx state if the AnteHandler passes. Otherwise, the ResponseCheckTx
// will contain releveant error information. Regardless of tx execution outcome,
// the ResponseCheckTx will contain relevant gas execution context.
func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
tx, err := app.txDecoder(req.Tx)
func (app *BaseApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}

tx, err := app.preCheckTx(req.Tx)
if err != nil {
return sdkerrors.ResponseCheckTx(err, 0, 0, app.trace)
}

if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}
waits, signals := app.checkAccountWGs.Register(tx)

app.checkAccountWGs.Wait(waits)
defer app.checkAccountWGs.Done(signals)

gInfo, err := app.checkTx(req.Tx, tx, req.Type == abci.CheckTxType_Recheck)
if err != nil {
Expand All @@ -182,6 +187,22 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
}
}

func (app *BaseApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}

reqCheckTx := &RequestCheckTxAsync{
txBytes: req.Tx,
recheck: req.Type == abci.CheckTxType_Recheck,
callback: callback,
prepare: waitGroup1(),
}
app.chCheckTx <- reqCheckTx

go app.prepareCheckTx(reqCheckTx)
wetcod marked this conversation as resolved.
Show resolved Hide resolved
}

// BeginRecheckTx implements the ABCI interface and set the check state based on the given header
func (app *BaseApp) BeginRecheckTx(req abci.RequestBeginRecheckTx) abci.ResponseBeginRecheckTx {
// NOTE: This is safe because Tendermint holds a lock on the mempool for Rechecking.
Expand Down
88 changes: 0 additions & 88 deletions baseapp/accountlock.go

This file was deleted.

85 changes: 85 additions & 0 deletions baseapp/accountwgs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package baseapp

import (
"sync"

sdk "github.com/cosmos/cosmos-sdk/types"
)

type AccountWGs struct {
mtx sync.Mutex
wgs map[string]*sync.WaitGroup
}

func NewAccountWGs() *AccountWGs {
return &AccountWGs{
wgs: make(map[string]*sync.WaitGroup),
}
}

func (aw *AccountWGs) Register(tx sdk.Tx) (waits []*sync.WaitGroup, signals []*AccountWG) {
signers := getUniqSigners(tx)

aw.mtx.Lock()
defer aw.mtx.Unlock()
for _, signer := range signers {
if wg := aw.wgs[signer]; wg != nil {
waits = append(waits, wg)
}
sig := waitGroup1()
aw.wgs[signer] = sig
signals = append(signals, NewAccountWG(signer, sig))
}

return waits, signals
}

func (aw *AccountWGs) Wait(waits []*sync.WaitGroup) {
for _, wait := range waits {
wait.Wait()
}
}

func (aw *AccountWGs) Done(signals []*AccountWG) {
aw.mtx.Lock()
defer aw.mtx.Unlock()

for _, signal := range signals {
signal.wg.Done()
if aw.wgs[signal.acc] == signal.wg {
delete(aw.wgs, signal.acc)
}
}
}

func getUniqSigners(tx sdk.Tx) []string {
seen := map[string]bool{}
var signers []string
for _, msg := range tx.GetMsgs() {
for _, addr := range msg.GetSigners() {
if !seen[addr.String()] {
signers = append(signers, string(addr))
seen[addr.String()] = true
}
}
}
return signers
}

type AccountWG struct {
acc string
wg *sync.WaitGroup
}

func NewAccountWG(acc string, wg *sync.WaitGroup) *AccountWG {
return &AccountWG{
acc: acc,
wg: wg,
}
}

func waitGroup1() (wg *sync.WaitGroup) {
wg = &sync.WaitGroup{}
wg.Add(1)
return wg
}
59 changes: 22 additions & 37 deletions baseapp/accountlock_test.go → baseapp/accountwgs_test.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,67 @@
package baseapp

import (
"reflect"
"sort"
"sync"
"testing"

"github.com/stretchr/testify/require"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/secp256k1"

sdk "github.com/cosmos/cosmos-sdk/types"
)

func TestAccountLock(t *testing.T) {
func TestConvertByteSliceToString(t *testing.T) {
b := []byte{65, 66, 67, 0, 65, 66, 67}
s := string(b)
require.Equal(t, len(b), len(s))
require.Equal(t, uint8(0), s[3])
}

func TestRegister(t *testing.T) {
app := setupBaseApp(t)
ctx := app.NewContext(true, abci.Header{})

privs := newTestPrivKeys(3)
tx := newTestTx(privs)

accKeys := app.accountLock.Lock(ctx, tx)

for _, accKey := range accKeys {
require.True(t, isMutexLock(&app.accountLock.accMtx[accKey]))
}
waits, signals := app.checkAccountWGs.Register(tx)

app.accountLock.Unlock(accKeys)
require.Equal(t, 0, len(waits))
require.Equal(t, 3, len(signals))

for _, accKey := range accKeys {
require.False(t, isMutexLock(&app.accountLock.accMtx[accKey]))
for _, signal := range signals {
require.Equal(t, app.checkAccountWGs.wgs[signal.acc], signal.wg)
}
}

func TestUnlockDoNothingWithNil(t *testing.T) {
func TestDontPanicWithNil(t *testing.T) {
app := setupBaseApp(t)
require.NotPanics(t, func() { app.accountLock.Unlock(nil) })
}

func TestGetSigner(t *testing.T) {
privs := newTestPrivKeys(3)
tx := newTestTx(privs)
signers := getSigners(tx)

require.Equal(t, getAddrs(privs), signers)
require.NotPanics(t, func() { app.checkAccountWGs.Wait(nil) })
require.NotPanics(t, func() { app.checkAccountWGs.Done(nil) })
}

func TestGetUniqSortedAddressKey(t *testing.T) {
func TestGetUniqSigners(t *testing.T) {
privs := newTestPrivKeys(3)

addrs := getAddrs(privs)
addrs = append(addrs, addrs[1], addrs[0])
require.Equal(t, 5, len(addrs))

accKeys := getUniqSortedAddressKey(addrs)
tx := newTestTx(privs)
signers := getUniqSigners(tx)

// length should be reduced because `duplicated` is removed
require.Less(t, len(accKeys), len(addrs))
require.Less(t, len(signers), len(addrs))

// check uniqueness
for i, iv := range accKeys {
for j, jv := range accKeys {
for i, iv := range signers {
for j, jv := range signers {
if i != j {
require.True(t, iv != jv)
}
}
}

// should be sorted
require.True(t, sort.IsSorted(uint32Slice(accKeys)))
}

type AccountLockTestTx struct {
Expand Down Expand Up @@ -111,9 +102,3 @@ func newTestTx(privs []crypto.PrivKey) sdk.Tx {
}
return AccountLockTestTx{Msgs: msgs}
}

// Hack (too slow)
func isMutexLock(mtx *sync.Mutex) bool {
state := reflect.ValueOf(mtx).Elem().FieldByName("state")
return state.Int() == 1
}
Loading