Skip to content

Commit

Permalink
feat: [indexer][v25.x] publish created pools (#8652)
Browse files Browse the repository at this point in the history
* Support pool_created event. Tested in v25.x

* Refactored to improve abstraction. Added test cases

* fixed broken unit test cases and retested in v25.x

* support pool_created during cold start on the first block

---------

Co-authored-by: Calvin <[email protected]>
  • Loading branch information
cryptomatictrader and cryptomatictrader authored Aug 30, 2024
1 parent 2cf0edf commit 8c89343
Show file tree
Hide file tree
Showing 17 changed files with 460 additions and 34 deletions.
10 changes: 10 additions & 0 deletions ingest/common/domain/block_pools.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
package commondomain

import (
"time"

poolmanagertypes "github.com/osmosis-labs/osmosis/v25/x/poolmanager/types"
)

// PoolCreation contains the information about a pool creation.
type PoolCreation struct {
PoolId uint64
BlockHeight int64
BlockTime time.Time
TxnHash string
}

// BlockPools contains the pools to be ingested in a block.
type BlockPools struct {
// ConcentratedPools are the concentrated pools to be ingested.
Expand Down
21 changes: 19 additions & 2 deletions ingest/common/domain/mocks/pools_extracter_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,39 @@ type PoolsExtractorMock struct {
ProcessAllBlockDataPanicMsg string
// Block pools to return
BlockPools commondomain.BlockPools
// CreatedPoolIDs is the map of created pool IDs to return when ExtractCreated is called.
CreatedPoolIDs map[uint64]commondomain.PoolCreation
// CreatedPoolsError is the error to return when ExtractCreated is called.
CreatedPoolsError error
// IsProcessCreatedCalled is a flag indicating if ProcessCreated was called.
IsProcessCreatedCalled bool
}

var _ commondomain.PoolExtractor = &PoolsExtractorMock{}

// ExtractAll implements commondomain.PoolExtractor.
func (p *PoolsExtractorMock) ExtractAll(ctx types.Context) (commondomain.BlockPools, error) {
func (p *PoolsExtractorMock) ExtractAll(ctx types.Context) (commondomain.BlockPools, map[uint64]commondomain.PoolCreation, error) {
if p.ProcessAllBlockDataPanicMsg != "" {
panic(p.ProcessAllBlockDataPanicMsg)
}

p.IsProcessAllBlockDataCalled = true
return p.BlockPools, p.AllBlockDataError
return p.BlockPools, p.CreatedPoolIDs, p.AllBlockDataError
}

// ExtractChanged implements commondomain.PoolExtractor.
func (p *PoolsExtractorMock) ExtractChanged(ctx types.Context) (commondomain.BlockPools, error) {
p.IsProcessAllChangedDataCalled = true
return p.BlockPools, p.ChangedBlockDataError
}

// ExtractCreated implements commondomain.PoolExtractor.
func (p *PoolsExtractorMock) ExtractCreated(ctx types.Context) (commondomain.BlockPools, map[uint64]commondomain.PoolCreation, error) {
p.IsProcessCreatedCalled = true
return p.BlockPools, p.CreatedPoolIDs, p.CreatedPoolsError
}

// ResetPoolTracker implements commondomain.PoolExtractor.
func (p *PoolsExtractorMock) ResetPoolTracker(ctx types.Context) {
panic("unimplemented")
}
7 changes: 6 additions & 1 deletion ingest/common/domain/pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ import (
type PoolExtractor interface {
// ExtractAll extracts all the pools available within the height associated
// with the context.
ExtractAll(ctx sdk.Context) (BlockPools, error)
ExtractAll(ctx sdk.Context) (BlockPools, map[uint64]PoolCreation, error)
// ExtractChanged extracts the pools that were changed in the block height associated
// with the context.
ExtractChanged(ctx sdk.Context) (BlockPools, error)
// ExtractrCreated extracts the pools that were created in the block height associated
// with the context.
ExtractCreated(ctx sdk.Context) (BlockPools, map[uint64]PoolCreation, error)
// ResetPoolTracker resets the underlying internal pool tracker
ResetPoolTracker(ctx sdk.Context)
}
19 changes: 18 additions & 1 deletion ingest/common/poolextractor/pool_exractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,19 @@ func (s *PoolExtractorTestSuite) TestExtractor() {
// Track tick change for a concentraed pool.
poolTracker.TrackConcentratedPoolIDTickChange(concentratedPoolWithPosition.GetId())

// Inject a new pool creation and track it
poolTracker.TrackCreatedPoolID(commondomain.PoolCreation{
PoolId: concentratedPool.GetId(),
BlockHeight: 1000,
BlockTime: s.Ctx.BlockTime(),
TxnHash: "txnhash",
})

// Initialize the extractor
extractor := poolextractor.New(keepers, poolTracker)

// System under test #1
blockPools, err := extractor.ExtractAll(s.Ctx)
blockPools, createdPoolIDs, err := extractor.ExtractAll(s.Ctx)
s.Require().NoError(err)

// Validate all pools are extracted
Expand All @@ -75,6 +83,15 @@ func (s *PoolExtractorTestSuite) TestExtractor() {
changedPools := blockPools.GetAll()
s.Require().Equal(2, len(changedPools))

// Validate that the newly created pool is extracted
// Since only one newly created pool is injected during the test in the above code earlier,
// the length of the createdPoolIDs should be 1.
// the length of the pools.GetAll() should be equal to the length of the createdPoolIDs.
pools, createdPoolIDs, err := extractor.ExtractCreated(s.Ctx)
s.Require().NoError(err)
s.Require().Equal(len(createdPoolIDs), len(pools.GetAll()))
s.Require().Equal(1, len(createdPoolIDs))

// Validate that the tick change is detected
s.Require().Len(blockPools.ConcentratedPoolIDTickChange, 2)
_, ok := blockPools.ConcentratedPoolIDTickChange[concentratedPoolWithPosition.GetId()]
Expand Down
65 changes: 60 additions & 5 deletions ingest/common/poolextractor/pool_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,30 @@ func New(keepers commondomain.PoolExtractorKeepers, poolTracker domain.BlockPool
}

// ExtractAll implements commondomain.PoolExtractor.
func (p *poolExtractor) ExtractAll(ctx sdk.Context) (commondomain.BlockPools, error) {
func (p *poolExtractor) ExtractAll(ctx sdk.Context) (commondomain.BlockPools, map[uint64]commondomain.PoolCreation, error) {
// If cold start, we process all the pools in the chain.
// Get the pool IDs that were created in the block where cold start happens
createdPoolIDs := p.poolTracker.GetCreatedPoolIDs()

// Concentrated pools

concentratedPools, err := p.keepers.ConcentratedKeeper.GetPools(ctx)
if err != nil {
return commondomain.BlockPools{}, err
return commondomain.BlockPools{}, nil, err
}

// CFMM pools

cfmmPools, err := p.keepers.GammKeeper.GetPools(ctx)
if err != nil {
return commondomain.BlockPools{}, err
return commondomain.BlockPools{}, nil, err
}

// CosmWasm pools

cosmWasmPools, err := p.keepers.CosmWasmPoolKeeper.GetPoolsWithWasmKeeper(ctx)
if err != nil {
return commondomain.BlockPools{}, err
return commondomain.BlockPools{}, nil, err
}

// Generate the initial cwPool address to pool mapping
Expand All @@ -55,7 +59,7 @@ func (p *poolExtractor) ExtractAll(ctx sdk.Context) (commondomain.BlockPools, er
CFMMPools: cfmmPools,
}

return blockPools, nil
return blockPools, createdPoolIDs, nil
}

// ExtractChanged implements commondomain.PoolExtractor.
Expand Down Expand Up @@ -99,3 +103,54 @@ func (p *poolExtractor) ExtractChanged(ctx sdk.Context) (commondomain.BlockPools

return changedBlockPools, nil
}

// ExtractCreated implements commondomain.PoolExtractor.
func (p *poolExtractor) ExtractCreated(ctx sdk.Context) (commondomain.BlockPools, map[uint64]commondomain.PoolCreation, error) {
changedPools, err := p.ExtractChanged(ctx)
if err != nil {
return commondomain.BlockPools{}, nil, err
}

createdPoolIDs := p.poolTracker.GetCreatedPoolIDs()

result := commondomain.BlockPools{
ConcentratedPoolIDTickChange: make(map[uint64]struct{}),
}

// Copy over the pools that were created in the block

// CFMM
for _, pool := range changedPools.CFMMPools {
if _, ok := createdPoolIDs[pool.GetId()]; ok {
result.CFMMPools = append(result.CFMMPools, pool)
}
}

// CosmWasm
for _, pool := range changedPools.CosmWasmPools {
if _, ok := createdPoolIDs[pool.GetId()]; ok {
result.CosmWasmPools = append(result.CosmWasmPools, pool)
}
}

// Concentrated
for _, pool := range changedPools.ConcentratedPools {
if _, ok := createdPoolIDs[pool.GetId()]; ok {
result.ConcentratedPools = append(result.ConcentratedPools, pool)
}
}

// Concentrated ticks
for poolID := range changedPools.ConcentratedPoolIDTickChange {
if _, ok := createdPoolIDs[poolID]; ok {
result.ConcentratedPoolIDTickChange[poolID] = struct{}{}
}
}

return result, createdPoolIDs, nil
}

// ResetPoolTracker implements commondomain.PoolExtractor
func (p *poolExtractor) ResetPoolTracker(ctx sdk.Context) {
p.poolTracker.Reset()
}
15 changes: 15 additions & 0 deletions ingest/common/pooltracker/memory_pool_tracker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pooltracker

import (
commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain"
"github.com/osmosis-labs/osmosis/v25/ingest/sqs/domain"
poolmanagertypes "github.com/osmosis-labs/osmosis/v25/x/poolmanager/types"
)
Expand All @@ -12,6 +13,8 @@ type poolBlockUpdateTracker struct {
cfmmPools map[uint64]poolmanagertypes.PoolI
cosmwasmPools map[uint64]poolmanagertypes.PoolI
cosmwasmPoolsAddressToPoolMap map[string]poolmanagertypes.PoolI
// Tracks the pool IDs that were created in the block.
createdPoolIDs map[uint64]commondomain.PoolCreation
}

// NewMemory creates a new memory pool tracker.
Expand All @@ -22,6 +25,7 @@ func NewMemory() domain.BlockPoolUpdateTracker {
cfmmPools: map[uint64]poolmanagertypes.PoolI{},
cosmwasmPools: map[uint64]poolmanagertypes.PoolI{},
cosmwasmPoolsAddressToPoolMap: map[string]poolmanagertypes.PoolI{},
createdPoolIDs: map[uint64]commondomain.PoolCreation{},
}
}

Expand Down Expand Up @@ -50,6 +54,11 @@ func (pt *poolBlockUpdateTracker) TrackConcentratedPoolIDTickChange(poolID uint6
pt.concentratedPoolIDTickChange[poolID] = struct{}{}
}

// TrackCreatedPoolID implements domain.BlockPoolUpdateTracker.
func (pt *poolBlockUpdateTracker) TrackCreatedPoolID(poolCreation commondomain.PoolCreation) {
pt.createdPoolIDs[poolCreation.PoolId] = poolCreation
}

// GetConcentratedPools implements PoolTracker.
func (pt *poolBlockUpdateTracker) GetConcentratedPools() []poolmanagertypes.PoolI {
return poolMapToSlice(pt.concentratedPools)
Expand All @@ -75,12 +84,18 @@ func (pt *poolBlockUpdateTracker) GetCosmWasmPoolsAddressToIDMap() map[string]po
return pt.cosmwasmPoolsAddressToPoolMap
}

// GetCreatedPoolIDs implements domain.BlockPoolUpdateTracker.
func (pt *poolBlockUpdateTracker) GetCreatedPoolIDs() map[uint64]commondomain.PoolCreation {
return pt.createdPoolIDs
}

// Reset implements PoolTracker.
func (pt *poolBlockUpdateTracker) Reset() {
pt.concentratedPools = map[uint64]poolmanagertypes.PoolI{}
pt.cfmmPools = map[uint64]poolmanagertypes.PoolI{}
pt.cosmwasmPools = map[uint64]poolmanagertypes.PoolI{}
pt.concentratedPoolIDTickChange = map[uint64]struct{}{}
pt.createdPoolIDs = map[uint64]commondomain.PoolCreation{}
}

// poolMapToSlice converts a map of pools to a slice of pools.
Expand Down
19 changes: 11 additions & 8 deletions ingest/indexer/domain/pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ import (

// Pair represents a pair of tokens in a pool and message to be published to PubSub
type Pair struct {
PoolID uint64 `json:"pool_id"`
MultiAsset bool `json:"multi_asset"`
Denom0 string `json:"denom_0"`
IdxDenom0 uint8 `json:"idx_denom_0"`
Denom1 string `json:"denom_1"`
IdxDenom1 uint8 `json:"idx_denom_1"`
FeeBps uint64 `json:"fee_bps"`
IngestedAt time.Time `json:"ingested_at"`
PoolID uint64 `json:"pool_id"`
MultiAsset bool `json:"multi_asset"`
Denom0 string `json:"denom_0"`
IdxDenom0 uint8 `json:"idx_denom_0"`
Denom1 string `json:"denom_1"`
IdxDenom1 uint8 `json:"idx_denom_1"`
FeeBps uint64 `json:"fee_bps"`
IngestedAt time.Time `json:"ingested_at"`
PairCreatedAt time.Time `json:"pair_created_at"`
PairCreatedAtHeight uint64 `json:"pair_created_at_height"`
PairCreatedAtTxnHash string `json:"pair_created_at_txn_hash"`
}

// ShouldFilterDenom returns true if the given denom should be filtered out.
Expand Down
7 changes: 6 additions & 1 deletion ingest/indexer/domain/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

sdk "github.com/cosmos/cosmos-sdk/types"

commondomain "github.com/osmosis-labs/osmosis/v25/ingest/common/domain"
poolmanagertypes "github.com/osmosis-labs/osmosis/v25/x/poolmanager/types"
)

Expand All @@ -26,5 +27,9 @@ type Publisher interface {
// PairPublisher is an interface for publishing pair data.
type PairPublisher interface {
// PublishPoolPairs publishes the given pools as pairs.
PublishPoolPairs(ctx sdk.Context, pools []poolmanagertypes.PoolI) error
// The difference between this function and PublishPair is:
// - PublishPair operates on the pair level and publishes a single pair.
// - PublishPoolPairs operates on the pool level and publishes all the pair combo in the pool
// with the taker fee and spread factor, as well as the newly created pool metadata, if any.
PublishPoolPairs(ctx sdk.Context, pools []poolmanagertypes.PoolI, createdPoolIDs map[uint64]commondomain.PoolCreation) error
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,35 @@ func (f *blockUpdatesIndexerBlockProcessStrategy) IsFullBlockProcessor() bool {
// ProcessBlock implements commondomain.BlockProcessStrategy.
func (f *blockUpdatesIndexerBlockProcessStrategy) ProcessBlock(ctx types.Context) error {
// Publish supplies
if err := f.publishChangedPools(ctx); err != nil {
if err := f.publishCreatedPools(ctx); err != nil {
return err
}

return nil
}

// publishChangedPools publishes the pools that were changed in the block.
func (f *blockUpdatesIndexerBlockProcessStrategy) publishChangedPools(ctx types.Context) error {
// publishCreatedPools publishes the pools that were created in the block.
func (f *blockUpdatesIndexerBlockProcessStrategy) publishCreatedPools(ctx types.Context) error {
// Extract the pools that were changed in the block
blockPools, err := f.poolExtractor.ExtractChanged(ctx)
blockPools, createdPoolIDs, err := f.poolExtractor.ExtractCreated(ctx)
if err != nil {
return err
}

pools := blockPools.GetAll()

// Do nothing if no pools were created, or pool metadata is nil
if len(createdPoolIDs) == 0 || len(pools) == 0 {
return nil
}

// Publish pool pairs
if err := f.poolPairPublisher.PublishPoolPairs(ctx, pools); err != nil {
if err := f.poolPairPublisher.PublishPoolPairs(ctx, pools, createdPoolIDs); err != nil {
return err
}

// Reset the pool tracker
f.poolExtractor.ResetPoolTracker(ctx)

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (f *fullIndexerBlockProcessStrategy) publishAllSupplies(ctx sdk.Context) {

// processPools publishes all the pools in the block.
func (f *fullIndexerBlockProcessStrategy) processPools(ctx sdk.Context) error {
blockPools, err := f.poolExtractor.ExtractAll(ctx)
blockPools, createdPoolIDs, err := f.poolExtractor.ExtractAll(ctx)
if err != nil {
return err
}
Expand All @@ -105,7 +105,7 @@ func (f *fullIndexerBlockProcessStrategy) processPools(ctx sdk.Context) error {
pools := blockPools.GetAll()

// Process pool pairs
if err := f.poolPairPublisher.PublishPoolPairs(ctx, pools); err != nil {
if err := f.poolPairPublisher.PublishPoolPairs(ctx, pools, createdPoolIDs); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 8c89343

Please sign in to comment.