Skip to content

Commit

Permalink
V20 SQS POC
Browse files Browse the repository at this point in the history
Continuing from #6723
  • Loading branch information
p0mvn committed Nov 21, 2023
1 parent eeb3af7 commit 59998d8
Show file tree
Hide file tree
Showing 63 changed files with 7,577 additions and 933 deletions.
44 changes: 43 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"-test.timeout",
"30m",
"-test.run",
"TestKeeperTestSuite/TestYourName",
"TestKeeperTestSuite/TestComputeAndSwapOutAmtGivenIn",
"-test.v"
],
},
Expand Down Expand Up @@ -294,5 +294,47 @@
"-test.v"
],
},
{
"name": "ingest/sqs/router",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${workspaceFolder}/ingest/sqs/router/usecase",
"args": [
"-test.timeout",
"30m",
"-test.run",
"TestRouterTestSuite/TestNewRouter",
"-test.v"
],
},
{
"name": "ingest/sqs/router/usecase",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${workspaceFolder}/ingest/sqs/router/usecase",
"args": [
"-test.timeout",
"30m",
"-test.run",
"TestRouterTestSuite/TestCalculateTokenOutByTokenIn_Concentrated_ErrorAndEdgeCases/22",
"-test.v"
],
},
{
"name": "ingest/sqs/pools/ingester/redis",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${workspaceFolder}/ingest/sqs/pools/ingester/redis",
"args": [
"-test.timeout",
"30m",
"-test.run",
"TestIngesterTestSuite/TestConvertPool_Concentrated",
"-test.v"
],
},
]
}
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ go-mock-update:
mockgen -source=x/poolmanager/types/pool.go -destination=tests/mocks/pool.go -package=mocks
mockgen -source=x/gamm/types/pool.go -destination=tests/mocks/cfmm_pool.go -package=mocks
mockgen -source=x/concentrated-liquidity/types/cl_pool_extensionI.go -destination=tests/mocks/cl_pool.go -package=mocks
mockgen -source=ingest/sqs/domain/pools.go -destination=tests/mocks/sqs_pool.go -package=mocks -mock_names=PoolI=MockSQSPoolI

###############################################################################
### Release ###
Expand Down
49 changes: 48 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"reflect"
"strconv"
"strings"

store "github.com/cosmos/cosmos-sdk/store/types"
Expand Down Expand Up @@ -69,9 +70,24 @@ import (
v8 "github.com/osmosis-labs/osmosis/v20/app/upgrades/v8"
v9 "github.com/osmosis-labs/osmosis/v20/app/upgrades/v9"
_ "github.com/osmosis-labs/osmosis/v20/client/docs/statik"
"github.com/osmosis-labs/osmosis/v20/ingest"
"github.com/osmosis-labs/osmosis/v20/ingest/sqs"

poolsingester "github.com/osmosis-labs/osmosis/v20/ingest/sqs/pools/ingester/redis"
)

const appName = "OsmosisApp"
const (
appName = "OsmosisApp"

// Environment variable configurations
// TODO: replace all SQS environment variables with a config file
ENV_NAME_INGEST_TYPE = "INGEST_TYPE"
ENV_NAME_INGEST_SQS_DBHOST = "INGEST_SQS_DBHOST"
ENV_NAME_INGEST_SQS_DBPORT = "INGEST_SQS_DBPORT"
ENV_NAME_INGEST_SQS_SERVER_ADDRESS = "INGEST_SQS_SERVER_ADDRESS"
ENV_NAME_INGEST_SQS_SERVER_TIMEOUT_DURATION_SECS = "INGEST_SQS_SERVER_TIMEOUT_DURATION_SECS"
ENV_VALUE_INGESTER_SQS = "sqs"
)

var (
// DefaultNodeHome default home directories for the application daemon
Expand Down Expand Up @@ -243,6 +259,35 @@ func NewOsmosisApp(
app.BlockedAddrs(),
)

isIngestManagerEnabled := os.Getenv(ENV_NAME_INGEST_TYPE) == ENV_VALUE_INGESTER_SQS
app.IngestManager = ingest.NewIngestManager()
if isIngestManagerEnabled {
dbHost := os.Getenv(ENV_NAME_INGEST_SQS_DBHOST)
dbPort := os.Getenv(ENV_NAME_INGEST_SQS_DBPORT)
sidecarQueryServerAddress := os.Getenv(ENV_NAME_INGEST_SQS_SERVER_ADDRESS)
sidecarQueryServerTimeoutDuration, err := strconv.Atoi(os.Getenv(ENV_NAME_INGEST_SQS_SERVER_TIMEOUT_DURATION_SECS))
if err != nil {
panic(fmt.Sprintf("error while parsing timeout duration: %s", err))
}

// Create sidecar query server
sidecarQueryServer, err := sqs.NewSideCarQueryServer(appCodec, dbHost, dbPort, sidecarQueryServerAddress, sidecarQueryServerTimeoutDuration)
if err != nil {
panic(fmt.Sprintf("error while creating sidecar query server: %s", err))
}

txManager := sidecarQueryServer.GetTxManager()

// Create pools ingester
poolsIngester := poolsingester.NewPoolIngester(sidecarQueryServer.GetPoolsRepository(), txManager, app.GAMMKeeper, app.ConcentratedLiquidityKeeper, app.CosmwasmPoolKeeper, app.BankKeeper, app.ProtoRevKeeper, app.PoolManagerKeeper)

// Create sqs ingester that encapsulates all ingesters.
sqsIngester := sqs.NewSidecarQueryServerIngester(poolsIngester, txManager)

// Set the sidecar query server ingester to the ingest manager.
app.IngestManager.SetIngester(sqsIngester)
}

// TODO: There is a bug here, where we register the govRouter routes in InitNormalKeepers and then
// call setupHooks afterwards. Therefore, if a gov proposal needs to call a method and that method calls a
// hook, we will get a nil pointer dereference error due to the hooks in the keeper not being
Expand Down Expand Up @@ -366,6 +411,8 @@ func (app *OsmosisApp) BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock)

// EndBlocker application updates every end block.
func (app *OsmosisApp) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) abci.ResponseEndBlock {
app.IngestManager.ProcessBlock(ctx)

return app.mm.EndBlock(ctx, req)
}

Expand Down
Loading

0 comments on commit 59998d8

Please sign in to comment.