Skip to content

Commit

Permalink
feat: concurrent recheckTx (#52) (#155)
Browse files Browse the repository at this point in the history
* chore: bump up ostracon

* feat: concurrent recheckTx (#52)

* chore: simply implement abci.`CheckTxSync()` and abci.`CheckTxAsync()`

* chore: move `accountLock` from `anteTx()` to `checkTxWithLock()`

* feat: impl `abci.CheckTxAsync()` with reactor

# Conflicts:
#	baseapp/baseapp.go

* feat: impl `accountwgs`

* feat: impl `accountwgs_test`

* chore: revise code after cherry-pick

* chore: bump-up tendermint & iavl

* chore: rename func from `startCheckTxAsyncReactor()` to `checkTxAsyncReactor()`

* fix: imports for lint

* fix: imports for lint

* chore: rename `Waits()` to `Wait()`
# Conflicts:
#	baseapp/abci.go
#	baseapp/accountlock.go
#	baseapp/accountwgs_test.go
#	baseapp/baseapp.go
#	baseapp/baseapp_test.go
#	go.mod
#	go.sum

* fix: add a tag, `goleveldb`, to `test_cover.sh`
  • Loading branch information
jinsan-line authored Apr 29, 2021
1 parent 2a9f3c7 commit 258163d
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 152 deletions.
31 changes: 26 additions & 5 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,22 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
// internal CheckTx state if the AnteHandler passes. Otherwise, the ResponseCheckTx
// will contain releveant error information. Regardless of tx execution outcome,
// the ResponseCheckTx will contain relevant gas execution context.
func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *BaseApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
defer telemetry.MeasureSince(time.Now(), "abci", "check_tx")

tx, err := app.txDecoder(req.Tx)
if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}

tx, err := app.preCheckTx(req.Tx)
if err != nil {
return sdkerrors.ResponseCheckTx(err, 0, 0, app.trace)
}

if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}
waits, signals := app.checkAccountWGs.Register(tx)

app.checkAccountWGs.Wait(waits)
defer app.checkAccountWGs.Done(signals)

gInfo, err := app.checkTx(req.Tx, tx, req.Type == abci.CheckTxType_Recheck)
if err != nil {
Expand All @@ -233,6 +238,22 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
}
}

func (app *BaseApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}

reqCheckTx := &RequestCheckTxAsync{
txBytes: req.Tx,
recheck: req.Type == abci.CheckTxType_Recheck,
callback: callback,
prepare: waitGroup1(),
}
app.chCheckTx <- reqCheckTx

go app.prepareCheckTx(reqCheckTx)
}

// BeginRecheckTx implements the ABCI interface and set the check state based on the given header
func (app *BaseApp) BeginRecheckTx(req abci.RequestBeginRecheckTx) abci.ResponseBeginRecheckTx {
// NOTE: This is safe because Ostracon holds a lock on the mempool for Rechecking.
Expand Down
88 changes: 0 additions & 88 deletions baseapp/accountlock.go

This file was deleted.

85 changes: 85 additions & 0 deletions baseapp/accountwgs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package baseapp

import (
"sync"

sdk "github.com/line/lbm-sdk/v2/types"
)

type AccountWGs struct {
mtx sync.Mutex
wgs map[string]*sync.WaitGroup
}

func NewAccountWGs() *AccountWGs {
return &AccountWGs{
wgs: make(map[string]*sync.WaitGroup),
}
}

func (aw *AccountWGs) Register(tx sdk.Tx) (waits []*sync.WaitGroup, signals []*AccountWG) {
signers := getUniqSigners(tx)

aw.mtx.Lock()
defer aw.mtx.Unlock()
for _, signer := range signers {
if wg := aw.wgs[signer]; wg != nil {
waits = append(waits, wg)
}
sig := waitGroup1()
aw.wgs[signer] = sig
signals = append(signals, NewAccountWG(signer, sig))
}

return waits, signals
}

func (aw *AccountWGs) Wait(waits []*sync.WaitGroup) {
for _, wait := range waits {
wait.Wait()
}
}

func (aw *AccountWGs) Done(signals []*AccountWG) {
aw.mtx.Lock()
defer aw.mtx.Unlock()

for _, signal := range signals {
signal.wg.Done()
if aw.wgs[signal.acc] == signal.wg {
delete(aw.wgs, signal.acc)
}
}
}

func getUniqSigners(tx sdk.Tx) []string {
seen := map[string]bool{}
var signers []string
for _, msg := range tx.GetMsgs() {
for _, addr := range msg.GetSigners() {
if !seen[addr.String()] {
signers = append(signers, string(addr))
seen[addr.String()] = true
}
}
}
return signers
}

type AccountWG struct {
acc string
wg *sync.WaitGroup
}

func NewAccountWG(acc string, wg *sync.WaitGroup) *AccountWG {
return &AccountWG{
acc: acc,
wg: wg,
}
}

func waitGroup1() (wg *sync.WaitGroup) {
wg = &sync.WaitGroup{}
wg.Add(1)
return wg
}
60 changes: 22 additions & 38 deletions baseapp/accountlock_test.go → baseapp/accountwgs_test.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,66 @@
package baseapp

import (
"reflect"
"sort"
"sync"
"testing"

"github.com/stretchr/testify/require"

ostproto "github.com/line/ostracon/proto/ostracon/types"

"github.com/line/lbm-sdk/v2/crypto/keys/secp256k1"
"github.com/line/lbm-sdk/v2/testutil/testdata"
sdk "github.com/line/lbm-sdk/v2/types"
)

func TestAccountLock(t *testing.T) {
func TestConvertByteSliceToString(t *testing.T) {
b := []byte{65, 66, 67, 0, 65, 66, 67}
s := string(b)
require.Equal(t, len(b), len(s))
require.Equal(t, uint8(0), s[3])
}

func TestRegister(t *testing.T) {
app := setupBaseApp(t)
ctx := app.NewContext(true, ostproto.Header{})

privs := newTestPrivKeys(3)
tx := newTestTx(privs)

accKeys := app.accountLock.Lock(ctx, tx)
waits, signals := app.checkAccountWGs.Register(tx)

for _, accKey := range accKeys {
require.True(t, isMutexLock(&app.accountLock.accMtx[accKey]))
}
require.Equal(t, 0, len(waits))
require.Equal(t, 3, len(signals))

app.accountLock.Unlock(accKeys)

for _, accKey := range accKeys {
require.False(t, isMutexLock(&app.accountLock.accMtx[accKey]))
for _, signal := range signals {
require.Equal(t, app.checkAccountWGs.wgs[signal.acc], signal.wg)
}
}

func TestUnlockDoNothingWithNil(t *testing.T) {
func TestDontPanicWithNil(t *testing.T) {
app := setupBaseApp(t)
require.NotPanics(t, func() { app.accountLock.Unlock(nil) })
}

func TestGetSigner(t *testing.T) {
privs := newTestPrivKeys(3)
tx := newTestTx(privs)
signers := getSigners(tx)

require.Equal(t, getAddrs(privs), signers)
require.NotPanics(t, func() { app.checkAccountWGs.Wait(nil) })
require.NotPanics(t, func() { app.checkAccountWGs.Done(nil) })
}

func TestGetUniqSortedAddressKey(t *testing.T) {
func TestGetUniqSigners(t *testing.T) {
privs := newTestPrivKeys(3)

addrs := getAddrs(privs)
addrs = append(addrs, addrs[1], addrs[0])
require.Equal(t, 5, len(addrs))

accKeys := getUniqSortedAddressKey(addrs)
tx := newTestTx(privs)
signers := getUniqSigners(tx)

// length should be reduced because `duplicated` is removed
require.Less(t, len(accKeys), len(addrs))
require.Less(t, len(signers), len(addrs))

// check uniqueness
for i, iv := range accKeys {
for j, jv := range accKeys {
for i, iv := range signers {
for j, jv := range signers {
if i != j {
require.True(t, iv != jv)
}
}
}

// should be sorted
require.True(t, sort.IsSorted(uint32Slice(accKeys)))
}

type AccountLockTestTx struct {
Expand Down Expand Up @@ -111,9 +101,3 @@ func newTestTx(privs []*secp256k1.PrivKey) sdk.Tx {
}
return AccountLockTestTx{Msgs: msgs}
}

// Hack (too slow)
func isMutexLock(mtx *sync.Mutex) bool {
state := reflect.ValueOf(mtx).Elem().FieldByName("state")
return state.Int() == 1
}
Loading

0 comments on commit 258163d

Please sign in to comment.