Skip to content

Commit

Permalink
feat: celatone for osmosis v17
Browse files Browse the repository at this point in the history
  • Loading branch information
traviolus committed Aug 17, 2023
1 parent 2f698ba commit 5c7fe81
Show file tree
Hide file tree
Showing 35 changed files with 3,920 additions and 7 deletions.
52 changes: 49 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ import (
"github.com/cosmos/cosmos-sdk/x/crisis"
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"

"github.com/osmosis-labs/osmosis/v17/hooks/common"
"github.com/osmosis-labs/osmosis/v17/hooks/emitter"

"github.com/osmosis-labs/osmosis/v17/app/keepers"
"github.com/osmosis-labs/osmosis/v17/app/upgrades"
v10 "github.com/osmosis-labs/osmosis/v17/app/upgrades/v10"
Expand Down Expand Up @@ -144,6 +147,13 @@ type OsmosisApp struct {
mm *module.Manager
configurator module.Configurator
homePath string

// DeliverContext is set during InitGenesis/BeginBlock and cleared during Commit.
// It allows anyone to read/mutate Osmosis consensus state at anytime.
DeliverContext sdk.Context

// List of hooks
hooks common.Hooks
}

// init sets DefaultNodeHome to default osmosisd install location.
Expand Down Expand Up @@ -182,6 +192,7 @@ func NewOsmosisApp(
loadLatest bool,
skipUpgradeHeights map[int64]bool,
homePath string,
withEmitter string,
invCheckPeriod uint,
appOpts servertypes.AppOptions,
wasmOpts []wasm.Option,
Expand Down Expand Up @@ -308,6 +319,12 @@ func NewOsmosisApp(
app.SetPostHandler(NewPostHandler(app.ProtoRevKeeper))
app.SetEndBlocker(app.EndBlocker)

// Initialize emitter hook and append to the app hooks.
app.hooks = make(common.Hooks, 0)
if withEmitter != "" {
app.hooks = append(app.hooks, emitter.NewHook(encodingConfig, app.AppKeepers, withEmitter))
}

// Register snapshot extensions to enable state-sync for wasm.
if manager := app.SnapshotManager(); manager != nil {
err := manager.RegisterExtensions(
Expand Down Expand Up @@ -350,12 +367,29 @@ func (app *OsmosisApp) Name() string { return app.BaseApp.Name() }
// BeginBlocker application updates every begin block.
func (app *OsmosisApp) BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock) abci.ResponseBeginBlock {
BeginBlockForks(ctx, app)
return app.mm.BeginBlock(ctx, req)
app.DeliverContext = ctx
res := app.mm.BeginBlock(ctx, req)
cacheContext, _ := ctx.CacheContext()
app.hooks.AfterBeginBlock(cacheContext, req, res)

return res
}

// EndBlocker application updates every end block.
func (app *OsmosisApp) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) abci.ResponseEndBlock {
return app.mm.EndBlock(ctx, req)
res := app.mm.EndBlock(ctx, req)
cacheContext, _ := ctx.CacheContext()
app.hooks.AfterEndBlock(cacheContext, req, res)

return res
}

// Commit overrides the default BaseApp's ABCI commit by adding DeliverContext clearing.
func (app *OsmosisApp) Commit() (res abci.ResponseCommit) {
app.hooks.BeforeCommit()
app.DeliverContext = sdk.Context{}

return app.BaseApp.Commit()
}

// InitChainer application update at chain initialization.
Expand All @@ -366,8 +400,20 @@ func (app *OsmosisApp) InitChainer(ctx sdk.Context, req abci.RequestInitChain) a
}

app.UpgradeKeeper.SetModuleVersionMap(ctx, app.mm.GetVersionMap())
res := app.mm.InitGenesis(ctx, app.appCodec, genesisState)
cacheContext, _ := ctx.CacheContext()
app.hooks.AfterInitChain(cacheContext, req, res)

return res
}

// DeliverTx overwrite DeliverTx to apply the AfterDeliverTx hook.
func (app *OsmosisApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
res := app.BaseApp.DeliverTx(req)
cacheCtx, _ := app.DeliverContext.CacheContext()
app.hooks.AfterDeliverTx(cacheCtx, req, res)

return app.mm.InitGenesis(ctx, app.appCodec, genesisState)
return res
}

// LoadHeight loads a particular height.
Expand Down
2 changes: 1 addition & 1 deletion app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func DefaultConfig() network.Config {
func NewAppConstructor() network.AppConstructor {
return func(val network.Validator) servertypes.Application {
return NewOsmosisApp(
val.Ctx.Logger, dbm.NewMemDB(), nil, true, make(map[int64]bool), val.Ctx.Config.RootDir, 0,
val.Ctx.Logger, dbm.NewMemDB(), nil, true, make(map[int64]bool), val.Ctx.Config.RootDir, "", 0,
simapp.EmptyAppOptions{},
EmptyWasmOpts,
baseapp.SetMinGasPrices(val.AppConfig.MinGasPrices),
Expand Down
4 changes: 2 additions & 2 deletions app/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func getDefaultGenesisStateBytes() []byte {
// SetupWithCustomHome initializes a new OsmosisApp with a custom home directory
func SetupWithCustomHome(isCheckTx bool, dir string) *OsmosisApp {
db := dbm.NewMemDB()
app := NewOsmosisApp(log.NewNopLogger(), db, nil, true, map[int64]bool{}, dir, 0, simapp.EmptyAppOptions{}, EmptyWasmOpts)
app := NewOsmosisApp(log.NewNopLogger(), db, nil, true, map[int64]bool{}, dir, "", 0, simapp.EmptyAppOptions{}, EmptyWasmOpts)
if !isCheckTx {
stateBytes := getDefaultGenesisStateBytes()

Expand Down Expand Up @@ -61,7 +61,7 @@ func SetupTestingAppWithLevelDb(isCheckTx bool) (app *OsmosisApp, cleanupFn func
if err != nil {
panic(err)
}
app = NewOsmosisApp(log.NewNopLogger(), db, nil, true, map[int64]bool{}, DefaultNodeHome, 5, simapp.EmptyAppOptions{}, EmptyWasmOpts)
app = NewOsmosisApp(log.NewNopLogger(), db, nil, true, map[int64]bool{}, DefaultNodeHome, "", 5, simapp.EmptyAppOptions{}, EmptyWasmOpts)
if !isCheckTx {
genesisState := NewDefaultGenesisState()
stateBytes, err := json.MarshalIndent(genesisState, "", " ")
Expand Down
6 changes: 5 additions & 1 deletion cmd/osmosisd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ var (
testnetId = "osmo-test-5"
)

const FlagWithEmitter = "with-emitter"

func loadAssetList(initClientCtx client.Context, cmd *cobra.Command, basedenomToIBC, IBCtoBasedenom bool) (map[string]DenomUnitMap, map[string]string) {
var assetList AssetList

Expand Down Expand Up @@ -505,6 +507,7 @@ func initRootCmd(rootCmd *cobra.Command, encodingConfig params.EncodingConfig) {
func addModuleInitFlags(startCmd *cobra.Command) {
crisis.AddModuleInitFlags(startCmd)
wasm.AddModuleInitFlags(startCmd)
startCmd.Flags().String(FlagWithEmitter, "", "Enable data indexing to a message queue")
}

// queryCommand adds transaction and account querying commands.
Expand Down Expand Up @@ -595,6 +598,7 @@ func newApp(logger log.Logger, db dbm.DB, traceStore io.Writer, appOpts serverty
return osmosis.NewOsmosisApp(
logger, db, traceStore, true, skipUpgradeHeights,
cast.ToString(appOpts.Get(flags.FlagHome)),
cast.ToString(appOpts.Get(FlagWithEmitter)),
cast.ToUint(appOpts.Get(server.FlagInvCheckPeriod)),
appOpts,
wasmOpts,
Expand All @@ -620,7 +624,7 @@ func createOsmosisAppAndExport(
encCfg.Marshaler = codec.NewProtoCodec(encCfg.InterfaceRegistry)
loadLatest := height == -1
homeDir := cast.ToString(appOpts.Get(flags.FlagHome))
app := osmosis.NewOsmosisApp(logger, db, traceStore, loadLatest, map[int64]bool{}, homeDir, 0, appOpts, osmosis.EmptyWasmOpts)
app := osmosis.NewOsmosisApp(logger, db, traceStore, loadLatest, map[int64]bool{}, homeDir, "", 0, appOpts, osmosis.EmptyWasmOpts)

if !loadLatest {
if err := app.LoadHeight(height); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions flusher/.idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions flusher/.idea/flusher.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions flusher/.idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions flusher/.idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions flusher/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM python:3.8-slim

COPY . .
RUN apt-get update && apt-get install -y libpq-dev gcc
RUN pip install -r requirements.txt

CMD python main.py sync --db postgres:[email protected]:5432/localosmosis -s 172.18.0.31:9092
155 changes: 155 additions & 0 deletions flusher/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Flusher

## About

**Flusher** is a simple program implemented in Python to consume Kafka messages from a queue and load them into the
Postgres database

## Setup instructions for Ubuntu users

For Ubuntu users, to install dependencies including Java, Postgres and Kafka

```shell
sudo apt update
sudo apt upgrade --yes

sudo apt install default-jdk
cd $HOME
curl "https://downloads.apache.org/kafka/3.3.2/kafka_2.13-3.3.2.tgz" -o kafka.tgz
mkdir $HOME/kafka && cd $HOME/kafka
tar -xvzf $HOME/kafka.tgz --strip 1
```

Config the downloaded zookeeper and kafka. Edit the file

```shell
vim $HOME/kafka/config/server.properties
```

With the following values

```toml
log.dirs=/home/ubuntu/kafka/logs
delete.topic.enable = true
message.max.bytes = 52428800
replica.fetch.max.bytes = 52428800
```

Next, let's create a system service for zookeeper. Create the service file

```shell
sudo vim /etc/systemd/system/zookeeper.service
```

With the following code

```shell
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=ubuntu
ExecStart=/home/ubuntu/kafka/bin/zookeeper-server-start.sh /home/ubuntu/kafka/config/zookeeper.properties
ExecStop=/home/ubuntu/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
```

In the similar manner, create a service for kafka

```shell
sudo vim /etc/systemd/system/kafka.service
```

With the following code

```shell
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=ubuntu
ExecStart=/bin/sh -c '/home/ubuntu/kafka/bin/kafka-server-start.sh /home/ubuntu/kafka/config/server.properties > /home/ubuntu/kafka/kafka.log 2>&1'
ExecStop=/home/ubuntu/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target
```

Finally, we can start both services

```shell
sudo systemctl enable zookeeper
sudo systemctl enable kafka
sudo systemctl daemon-reload

sudo systemctl start zookeeper
sudo systemctl start kafka
```

## Setup instructions for macOS users

For macOS users, to install dependencies including Java, Postgres and Kafka

```shell
brew cask install java
brew install postgresql
brew install kafka
```
Start the zookeeper service to provide an in-sync view of Kafka cluster, topics and messages

```shell
brew services start zookeeper
```

Start the postgresql service to provide the database server

```shell
brew services start postgresql
```

Start the kafka service to provide a message queue for the indexer node

```shell
brew services start kafka
```

Now make sure python3 venv is installed

```shell
python3 -m pip install --user virtualenv
```

Then run this command to activate the virtual environment and install python dependencies

```shell
python3 -m venv venv && source venv/bin/activate
pip install -r requirements.txt
```
Note that if you encounter an openssl problem while installing dependencies, simply run

```shell
brew install openssl && export LIBRARY_PATH=$LIBRARY_PATH:/usr/local/opt/openssl/lib/
```

## Running Flusher

Before running the flusher, do not forget to activate the python virtual environment

```shell
cd flusher
source venv/bin/activate
```

And use this command to start flusher

```shell
python3 main.py sync --db <DB_URL>
```
2 changes: 2 additions & 0 deletions flusher/flusher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import flusher.init
import flusher.sync
7 changes: 7 additions & 0 deletions flusher/flusher/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import click


@click.group()
def cli():
"""flusher utility program."""
pass
Loading

0 comments on commit 5c7fe81

Please sign in to comment.