Skip to content

Commit

Permalink
make it more like upstream solution
Browse files Browse the repository at this point in the history
  • Loading branch information
czarcas7ic committed Apr 21, 2024
1 parent d43a3bb commit 55aadba
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 50 deletions.
9 changes: 8 additions & 1 deletion rpc/core/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,14 @@ func TxSearch(
*pagePtr = 1
}

results, totalCount, err := env.TxIndexer.Search(ctx.Context(), q, *pagePtr, perPage, orderBy)
pagSettings := ctypes.Pagination{
OrderDesc: orderBy == "desc",
IsPaginated: true,
Page: *pagePtr,
PerPage: perPage,
}

results, totalCount, err := env.TxIndexer.Search(ctx.Context(), q, pagSettings)
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions rpc/core/types/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,13 @@ type ResultEvent struct {
Data types.TMEventData `json:"data"`
Events map[string][]string `json:"events"`
}

// Pagination provides pagination information for queries.
// This allows us to use the same TxSearch API for pruning to return all relevant data,
// while still limiting public queries to pagination.
type Pagination struct {
OrderDesc bool
IsPaginated bool
Page int
PerPage int
}
3 changes: 2 additions & 1 deletion state/indexer/sink/psql/backport.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/pubsub/query"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/cometbft/cometbft/state/txindex"
"github.com/cometbft/cometbft/types"
)
Expand Down Expand Up @@ -55,7 +56,7 @@ func (BackportTxIndexer) Get([]byte) (*abci.TxResult, error) {

// Search is implemented to satisfy the TxIndexer interface, but it is not
// supported by the psql event sink and reports an error for all inputs.
func (BackportTxIndexer) Search(ctx context.Context, q *query.Query, page, perPage int, orderBy string) ([]*abci.TxResult, int, error) {
func (BackportTxIndexer) Search(ctx context.Context, q *query.Query, pagSettings ctypes.Pagination) ([]*abci.TxResult, int, error) {
return nil, 0, errors.New("the TxIndexer.Search method is not supported")
}

Expand Down
3 changes: 2 additions & 1 deletion state/txindex/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/pubsub/query"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
)

// XXX/TODO: These types should be moved to the indexer package.
Expand All @@ -25,7 +26,7 @@ type TxIndexer interface {
Get(hash []byte) (*abci.TxResult, error)

// Search allows you to query for transactions.
Search(ctx context.Context, q *query.Query, page, perPage int, orderBy string) ([]*abci.TxResult, int, error)
Search(ctx context.Context, q *query.Query, pagSettings ctypes.Pagination) ([]*abci.TxResult, int, error)
}

// Batch groups together multiple Index operations to be performed at the same time.
Expand Down
81 changes: 54 additions & 27 deletions state/txindex/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/pubsub/query"
"github.com/cometbft/cometbft/rpc/core"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/cometbft/cometbft/state/indexer"
"github.com/cometbft/cometbft/state/txindex"
"github.com/cometbft/cometbft/types"
Expand Down Expand Up @@ -199,7 +199,7 @@ func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Ba
//
// Search will exit early and return any result fetched so far,
// when a message is received on the context chan.
func (txi *TxIndex) Search(ctx context.Context, q *query.Query, page, perPage int, orderBy string) ([]*abci.TxResult, int, error) {
func (txi *TxIndex) Search(ctx context.Context, q *query.Query, pagSettings ctypes.Pagination) ([]*abci.TxResult, int, error) {
select {
case <-ctx.Done():
return make([]*abci.TxResult, 0), 0, nil
Expand Down Expand Up @@ -298,17 +298,7 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query, page, perPage in
}
}

// Now that we know the total number of results, validate that the page
// requested is within bounds
numResults := len(filteredHashes)
page, err = core.ValidatePage(&page, perPage, numResults)
if err != nil {
return nil, 0, err
}

// Calculate pagination start and end indices
startIndex := (page - 1) * perPage
endIndex := startIndex + perPage

// Convert map keys to slice for deterministic ordering
hashKeys := make([]string, 0, numResults)
Expand All @@ -322,32 +312,48 @@ func (txi *TxIndex) Search(ctx context.Context, q *query.Query, page, perPage in
hj := filteredHashes[hashKeys[j]].Height
if hi == hj {
// If heights are equal, sort lexicographically
if orderBy == "asc" {
return hashKeys[i] < hashKeys[j]
} else {
if pagSettings.OrderDesc {
return hashKeys[i] > hashKeys[j]
} else {
return hashKeys[i] < hashKeys[j]
}
}
if orderBy == "asc" {
return hi < hj
} else {
if pagSettings.OrderDesc {
return hi > hj

} else {
return hi < hj
}
})

// Apply pagination limits
if endIndex > len(hashKeys) {
endIndex = len(hashKeys)
}
if startIndex >= len(hashKeys) {
return []*abci.TxResult{}, 0, nil
// If paginated, determine which hash keys to return
if pagSettings.IsPaginated {
// Now that we know the total number of results, validate that the page
// requested is within bounds
pagSettings.Page, err = validatePage(&pagSettings.Page, pagSettings.PerPage, numResults)
if err != nil {
return nil, 0, err
}

// Calculate pagination start and end indices
startIndex := (pagSettings.Page - 1) * pagSettings.PerPage
endIndex := startIndex + pagSettings.PerPage

// Apply pagination limits
if endIndex > len(hashKeys) {
endIndex = len(hashKeys)
}
if startIndex >= len(hashKeys) {
return []*abci.TxResult{}, 0, nil
}

hashKeys = hashKeys[startIndex:endIndex]
}

paginatedKeys := hashKeys[startIndex:endIndex]
results := make([]*abci.TxResult, 0, len(paginatedKeys))
results := make([]*abci.TxResult, 0, len(hashKeys))
resultMap := make(map[string]struct{})
RESULTS_LOOP:
for _, hKey := range paginatedKeys {
for _, hKey := range hashKeys {
h := filteredHashes[hKey].TxBytes
res, err := txi.Get(h)
if err != nil {
Expand Down Expand Up @@ -767,3 +773,24 @@ func lookForHeight(conditions []query.Condition) (height int64) {
}
return 0
}

func validatePage(pagePtr *int, perPage, totalCount int) (int, error) {
if perPage < 1 {
panic(fmt.Sprintf("zero or negative perPage: %d", perPage))
}

if pagePtr == nil { // no page parameter
return 1, nil
}

pages := ((totalCount - 1) / perPage) + 1
if pages == 0 {
pages = 1 // one page (even if it's empty)
}
page := *pagePtr
if page <= 0 || page > pages {
return 1, fmt.Errorf("page should be within [1, %d] range, given %d", pages, page)
}

return page, nil
}
2 changes: 1 addition & 1 deletion state/txindex/kv/kv_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func BenchmarkTxSearch(b *testing.B) {
ctx := context.Background()

for i := 0; i < b.N; i++ {
if _, _, err := indexer.Search(ctx, txQuery, 1, 100, "asc"); err != nil {
if _, _, err := indexer.Search(ctx, txQuery, DefaultPagination); err != nil {
b.Errorf("failed to query for txs: %s", err)
}
}
Expand Down
24 changes: 16 additions & 8 deletions state/txindex/kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@ import (
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/pubsub/query"
cmtrand "github.com/cometbft/cometbft/libs/rand"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/cometbft/cometbft/state/txindex"
"github.com/cometbft/cometbft/types"
)

var DefaultPagination = ctypes.Pagination{
IsPaginated: true,
Page: 1,
PerPage: 100,
OrderDesc: false,
}

func TestBigInt(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB())

Expand Down Expand Up @@ -80,7 +88,7 @@ func TestBigInt(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), 1, 100, "asc")
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination)
assert.NoError(t, err)
assert.Len(t, results, tc.resultsLength)
if tc.resultsLength > 0 && tc.txRes != nil {
Expand Down Expand Up @@ -212,7 +220,7 @@ func TestTxSearch(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), 1, 100, "asc")
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination)
assert.NoError(t, err)

assert.Len(t, results, tc.resultsLength)
Expand Down Expand Up @@ -310,7 +318,7 @@ func TestTxSearchEventMatch(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), 1, 100, "asc")
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination)
assert.NoError(t, err)

assert.Len(t, results, tc.resultsLength)
Expand Down Expand Up @@ -386,7 +394,7 @@ func TestTxSearchEventMatchByHeight(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), 1, 100, "asc")
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination)
assert.NoError(t, err)

assert.Len(t, results, tc.resultsLength)
Expand Down Expand Up @@ -418,7 +426,7 @@ func TestTxSearchWithCancelation(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
cancel()
results, _, err := indexer.Search(ctx, query.MustParse("account.number = 1"), 1, 100, "asc")
results, _, err := indexer.Search(ctx, query.MustParse("account.number = 1"), DefaultPagination)
assert.NoError(t, err)
assert.Empty(t, results)
}
Expand Down Expand Up @@ -491,7 +499,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.q, func(t *testing.T) {
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), 1, 100, "asc")
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination)
require.NoError(t, err)
for _, txr := range results {
for _, tr := range tc.results {
Expand Down Expand Up @@ -573,7 +581,7 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
ctx := context.Background()

for _, tc := range testCases {
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), 1, 100, "asc")
results, _, err := indexer.Search(ctx, query.MustParse(tc.q), DefaultPagination)
assert.NoError(t, err)
len := 0
if tc.found {
Expand Down Expand Up @@ -730,7 +738,7 @@ func TestTxSearchMultipleTxs(t *testing.T) {

ctx := context.Background()

results, _, err := indexer.Search(ctx, query.MustParse("account.number >= 1"), 1, 100, "asc")
results, _, err := indexer.Search(ctx, query.MustParse("account.number >= 1"), DefaultPagination)
assert.NoError(t, err)

require.Len(t, results, 3)
Expand Down
22 changes: 12 additions & 10 deletions state/txindex/mocks/tx_indexer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion state/txindex/null/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/pubsub/query"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/cometbft/cometbft/state/txindex"
)

Expand All @@ -29,6 +30,6 @@ func (txi *TxIndex) Index(result *abci.TxResult) error {
return nil
}

func (txi *TxIndex) Search(ctx context.Context, q *query.Query, page, perPage int, orderBy string) ([]*abci.TxResult, int, error) {
func (txi *TxIndex) Search(ctx context.Context, q *query.Query, pagSettings ctypes.Pagination) ([]*abci.TxResult, int, error) {
return []*abci.TxResult{}, 0, nil
}

0 comments on commit 55aadba

Please sign in to comment.