diff --git a/baseapp/abci.go b/baseapp/abci.go index c462f23526..1f983d3645 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -210,17 +210,22 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc // internal CheckTx state if the AnteHandler passes. Otherwise, the ResponseCheckTx // will contain releveant error information. Regardless of tx execution outcome, // the ResponseCheckTx will contain relevant gas execution context. -func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { +func (app *BaseApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx { defer telemetry.MeasureSince(time.Now(), "abci", "check_tx") - tx, err := app.txDecoder(req.Tx) + if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck { + panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type)) + } + + tx, err := app.preCheckTx(req.Tx) if err != nil { return sdkerrors.ResponseCheckTx(err, 0, 0, app.trace) } - if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck { - panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type)) - } + waits, signals := app.checkAccountWGs.Register(tx) + + app.checkAccountWGs.Wait(waits) + defer app.checkAccountWGs.Done(signals) gInfo, err := app.checkTx(req.Tx, tx, req.Type == abci.CheckTxType_Recheck) if err != nil { @@ -233,6 +238,22 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { } } +func (app *BaseApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) { + if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck { + panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type)) + } + + reqCheckTx := &RequestCheckTxAsync{ + txBytes: req.Tx, + recheck: req.Type == abci.CheckTxType_Recheck, + callback: callback, + prepare: waitGroup1(), + } + app.chCheckTx <- reqCheckTx + + go app.prepareCheckTx(reqCheckTx) +} + // BeginRecheckTx implements the ABCI interface and set the check state based on the given header func (app *BaseApp) BeginRecheckTx(req abci.RequestBeginRecheckTx) abci.ResponseBeginRecheckTx { // NOTE: This is safe because Ostracon holds a lock on the mempool for Rechecking. diff --git a/baseapp/accountlock.go b/baseapp/accountlock.go deleted file mode 100644 index cd6828446d..0000000000 --- a/baseapp/accountlock.go +++ /dev/null @@ -1,88 +0,0 @@ -package baseapp - -import ( - "encoding/binary" - "sort" - "sync" - - sdk "github.com/line/lbm-sdk/v2/types" -) - -// NOTE should 1 <= sampleBytes <= 4. If modify it, you should revise `getAddressKey()` as well -const sampleBytes = 2 - -type AccountLock struct { - accMtx [1 << (sampleBytes * 8)]sync.Mutex -} - -func (al *AccountLock) Lock(ctx sdk.Context, tx sdk.Tx) []uint32 { - if !ctx.IsCheckTx() || ctx.IsReCheckTx() { - return nil - } - - signers := getSigners(tx) - accKeys := getUniqSortedAddressKey(signers) - - for _, key := range accKeys { - al.accMtx[key].Lock() - } - - return accKeys -} - -func (al *AccountLock) Unlock(accKeys []uint32) { - // NOTE reverse order - for i, length := 0, len(accKeys); i < length; i++ { - key := accKeys[length-1-i] - al.accMtx[key].Unlock() - } -} - -func getSigners(tx sdk.Tx) []sdk.AccAddress { - seen := map[string]bool{} - var signers []sdk.AccAddress - for _, msg := range tx.GetMsgs() { - for _, addr := range msg.GetSigners() { - if !seen[addr.String()] { - signers = append(signers, addr) - seen[addr.String()] = true - } - } - } - return signers -} - -func getUniqSortedAddressKey(addrs []sdk.AccAddress) []uint32 { - accKeys := make([]uint32, 0, len(addrs)) - for _, addr := range addrs { - accKeys = append(accKeys, getAddressKey(addr)) - } - - accKeys = uniq(accKeys) - sort.Sort(uint32Slice(accKeys)) - - return accKeys -} - -func getAddressKey(addr sdk.AccAddress) uint32 { - return uint32(binary.BigEndian.Uint16(addr)) -} - -func uniq(u []uint32) []uint32 { - seen := map[uint32]bool{} - var ret []uint32 - for _, v := range u { - if !seen[v] { - ret = append(ret, v) - seen[v] = true - } - } - return ret -} - -// Uint32Slice attaches the methods of Interface to []uint32, sorting in increasing order. -type uint32Slice []uint32 - -func (p uint32Slice) Len() int { return len(p) } -func (p uint32Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p uint32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/baseapp/accountwgs.go b/baseapp/accountwgs.go new file mode 100644 index 0000000000..5748223a48 --- /dev/null +++ b/baseapp/accountwgs.go @@ -0,0 +1,85 @@ +package baseapp + +import ( + "sync" + + sdk "github.com/line/lbm-sdk/v2/types" +) + +type AccountWGs struct { + mtx sync.Mutex + wgs map[string]*sync.WaitGroup +} + +func NewAccountWGs() *AccountWGs { + return &AccountWGs{ + wgs: make(map[string]*sync.WaitGroup), + } +} + +func (aw *AccountWGs) Register(tx sdk.Tx) (waits []*sync.WaitGroup, signals []*AccountWG) { + signers := getUniqSigners(tx) + + aw.mtx.Lock() + defer aw.mtx.Unlock() + for _, signer := range signers { + if wg := aw.wgs[signer]; wg != nil { + waits = append(waits, wg) + } + sig := waitGroup1() + aw.wgs[signer] = sig + signals = append(signals, NewAccountWG(signer, sig)) + } + + return waits, signals +} + +func (aw *AccountWGs) Wait(waits []*sync.WaitGroup) { + for _, wait := range waits { + wait.Wait() + } +} + +func (aw *AccountWGs) Done(signals []*AccountWG) { + aw.mtx.Lock() + defer aw.mtx.Unlock() + + for _, signal := range signals { + signal.wg.Done() + if aw.wgs[signal.acc] == signal.wg { + delete(aw.wgs, signal.acc) + } + } +} + +func getUniqSigners(tx sdk.Tx) []string { + seen := map[string]bool{} + var signers []string + for _, msg := range tx.GetMsgs() { + for _, addr := range msg.GetSigners() { + if !seen[addr.String()] { + signers = append(signers, string(addr)) + seen[addr.String()] = true + } + } + } + return signers +} + +type AccountWG struct { + acc string + wg *sync.WaitGroup +} + +func NewAccountWG(acc string, wg *sync.WaitGroup) *AccountWG { + return &AccountWG{ + acc: acc, + wg: wg, + } +} + +func waitGroup1() (wg *sync.WaitGroup) { + wg = &sync.WaitGroup{} + wg.Add(1) + return wg +} diff --git a/baseapp/accountlock_test.go b/baseapp/accountwgs_test.go similarity index 57% rename from baseapp/accountlock_test.go rename to baseapp/accountwgs_test.go index 054fed9b00..be686103d7 100644 --- a/baseapp/accountlock_test.go +++ b/baseapp/accountwgs_test.go @@ -1,76 +1,66 @@ package baseapp import ( - "reflect" - "sort" - "sync" "testing" "github.com/stretchr/testify/require" - ostproto "github.com/line/ostracon/proto/ostracon/types" - "github.com/line/lbm-sdk/v2/crypto/keys/secp256k1" "github.com/line/lbm-sdk/v2/testutil/testdata" sdk "github.com/line/lbm-sdk/v2/types" ) -func TestAccountLock(t *testing.T) { +func TestConvertByteSliceToString(t *testing.T) { + b := []byte{65, 66, 67, 0, 65, 66, 67} + s := string(b) + require.Equal(t, len(b), len(s)) + require.Equal(t, uint8(0), s[3]) +} + +func TestRegister(t *testing.T) { app := setupBaseApp(t) - ctx := app.NewContext(true, ostproto.Header{}) privs := newTestPrivKeys(3) tx := newTestTx(privs) - accKeys := app.accountLock.Lock(ctx, tx) + waits, signals := app.checkAccountWGs.Register(tx) - for _, accKey := range accKeys { - require.True(t, isMutexLock(&app.accountLock.accMtx[accKey])) - } + require.Equal(t, 0, len(waits)) + require.Equal(t, 3, len(signals)) - app.accountLock.Unlock(accKeys) - - for _, accKey := range accKeys { - require.False(t, isMutexLock(&app.accountLock.accMtx[accKey])) + for _, signal := range signals { + require.Equal(t, app.checkAccountWGs.wgs[signal.acc], signal.wg) } } -func TestUnlockDoNothingWithNil(t *testing.T) { +func TestDontPanicWithNil(t *testing.T) { app := setupBaseApp(t) - require.NotPanics(t, func() { app.accountLock.Unlock(nil) }) -} -func TestGetSigner(t *testing.T) { - privs := newTestPrivKeys(3) - tx := newTestTx(privs) - signers := getSigners(tx) - - require.Equal(t, getAddrs(privs), signers) + require.NotPanics(t, func() { app.checkAccountWGs.Wait(nil) }) + require.NotPanics(t, func() { app.checkAccountWGs.Done(nil) }) } -func TestGetUniqSortedAddressKey(t *testing.T) { +func TestGetUniqSigners(t *testing.T) { privs := newTestPrivKeys(3) addrs := getAddrs(privs) addrs = append(addrs, addrs[1], addrs[0]) require.Equal(t, 5, len(addrs)) - accKeys := getUniqSortedAddressKey(addrs) + tx := newTestTx(privs) + signers := getUniqSigners(tx) // length should be reduced because `duplicated` is removed - require.Less(t, len(accKeys), len(addrs)) + require.Less(t, len(signers), len(addrs)) // check uniqueness - for i, iv := range accKeys { - for j, jv := range accKeys { + for i, iv := range signers { + for j, jv := range signers { if i != j { require.True(t, iv != jv) } } } - - // should be sorted - require.True(t, sort.IsSorted(uint32Slice(accKeys))) } type AccountLockTestTx struct { @@ -111,9 +101,3 @@ func newTestTx(privs []*secp256k1.PrivKey) sdk.Tx { } return AccountLockTestTx{Msgs: msgs} } - -// Hack (too slow) -func isMutexLock(mtx *sync.Mutex) bool { - state := reflect.ValueOf(mtx).Elem().FieldByName("state") - return state.Int() == 1 -} diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index b055b28f3f..8297c979e0 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -70,7 +70,9 @@ type BaseApp struct { // nolint: maligned deliverState *state // for DeliverTx checkStateMtx sync.RWMutex - accountLock AccountLock + + checkAccountWGs *AccountWGs + chCheckTx chan *RequestCheckTxAsync // an inter-block write-through cache provided to the context during deliverState interBlockCache sdk.MultiStorePersistentCache @@ -144,6 +146,8 @@ func NewBaseApp( msgServiceRouter: NewMsgServiceRouter(), txDecoder: txDecoder, fauxMerkleMode: false, + checkAccountWGs: NewAccountWGs(), + chCheckTx: make(chan *RequestCheckTxAsync, 10000), // TODO config channel buffer size. It might be good to set it tendermint mempool.size } for _, option := range options { @@ -156,6 +160,8 @@ func NewBaseApp( app.runTxRecoveryMiddleware = newDefaultRecoveryMiddleware() + app.startReactors() + return app } @@ -548,6 +554,26 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context return ctx.WithMultiStore(msCache), msCache } +// stateless checkTx +func (app *BaseApp) preCheckTx(txBytes []byte) (tx sdk.Tx, err error) { + defer func() { + if r := recover(); r != nil { + recoveryMW := newDefaultRecoveryMiddleware() + err = processRecovery(r, recoveryMW) + } + }() + + tx, err = app.txDecoder(txBytes) + if err != nil { + return tx, err + } + + msgs := tx.GetMsgs() + err = validateBasicTxMsgs(msgs) + + return tx, err +} + func (app *BaseApp) checkTx(txBytes []byte, tx sdk.Tx, recheck bool) (gInfo sdk.GasInfo, err error) { ctx := app.getCheckContextForTx(txBytes, recheck) gasCtx := &ctx @@ -560,14 +586,6 @@ func (app *BaseApp) checkTx(txBytes []byte, tx sdk.Tx, recheck bool) (gInfo sdk. gInfo = sdk.GasInfo{GasWanted: gasCtx.GasMeter().Limit(), GasUsed: gasCtx.GasMeter().GasConsumed()} }() - msgs := tx.GetMsgs() - if err = validateBasicTxMsgs(msgs); err != nil { - return gInfo, err - } - - accKeys := app.accountLock.Lock(ctx, tx) - defer app.accountLock.Unlock(accKeys) - var anteCtx sdk.Context anteCtx, err = app.anteTx(ctx, txBytes, tx, false) if !anteCtx.IsZero() { diff --git a/baseapp/baseapp_test.go b/baseapp/baseapp_test.go index 49ac80e86b..a3499da1d8 100644 --- a/baseapp/baseapp_test.go +++ b/baseapp/baseapp_test.go @@ -938,7 +938,7 @@ func TestCheckTx(t *testing.T) { tx := newTxCounter(i, 0) // no messages txBytes, err := codec.MarshalBinaryBare(tx) require.NoError(t, err) - r := app.CheckTx(abci.RequestCheckTx{Tx: txBytes}) + r := app.CheckTxSync(abci.RequestCheckTx{Tx: txBytes}) require.Empty(t, r.GetEvents()) require.True(t, r.IsOK(), fmt.Sprintf("%v", r)) } diff --git a/baseapp/reactor.go b/baseapp/reactor.go new file mode 100644 index 0000000000..3166bf6062 --- /dev/null +++ b/baseapp/reactor.go @@ -0,0 +1,59 @@ +package baseapp + +import ( + "sync" + + abci "github.com/line/ostracon/abci/types" + + sdk "github.com/line/lbm-sdk/v2/types" + sdkerrors "github.com/line/lbm-sdk/v2/types/errors" +) + +func (app *BaseApp) startReactors() { + go app.checkTxAsyncReactor() +} + +type RequestCheckTxAsync struct { + txBytes []byte + recheck bool + callback abci.CheckTxCallback + prepare *sync.WaitGroup + tx sdk.Tx + err error +} + +func (app *BaseApp) checkTxAsyncReactor() { + for req := range app.chCheckTx { + req.prepare.Wait() + if req.err != nil { + req.callback(sdkerrors.ResponseCheckTx(req.err, 0, 0, app.trace)) + continue + } + + waits, signals := app.checkAccountWGs.Register(req.tx) + + go app.checkTxAsync(req, waits, signals) + } +} + +func (app *BaseApp) prepareCheckTx(req *RequestCheckTxAsync) { + defer req.prepare.Done() + req.tx, req.err = app.preCheckTx(req.txBytes) +} + +func (app *BaseApp) checkTxAsync(req *RequestCheckTxAsync, waits []*sync.WaitGroup, signals []*AccountWG) { + app.checkAccountWGs.Wait(waits) + defer app.checkAccountWGs.Done(signals) + + gInfo, err := app.checkTx(req.txBytes, req.tx, req.recheck) + + if err != nil { + req.callback(sdkerrors.ResponseCheckTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)) + return + } + + req.callback(abci.ResponseCheckTx{ + GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints? + GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints? + }) +} diff --git a/contrib/test_cover.sh b/contrib/test_cover.sh index 24f7804b51..89734beaca 100644 --- a/contrib/test_cover.sh +++ b/contrib/test_cover.sh @@ -6,7 +6,7 @@ PKGS=$(go list ./... | grep -v '/simapp') set -e echo "mode: atomic" > coverage.txt for pkg in ${PKGS[@]}; do - go test -v -timeout 30m -race -coverprofile=profile.out -covermode=atomic -tags='ledger test_ledger_mock' "$pkg" + go test -v -timeout 30m -race -coverprofile=profile.out -covermode=atomic -tags='ledger test_ledger_mock goleveldb' "$pkg" if [ -f profile.out ]; then tail -n +2 profile.out >> coverage.txt; rm profile.out diff --git a/go.mod b/go.mod index 99d0e42361..1e6dd185b7 100644 --- a/go.mod +++ b/go.mod @@ -28,14 +28,13 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/hashicorp/golang-lru v0.5.4 github.com/line/iavl/v2 v2.0.0-init.1.0.20210419041411-7de35b5f1306 - github.com/line/ostracon v0.34.9-0.20210419031811-5254cabf172e + github.com/line/ostracon v0.34.9-0.20210429024836-e5495cecd765 github.com/line/tm-db/v2 v2.0.0-init.1.0.20210413083915-5bb60e117524 github.com/magiconair/properties v1.8.4 github.com/mattn/go-isatty v0.0.12 github.com/nxadm/tail v1.4.8 // indirect github.com/onsi/ginkgo v1.15.0 // indirect github.com/onsi/gomega v1.10.5 // indirect - github.com/otiai10/copy v1.4.2 github.com/pelletier/go-toml v1.8.0 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.8.0 diff --git a/go.sum b/go.sum index 798fcb5c27..ba71272d3f 100644 --- a/go.sum +++ b/go.sum @@ -355,6 +355,8 @@ github.com/line/linemint v1.0.0/go.mod h1:0yUs9eIuuDq07nQql9BmI30FtYGcEC60Tu5JzB github.com/line/ostracon v0.34.9-0.20210315041958-2a1f43c788f5/go.mod h1:1THU+kF+6fxLaNYQKcdNyLCO6t9LnqSMaExDMiLozbM= github.com/line/ostracon v0.34.9-0.20210419031811-5254cabf172e h1:+F5duGTfSZiwD9LqMkdtwFgULjKn8pGRz/86U/SyIO0= github.com/line/ostracon v0.34.9-0.20210419031811-5254cabf172e/go.mod h1:w/itWXCU8Wttz/2Ftp+yJz0UXv9gX6qT1dASn+3kX5M= +github.com/line/ostracon v0.34.9-0.20210429024836-e5495cecd765 h1:Ktkqma9R7CAUi9/ByznJZdWn0TuL7YDnZ0SkiFET6pg= +github.com/line/ostracon v0.34.9-0.20210429024836-e5495cecd765/go.mod h1:w/itWXCU8Wttz/2Ftp+yJz0UXv9gX6qT1dASn+3kX5M= github.com/line/tm-db v0.5.2 h1:P8kMpcrm9Xyfl6QLyafssNIoIeC01k0fhw2zDvKhtl4= github.com/line/tm-db v0.5.2/go.mod h1:VrPTx04QJhQ9d8TFUTc2GpPBvBf/U9vIdBIzkjBk7Lk= github.com/line/tm-db/v2 v2.0.0-init.1.0.20210406062110-9424ca70955a h1:qSt/WwORC5+nVRnNqx+A0oo5gOCsoVJ0HmHF5Db1YvY= @@ -441,14 +443,6 @@ github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxS github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= -github.com/otiai10/copy v1.4.2 h1:RTiz2sol3eoXPLF4o+YWqEybwfUa/Q2Nkc4ZIUs3fwI= -github.com/otiai10/copy v1.4.2/go.mod h1:XWfuS3CrI0R6IE0FbgHsEazaXO8G0LpMp9o8tos0x4E= -github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJG+0mI8eUu6xqkFDYS2kb2saOteoSB3cE= -github.com/otiai10/curr v1.0.0 h1:TJIWdbX0B+kpNagQrjgq8bCMrbhiuX73M2XwgtDMoOI= -github.com/otiai10/curr v1.0.0/go.mod h1:LskTG5wDwr8Rs+nNQ+1LlxRjAtTZZjtJW4rMXl6j4vs= -github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT91xUo= -github.com/otiai10/mint v1.3.2 h1:VYWnrP5fXmz1MXvjuUvcBrXSjGE6xjON+axB/UrpO3E= -github.com/otiai10/mint v1.3.2/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc= github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=