Skip to content

Commit

Permalink
fix: fix overriding tx index of duplicated txs (#498)
Browse files Browse the repository at this point in the history
* Fix duplicated tx index

* Update CHANGELOG

* Revert "Update CHANGELOG"

This reverts commit e02f6b8.
  • Loading branch information
ulbqb authored Nov 15, 2022
1 parent 8d8570b commit 15a55a2
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 0 deletions.
48 changes: 48 additions & 0 deletions state/txindex/indexer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package txindex
import (
"context"

abci "github.com/line/ostracon/abci/types"
"github.com/line/ostracon/libs/service"
"github.com/line/ostracon/state/indexer"
"github.com/line/ostracon/types"
Expand Down Expand Up @@ -82,6 +83,11 @@ func (is *IndexerService) OnStart() error {
is.Logger.Info("indexed block", "height", height)
}

batch.Ops, err = DeduplicateBatch(batch.Ops, is.txIdxr)
if err != nil {
is.Logger.Error("deduplicate batch", "height", height)
}

if err = is.txIdxr.AddBatch(batch); err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
} else {
Expand All @@ -98,3 +104,45 @@ func (is *IndexerService) OnStop() {
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
}
}

// DeduplicateBatch consider the case of duplicate txs.
// if the current one under investigation is NOT OK, then we need to check
// whether there's a previously indexed tx.
// SKIP the current tx if the previously indexed record is found and successful.
func DeduplicateBatch(ops []*abci.TxResult, txIdxr TxIndexer) ([]*abci.TxResult, error) {
result := make([]*abci.TxResult, 0, len(ops))

// keep track of successful txs in this block in order to suppress latter ones being indexed.
var successfulTxsInThisBlock = make(map[string]struct{})

for _, txResult := range ops {
hash := types.Tx(txResult.Tx).Hash()

if txResult.Result.IsOK() {
successfulTxsInThisBlock[string(hash)] = struct{}{}
} else {
// if it already appeared in current block and was successful, skip.
if _, found := successfulTxsInThisBlock[string(hash)]; found {
continue
}

// check if this tx hash is already indexed
old, err := txIdxr.Get(hash)

// if db op errored
// Not found is not an error
if err != nil {
return nil, err
}

// if it's already indexed in an older block and was successful, skip.
if old != nil && old.Result.Code == abci.CodeTypeOK {
continue
}
}

result = append(result, txResult)
}

return result, nil
}
161 changes: 161 additions & 0 deletions state/txindex/indexer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,164 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
require.NoError(t, err)
require.Equal(t, txResult2, res)
}

func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
var mockTx = types.Tx("MOCK_TX_HASH")

testCases := []struct {
name string
tx1 abci.TxResult
tx2 abci.TxResult
expSkip bool // do we expect the second tx to be skipped by tx indexer
}{
{"skip, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"not skip, both successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
{"not skip, both unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"skip, same block, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, same block, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
indexer := kv.NewTxIndex(db.NewMemDB())

if tc.tx1.Height != tc.tx2.Height {
// index the first tx
err := indexer.AddBatch(&txindex.Batch{
Ops: []*abci.TxResult{&tc.tx1},
})
require.NoError(t, err)

// check if the second one should be skipped.
ops, err := txindex.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, indexer)
require.NoError(t, err)

if tc.expSkip {
require.Empty(t, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx2}, ops)
}
} else {
// same block
ops := []*abci.TxResult{&tc.tx1, &tc.tx2}
ops, err := txindex.DeduplicateBatch(ops, indexer)
require.NoError(t, err)
if tc.expSkip {
// the second one is skipped
require.Equal(t, []*abci.TxResult{&tc.tx1}, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops)
}
}
})
}
}

0 comments on commit 15a55a2

Please sign in to comment.