From 3bcb1980899c581358952b980dd97ea255adcd20 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Tue, 27 Sep 2022 14:05:01 +0800 Subject: [PATCH] Problem: there's no compact historical state storage Closes: #704 Solution: - Integration version store and streaming service. multiple backends db size benchmark support mdbx backend through tm-db store latest version support GetLatestVersion query multistore test versiondb streamer fix lint fix state listener temp fix state listening optimize common case fix lint try to fix mdbx build update cosmos-sdk fix clone append add test case fix subkey problem in history index revert chunking, hard to work with variable length key support iterator check future height fix lint new state listener fix latest state in query fix integration test fix prune node test update dependency add utility to read from file streamer Update versiondb/multistore.go Signed-off-by: yihuang add unit test create common backend test cases update dependency update with new file streamer format Problem: python3.10 is not used in integration tests Solution: - start using python3.10, prepare to later PRs which need the new features - update nixpkgs nesserary for the nix stuff to work. python-roaring64 remove debug log add test cases, improve coverage --- Makefile | 4 + app/app.go | 45 ++- default.nix | 3 +- go.mod | 19 +- gomod2nix.toml | 51 ++- integration_tests/configs/default.jsonnet | 2 +- integration_tests/configs/pruned-node.jsonnet | 2 + .../configs/state_benchmark.jsonnet | 36 ++ integration_tests/conftest.py | 22 ++ .../contracts/contracts/BenchmarkStorage.sol | 15 + integration_tests/pyproject.toml | 5 +- integration_tests/test_benchmark_storage.py | 53 +++ integration_tests/test_streamer.py | 57 +-- integration_tests/utils.py | 1 + integration_tests/versiondb.py | 86 +++++ nix/testenv.nix | 10 +- scripts/cronos-devnet.yaml | 2 +- versiondb/backend_test_utils.go | 276 ++++++++++++++ versiondb/dbutils.go | 84 +++++ versiondb/multistore.go | 135 +++++++ versiondb/store.go | 94 +++++ versiondb/streaming_service.go | 84 +++++ versiondb/sync.go | 31 ++ versiondb/sync_test.go | 340 ++++++++++++++++++ versiondb/tmdb/history.go | 67 ++++ versiondb/tmdb/iterator.go | 183 ++++++++++ versiondb/tmdb/store.go | 266 ++++++++++++++ versiondb/tmdb/store_test.go | 14 + versiondb/types.go | 21 ++ 29 files changed, 1944 insertions(+), 64 deletions(-) create mode 100644 integration_tests/configs/state_benchmark.jsonnet create mode 100644 integration_tests/contracts/contracts/BenchmarkStorage.sol create mode 100644 integration_tests/test_benchmark_storage.py create mode 100644 integration_tests/versiondb.py create mode 100644 versiondb/backend_test_utils.go create mode 100644 versiondb/dbutils.go create mode 100644 versiondb/multistore.go create mode 100644 versiondb/store.go create mode 100644 versiondb/streaming_service.go create mode 100644 versiondb/sync.go create mode 100644 versiondb/sync_test.go create mode 100644 versiondb/tmdb/history.go create mode 100644 versiondb/tmdb/iterator.go create mode 100644 versiondb/tmdb/store.go create mode 100644 versiondb/tmdb/store_test.go create mode 100644 versiondb/types.go diff --git a/Makefile b/Makefile index 94ea62b5e1..e92f965247 100644 --- a/Makefile +++ b/Makefile @@ -74,6 +74,10 @@ ifeq (boltdb,$(findstring boltdb,$(COSMOS_BUILD_OPTIONS))) BUILD_TAGS += boltdb endif +ifeq (mdbx,$(findstring mdbx,$(COSMOS_BUILD_OPTIONS))) + BUILD_TAGS += mdbx +endif + ifeq (,$(findstring nostrip,$(COSMOS_BUILD_OPTIONS))) ldflags += -w -s endif diff --git a/app/app.go b/app/app.go index 7cd77b35e4..8d423a3a44 100644 --- a/app/app.go +++ b/app/app.go @@ -5,12 +5,15 @@ import ( "net/http" "os" "path/filepath" + "strings" "sync" + "github.com/crypto-org-chain/cronos/x/cronos" "github.com/crypto-org-chain/cronos/x/cronos/middleware" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/server" "github.com/gorilla/mux" "github.com/rakyll/statik/fs" "github.com/spf13/cast" @@ -122,7 +125,8 @@ import ( // this line is used by starport scaffolding # stargate/app/moduleImport cronosappclient "github.com/crypto-org-chain/cronos/client" - "github.com/crypto-org-chain/cronos/x/cronos" + "github.com/crypto-org-chain/cronos/versiondb" + "github.com/crypto-org-chain/cronos/versiondb/tmdb" cronosclient "github.com/crypto-org-chain/cronos/x/cronos/client" cronoskeeper "github.com/crypto-org-chain/cronos/x/cronos/keeper" evmhandlers "github.com/crypto-org-chain/cronos/x/cronos/keeper/evmhandlers" @@ -350,7 +354,8 @@ func New( // configure state listening capabilities using AppOptions // we are doing nothing with the returned streamingServices and waitGroup in this case // Only support file streamer right now. - if cast.ToString(appOpts.Get(cronosappclient.FlagStreamers)) == "file" { + streamers := cast.ToString(appOpts.Get(cronosappclient.FlagStreamers)) + if strings.Contains(streamers, "file") { streamingDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data", FileStreamerDirectory) if err := os.MkdirAll(streamingDir, os.ModePerm); err != nil { panic(err) @@ -361,7 +366,7 @@ func New( for _, storeKey := range keys { exposeStoreKeys = append(exposeStoreKeys, storeKey) } - service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec) + service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec, false) if err != nil { panic(err) } @@ -373,6 +378,40 @@ func New( } } + if strings.Contains(streamers, "versiondb") { + rootDir := cast.ToString(appOpts.Get(flags.FlagHome)) + dataDir := filepath.Join(rootDir, "data", "versiondb") + if err := os.MkdirAll(dataDir, os.ModePerm); err != nil { + panic(err) + } + backendType := server.GetAppDBBackend(appOpts) + plainDB, err := dbm.NewDB("plain", backendType, dataDir) + if err != nil { + panic(err) + } + historyDB, err := dbm.NewDB("history", backendType, dataDir) + if err != nil { + panic(err) + } + changesetDB, err := dbm.NewDB("changeset", backendType, dataDir) + if err != nil { + panic(err) + } + versionDB := tmdb.NewStore(plainDB, historyDB, changesetDB) + + // default to exposing all + exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys)) + for _, storeKey := range keys { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + service := versiondb.NewStreamingService(versionDB, exposeStoreKeys) + bApp.SetStreamingService(service) + qms := versiondb.NewMultiStore(versionDB, exposeStoreKeys) + qms.MountTransientStores(tkeys) + qms.MountMemoryStores(memKeys) + bApp.SetQueryMultiStore(qms) + } + app := &App{ BaseApp: bApp, cdc: cdc, diff --git a/default.nix b/default.nix index fcbab2154a..846e83eea0 100644 --- a/default.nix +++ b/default.nix @@ -8,7 +8,7 @@ let version = "v0.9.0"; pname = "cronosd"; - tags = [ "ledger" "netgo" network ] + tags = [ "ledger" "netgo" network "mdbx" ] ++ lib.lists.optionals (rocksdb != null) [ "rocksdb" "rocksdb_build" ]; ldflags = lib.concatStringsSep "\n" ([ "-X github.com/cosmos/cosmos-sdk/version.Name=cronos" @@ -27,6 +27,7 @@ buildGoApplication rec { "!/app/" "!/cmd/" "!/client/" + "!/versiondb/" "!go.mod" "!go.sum" "!gomod2nix.toml" diff --git a/go.mod b/go.mod index 17476fe929..40fa2911d7 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.18 require ( cosmossdk.io/math v1.0.0-beta.3 + github.com/RoaringBitmap/roaring v1.2.1 github.com/armon/go-metrics v0.4.1 github.com/cosmos/cosmos-sdk v0.46.3 github.com/cosmos/ibc-go/v5 v5.0.0 @@ -45,10 +46,12 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect github.com/bgentry/speakeasy v0.1.0 // indirect + github.com/bits-and-blooms/bitset v1.2.0 // indirect github.com/btcsuite/btcd v0.22.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce // indirect + github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect @@ -69,9 +72,8 @@ require ( github.com/deckarep/golang-set v1.8.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect - github.com/dgraph-io/badger/v2 v2.2007.4 // indirect + github.com/dgraph-io/badger/v3 v3.2103.2 // indirect github.com/dgraph-io/ristretto v0.1.0 // indirect - github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91 // indirect github.com/dop251/goja v0.0.0-20220405120441-9037c2b61cbf // indirect github.com/dustin/go-humanize v1.0.0 // indirect @@ -91,7 +93,8 @@ require ( github.com/golang/glog v1.0.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/btree v1.0.1 // indirect + github.com/google/btree v1.1.2 // indirect + github.com/google/flatbuffers v2.0.0+incompatible // indirect github.com/google/go-cmp v0.5.8 // indirect github.com/google/orderedcode v0.0.1 // indirect github.com/google/uuid v1.3.0 // indirect @@ -135,7 +138,9 @@ require ( github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-testing-interface v1.0.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/mtibben/percent v0.2.1 // indirect + github.com/natefinch/atomic v1.0.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect @@ -165,6 +170,7 @@ require ( github.com/tendermint/go-amino v0.16.0 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect github.com/tklauser/numcpus v0.4.0 // indirect + github.com/torquem-ch/mdbx-go v0.26.0 // indirect github.com/tyler-smith/go-bip39 v1.1.0 // indirect github.com/ulikunitz/xz v0.5.8 // indirect github.com/zondax/hid v0.9.1-0.20220302062450-5552068d2266 // indirect @@ -189,7 +195,7 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => github.com/cosmos/cosmos-sdk v0.46.2 + github.com/cosmos/cosmos-sdk => github.com/yihuang/cosmos-sdk v0.43.0-beta1.0.20221014023203-2c6b9d06b12d github.com/ethereum/go-ethereum => github.com/ethereum/go-ethereum v1.10.19 // Fix upstream GHSA-h395-qcrw-5vmq vulnerability. @@ -199,6 +205,11 @@ replace ( github.com/peggyjv/gravity-bridge/module/v2 => github.com/crypto-org-chain/gravity-bridge/module/v2 v2.0.1-0.20221027085649-2107c6bd6bc4 + // https://github.com/tendermint/tm-db/pull/297 + github.com/tendermint/tm-db => github.com/yihuang/tm-db v0.0.0-20221006023748-f6214ae9454d + + github.com/torquem-ch/mdbx-go => github.com/yihuang/mdbx-go v0.0.0-20221010042614-b72b4f091d88 + // TODO: remove after fixed https://github.com/cosmos/cosmos-sdk/issues/11364 github.com/zondax/hid => github.com/zondax/hid v0.9.0 ) diff --git a/gomod2nix.toml b/gomod2nix.toml index a05555ba67..b98b5ed14b 100644 --- a/gomod2nix.toml +++ b/gomod2nix.toml @@ -31,6 +31,9 @@ schema = 3 [mod."github.com/ChainSafe/go-schnorrkel"] version = "v0.0.0-20200405005733-88cbf1b4c40d" hash = "sha256-i8RXZemJGlSjBT35oPm0SawFiBoIU5Pkq5xp4n/rzCY=" + [mod."github.com/RoaringBitmap/roaring"] + version = "v1.2.1" + hash = "sha256-0/R956wrCW71eOE36CbxGJJRuQjKwvvIQ/D8QTn2A6w=" [mod."github.com/StackExchange/wmi"] version = "v1.2.1" hash = "sha256-1BoEeWAWyebH+1mMuyPhWZut8nWHb6r73MgcqlGuUEY=" @@ -58,6 +61,9 @@ schema = 3 [mod."github.com/bgentry/speakeasy"] version = "v0.1.0" hash = "sha256-Gt1vj6CFovLnO6wX5u2O4UfecY9V2J9WGw1ez4HMrgk=" + [mod."github.com/bits-and-blooms/bitset"] + version = "v1.2.0" + hash = "sha256-IxNmtELycM+XVzg4qBv04hAJUT3nSWuyP9R+8zc9LmU=" [mod."github.com/btcsuite/btcd"] version = "v0.22.1" hash = "sha256-hBU+roIELcmbW2Gz7eGZzL9qNA1bakq5wNxqCgs4TKc=" @@ -70,6 +76,9 @@ schema = 3 [mod."github.com/btcsuite/btcutil"] version = "v1.0.3-0.20201208143702-a53e38424cce" hash = "sha256-4kasJReFcj25JRHx9dJMct3yDkHqVoHGUx5cu45Msfo=" + [mod."github.com/c2h5oh/datasize"] + version = "v0.0.0-20220606134207-859f65c6625b" + hash = "sha256-1uH+D3w0Y/B3poXm545XGrT4S4c+msTbj7gKgu9pbPM=" [mod."github.com/cenkalti/backoff/v4"] version = "v4.1.3" hash = "sha256-u6MEDopHoTWAZoVvvXOKnAg++xre53YgQx0gmf6t2KU=" @@ -98,12 +107,15 @@ schema = 3 version = "v1.0.0-alpha7" hash = "sha256-2wCH+toTF2A6MfFjOa13muEH5oBCcxAhZEqirNOrBA0=" [mod."github.com/cosmos/cosmos-sdk"] - version = "v0.46.2" - hash = "sha256-Lgn4+Vd5PUUkfHc+lTdK2G6/nymZekFVTe1FxWRqh2w=" - replaced = "github.com/cosmos/cosmos-sdk" + version = "v0.43.0-beta1.0.20221014023203-2c6b9d06b12d" + hash = "sha256-t/QEOJgATr82KFXes+AzNlNL6w2tGHfiwZAE4PdO5GE=" + replaced = "github.com/yihuang/cosmos-sdk" [mod."github.com/cosmos/go-bip39"] version = "v1.0.0" hash = "sha256-Qm2aC2vaS8tjtMUbHmlBSagOSqbduEEDwc51qvQaBmA=" + [mod."github.com/cosmos/gogoproto"] + version = "v1.4.2" + hash = "sha256-hOY+mhPDYWcSYSdth2AW7IONdgicqQir0z/1XrXt9NY=" [mod."github.com/cosmos/gorocksdb"] version = "v1.2.0" hash = "sha256-209TcVuXc5s/TcOvNlaQ1HEJAUDTEK3nxPhs+d8TEcY=" @@ -137,15 +149,12 @@ schema = 3 [mod."github.com/desertbit/timer"] version = "v0.0.0-20180107155436-c41aec40b27f" hash = "sha256-abLOtEcomAqCWLphd2X6WkD/ED764w6sa6unox4BXss=" - [mod."github.com/dgraph-io/badger/v2"] - version = "v2.2007.4" - hash = "sha256-+KwqZJZpViv8S3TqUVvPXrFoMgWFyS3NoLsi4RR5fGk=" + [mod."github.com/dgraph-io/badger/v3"] + version = "v3.2103.2" + hash = "sha256-F6pvsaSKwXOl9RfnUQFqAl6xpCVu9+rthQgOxhKVk1g=" [mod."github.com/dgraph-io/ristretto"] version = "v0.1.0" hash = "sha256-01jneg1+1x8tTfUTBZ+6mHkQaqXVnPYxLJyJhJQcvt4=" - [mod."github.com/dgryski/go-farm"] - version = "v0.0.0-20200201041132-a6ae2369ad13" - hash = "sha256-aOMlPwFY36bLiiIx4HonbCYRAhagk5N6HAWN7Ygif+E=" [mod."github.com/dlclark/regexp2"] version = "v1.4.1-0.20201116162257-a2a8dda75c91" hash = "sha256-VNNMZIc7NkDg3DVLnqeJNM/KZqkkaZu2/HTLBL8X2xE=" @@ -164,6 +173,7 @@ schema = 3 [mod."github.com/ethereum/go-ethereum"] version = "v1.10.19" hash = "sha256-7FPnTGcCb8Xd1QVR+6PmGTaHdTY1mm/8osFTW1JLuG8=" + replaced = "github.com/ethereum/go-ethereum" [mod."github.com/evmos/ethermint"] version = "v0.6.1-0.20221003153722-491c3da7ebd7" hash = "sha256-vnfjk57gYa+F8nn0LByX/B1LV8PY2Jvm8vXV6be4ufc=" @@ -217,8 +227,11 @@ schema = 3 version = "v0.0.4" hash = "sha256-Umx+5xHAQCN/Gi4HbtMhnDCSPFAXSsjVbXd8n5LhjAA=" [mod."github.com/google/btree"] - version = "v1.0.1" - hash = "sha256-1PIeFGgUL4BK/StL/D12pg9bEQ5HfMT/fMLdus4pZTs=" + version = "v1.1.2" + hash = "sha256-K7V2obq3pLM71Mg0vhhHtZ+gtaubwXPQx3xcIyZDCjM=" + [mod."github.com/google/flatbuffers"] + version = "v2.0.0+incompatible" + hash = "sha256-4Db9FdOL60Da4H1+K4Qv02w4omxdsh3uzpmY1vtqHeA=" [mod."github.com/google/go-cmp"] version = "v0.5.8" hash = "sha256-8zkIo+Sr1NXMnj3PNmvjX2sZKnAKWXOFvmnX7D9bwxQ=" @@ -354,9 +367,15 @@ schema = 3 [mod."github.com/mitchellh/mapstructure"] version = "v1.5.0" hash = "sha256-ztVhGQXs67MF8UadVvG72G3ly0ypQW0IRDdOOkjYwoE=" + [mod."github.com/mschoch/smat"] + version = "v0.2.0" + hash = "sha256-DZvUJXjIcta3U+zxzgU3wpoGn/V4lpBY7Xme8aQUi+E=" [mod."github.com/mtibben/percent"] version = "v0.2.1" hash = "sha256-Zj1lpCP6mKQ0UUTMs2By4LC414ou+iJzKkK+eBHfEcc=" + [mod."github.com/natefinch/atomic"] + version = "v1.0.1" + hash = "sha256-fbOVHCwRNI8PFjC4o0YXpKZO0JU2aWTfH5c7WXXKMHg=" [mod."github.com/olekukonko/tablewriter"] version = "v0.0.5" hash = "sha256-/5i70IkH/qSW5KjGzv8aQNKh9tHoz98tqtL0K2DMFn4=" @@ -461,14 +480,19 @@ schema = 3 version = "v0.34.22" hash = "sha256-4p4cpyCWjBbNQUpYN2gDJvnyj+Pov9hw5uRjHrrO++Y=" [mod."github.com/tendermint/tm-db"] - version = "v0.6.7" - hash = "sha256-hl/3RrBrpkk2zA6dmrNlIYKs1/GfqegSscDSkA5Pjlo=" + version = "v0.0.0-20221006023748-f6214ae9454d" + hash = "sha256-mtTVR3f3A9CmcyJBXTeurQuHGiVA5hIzqlxskz1M1qk=" + replaced = "github.com/yihuang/tm-db" [mod."github.com/tklauser/go-sysconf"] version = "v0.3.10" hash = "sha256-Zf2NsgM9+HeM949vCce4HQtSbfUiFpeiQ716yKcFyx4=" [mod."github.com/tklauser/numcpus"] version = "v0.4.0" hash = "sha256-ndE82nOb3agubhEV7aRzEqqTlN4DPbKFHEm2+XZLn8k=" + [mod."github.com/torquem-ch/mdbx-go"] + version = "v0.0.0-20221010042614-b72b4f091d88" + hash = "sha256-HWsrhzSGoYgEUUziQPHEtQGwBQ/upg3/f1fY2NGiC0g=" + replaced = "github.com/yihuang/mdbx-go" [mod."github.com/tyler-smith/go-bip39"] version = "v1.1.0" hash = "sha256-3YhWBtSwRLGwm7vNwqumphZG3uLBW1vwT9QkQ8JuSjU=" @@ -478,6 +502,7 @@ schema = 3 [mod."github.com/zondax/hid"] version = "v0.9.0" hash = "sha256-PvXtxXo/3C+DS9ZeGBlr4zXbIpaYNtMqLzxYhusFXNY=" + replaced = "github.com/zondax/hid" [mod."go.etcd.io/bbolt"] version = "v1.3.6" hash = "sha256-DenVAmyN22xUiivk6fdJp4C9ZnUJXCMDUf8E0goRRV4=" diff --git a/integration_tests/configs/default.jsonnet b/integration_tests/configs/default.jsonnet index 12ca83f47d..23b183f7d5 100644 --- a/integration_tests/configs/default.jsonnet +++ b/integration_tests/configs/default.jsonnet @@ -2,7 +2,7 @@ dotenv: '../../scripts/.env', 'cronos_777-1': { cmd: 'cronosd', - 'start-flags': '--trace --streamers file', + 'start-flags': '--trace --streamers versiondb,file', config: { mempool: { version: 'v1', diff --git a/integration_tests/configs/pruned-node.jsonnet b/integration_tests/configs/pruned-node.jsonnet index 2a3edd147f..6e87b2288b 100644 --- a/integration_tests/configs/pruned-node.jsonnet +++ b/integration_tests/configs/pruned-node.jsonnet @@ -2,6 +2,8 @@ local config = import 'default.jsonnet'; config { 'cronos_777-1'+: { + // don't enable versiondb, since it don't do pruning right now + 'start-flags': '--trace --streamers file', 'app-config'+: { pruning: 'everything', 'state-sync'+: { diff --git a/integration_tests/configs/state_benchmark.jsonnet b/integration_tests/configs/state_benchmark.jsonnet new file mode 100644 index 0000000000..ba8dbf1741 --- /dev/null +++ b/integration_tests/configs/state_benchmark.jsonnet @@ -0,0 +1,36 @@ +local config = import 'default.jsonnet'; + +config { + 'cronos_777-1'+: { + 'start-flags': '--trace --streamers file,versiondb', + 'app-config'+: { + 'app-db-backend': 'rocksdb', + 'state-sync'+: { + 'snapshot-interval': 0, + }, + }, + validators: [ + super.validators[0], + super.validators[1] { + 'app-config'+: { + pruning: 'everything', + }, + }, + ] + super.validators[2:], + genesis+: { + consensus_params+: { + block+: { + max_gas: '163000000', + }, + }, + app_state+: { + feemarket+: { + params+: { + no_base_fee: true, + min_gas_multiplier: '0', + }, + }, + }, + }, + }, +} diff --git a/integration_tests/conftest.py b/integration_tests/conftest.py index 980d4a9627..a496fe7a12 100644 --- a/integration_tests/conftest.py +++ b/integration_tests/conftest.py @@ -13,6 +13,28 @@ def pytest_configure(config): config.addinivalue_line("markers", "slow: marks tests as slow") config.addinivalue_line("markers", "gravity: gravity bridge test cases") + config.addinivalue_line( + "markers", "benchmark: benchmarks, only run if '--run-benchmark' is passed" + ) + + +def pytest_addoption(parser): + parser.addoption( + "--run-benchmark", + action="store_true", + default=False, + help="include benchmark cases", + ) + + +def pytest_collection_modifyitems(config, items): + if config.getoption("--run-benchmark"): + # run benchmarks + return + skip = pytest.mark.skip(reason="need --run-benchmark option to run") + for item in items: + if "benchmark" in item.keywords: + item.add_marker(skip) @pytest.fixture(scope="session") diff --git a/integration_tests/contracts/contracts/BenchmarkStorage.sol b/integration_tests/contracts/contracts/BenchmarkStorage.sol new file mode 100644 index 0000000000..69dcfde451 --- /dev/null +++ b/integration_tests/contracts/contracts/BenchmarkStorage.sol @@ -0,0 +1,15 @@ +pragma solidity 0.8.10; + +contract BenchmarkStorage { + uint seed; + mapping(uint => uint) state; + function random(uint i) private view returns (uint) { + return uint(keccak256(abi.encodePacked(i, seed))); + } + function batch_set(uint _seed, uint n, uint range) public { + seed = _seed; + for (uint i=0; i< n; i++) { + state[random(i) % range] = random(i+i); + } + } +} diff --git a/integration_tests/pyproject.toml b/integration_tests/pyproject.toml index 65ca14c207..54fb3b3f60 100644 --- a/integration_tests/pyproject.toml +++ b/integration_tests/pyproject.toml @@ -5,7 +5,7 @@ description = "" authors = ["chain-dev "] [tool.poetry.dependencies] -python = "^3.8" +python = "^3.10" pytest = "^7.0.1" pytest-github-actions-annotate-failures = "^0.1.1" flake8 = "^4.0.1" @@ -28,6 +28,9 @@ jsonnet = "^0.18.0" eth-account = "^0.7.0" cprotobuf = "^0.1.11" pathspec = "^0.10.1" +rocksdb = { git = "https://github.com/HathorNetwork/python-rocksdb.git", branch = "master" } +click = "^8.1.2" +roaring64 = { git = "https://github.com/yihuang/python-roaring64.git", branch = "main" } [tool.poetry.dev-dependencies] diff --git a/integration_tests/test_benchmark_storage.py b/integration_tests/test_benchmark_storage.py new file mode 100644 index 0000000000..41668a94ac --- /dev/null +++ b/integration_tests/test_benchmark_storage.py @@ -0,0 +1,53 @@ +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path + +import pytest +from web3 import Web3 + +from .network import setup_custom_cronos +from .utils import ( + ACCOUNTS, + CONTRACTS, + deploy_contract, + send_transaction, + w3_wait_for_block, +) + + +@pytest.fixture(scope="module") +def custom_cronos(tmp_path_factory): + path = tmp_path_factory.mktemp("benchmark") + yield from setup_custom_cronos( + path, 26200, Path(__file__).parent / "configs/state_benchmark.jsonnet" + ) + + +@pytest.mark.benchmark +def test_benchmark_storage(custom_cronos): + w3: Web3 = custom_cronos.w3 + w3_wait_for_block(w3, 1) + contract = deploy_contract(w3, CONTRACTS["BenchmarkStorage"]) + + n = 3000 + gas = 81500000 + iterations = 200 + parity = 100 + + def task(acct, acct_i): + for i in range(iterations): + seed = i * 10 + acct_i + tx = contract.functions.batch_set(seed, n, n * parity).buildTransaction( + {"from": acct.address, "gas": gas} + ) + print(send_transaction(w3, tx, acct.key)) + + accounts = [ + ACCOUNTS["validator"], + ACCOUNTS["community"], + ACCOUNTS["signer1"], + ACCOUNTS["signer2"], + ] + with ThreadPoolExecutor(len(accounts)) as exec: + tasks = [exec.submit(task, acct, i) for i, acct in enumerate(accounts)] + for t in tasks: + t.result() diff --git a/integration_tests/test_streamer.py b/integration_tests/test_streamer.py index 249aa346f2..f0128d1e29 100644 --- a/integration_tests/test_streamer.py +++ b/integration_tests/test_streamer.py @@ -13,40 +13,20 @@ class StoreKVPairs(ProtoEntity): value = Field("bytes", 4) -def decode_stream_file(data, body_cls=StoreKVPairs, header_cls=None, footer_cls=None): +def decode_stream_file(data, entry_cls=StoreKVPairs): """ - header, body*, footer + StoreKVPairs, StoreKVPairs, ... """ - header = footer = None - body = [] + items = [] offset = 0 - size, n = decode_primitive(data, "uint64") - offset += n - - # header - if header_cls is not None: - header = header_cls() - header.ParseFromString(data[offset : offset + size]) - offset += size - - while True: + while offset < len(data): size, n = decode_primitive(data[offset:], "uint64") offset += n - if offset + size == len(data): - # footer - if footer_cls is not None: - footer = footer_cls() - footer.ParseFromString(data[offset : offset + size]) - offset += size - break - else: - # body - if body_cls is not None: - item = body_cls() - item.ParseFromString(data[offset : offset + size]) - body.append(item) - offset += size - return header, body, footer + item = entry_cls() + item.ParseFromString(data[offset : offset + size]) + items.append(item) + offset += size + return items def test_streamers(cronos): @@ -55,23 +35,22 @@ def test_streamers(cronos): - try to parse the state change sets """ # inspect the first state change of the first tx in genesis - path = cronos.node_home(0) / "data/file_streamer/block-0-tx-0" - _, body, _ = decode_stream_file(open(path, "rb").read()) + # the InitChainer is committed together with the first block. + path = cronos.node_home(0) / "data/file_streamer/block-1-data" + items = decode_stream_file(open(path, "rb").read()) # creation of the validator account - assert body[0].store_key == "acc" - # the order in gen_txs is undeterministic, could be either one. - assert body[0].key in ( - b"\x01" + HexBytes(ADDRS["validator"]), - b"\x01" + HexBytes(ADDRS["validator2"]), - ) + assert items[0].store_key == "acc" + # the writes are sorted by key, find the minimal address + min_addr = min(ADDRS.values()) + assert items[0].key == b"\x01" + HexBytes(min_addr) if __name__ == "__main__": import binascii import sys - _, body, _ = decode_stream_file(open(sys.argv[1], "rb").read()) - for item in body: + items = decode_stream_file(open(sys.argv[1], "rb").read()) + for item in items: print( item.store_key, item.delete, diff --git a/integration_tests/utils.py b/integration_tests/utils.py index 462d472bfa..a714ff3b89 100644 --- a/integration_tests/utils.py +++ b/integration_tests/utils.py @@ -47,6 +47,7 @@ "TestBlackListERC20": "TestBlackListERC20.sol", "CroBridge": "CroBridge.sol", "CronosGravityCancellation": "CronosGravityCancellation.sol", + "BenchmarkStorage": "BenchmarkStorage.sol", } diff --git a/integration_tests/versiondb.py b/integration_tests/versiondb.py new file mode 100644 index 0000000000..40c0c672e0 --- /dev/null +++ b/integration_tests/versiondb.py @@ -0,0 +1,86 @@ +""" +cli utilities for versiondb +""" +import binascii +from pathlib import Path + +import click +import rocksdb +from cprotobuf import decode_primitive +from roaring64 import BitMap64 + + +def rocksdb_stats(path): + db = rocksdb.DB(str(path), rocksdb.Options()) + for field in ["rocksdb.stats", "rocksdb.sstables"]: + print(f"############# {field}") + print(db.get_property(field.encode()).decode()) + + # space amplification + it = db.iteritems() + it.seek_to_first() + count = 0 + size = 0 + for k, v in it: + count += 1 + size += len(k) + len(v) + # directory size + fsize = sum(f.stat().st_size for f in path.glob("**/*") if f.is_file()) + print( + f"space_amplification: {fsize / size:.2f}, kv pairs: {count}, " + f"data size: {size}, file size: {fsize}" + ) + + +@click.group() +def cli(): + pass + + +@cli.command() +@click.option("--dbpath", help="path of plain db") +def latest_version(dbpath): + db = rocksdb.DB(dbpath, rocksdb.Options()) + bz = db.get(b"s/latest") + # gogoproto std int64, the first byte is field tag + print(decode_primitive(bz[1:], "int64")[0]) + + +@cli.command() +@click.option("--dbpath", help="path of version db") +@click.option("--version", help="version of the value, optional") +@click.argument("store-key") +@click.argument("hex-key") +def get(dbpath, version, store_key, hex_key): + """ + get a value at version + """ + key = f"s/k:{store_key}/".decode() + binascii.unhexlify(hex_key) + plain_db = rocksdb.DB(dbpath + "plain.db", rocksdb.Options()) + if version is None: + v = plain_db.get(key) + else: + version = int(version) + print(binascii.hexlify(v)) + + history_db = rocksdb.DB(dbpath + "history.db", rocksdb.Options()) + bz = history_db.get(key) + bm = BitMap64.deserialize(bz) + + # seek in bitmap + bm.Rank(version) + + +@cli.command() +def sync(path): + pass + + +@cli.command() +@click.option("--dbpath", help="path of rocksdb") +def rocksdbstats(dbpath): + rocksdb_stats(Path(dbpath)) + + +if __name__ == "__main__": + cli() diff --git a/nix/testenv.nix b/nix/testenv.nix index e951eccb7f..0f84ddc388 100644 --- a/nix/testenv.nix +++ b/nix/testenv.nix @@ -1,4 +1,4 @@ -{ poetry2nix, lib, python310 }: +{ poetry2nix, lib, python310, rocksdb }: poetry2nix.mkPoetryEnv { projectDir = ../integration_tests; python = python310; @@ -14,6 +14,9 @@ poetry2nix.mkPoetryEnv { pytest-github-actions-annotate-failures = [ "setuptools" ]; flake8-black = [ "setuptools" ]; multiaddr = [ "setuptools" ]; + rocksdb = [ "setuptools" "cython" "pkgconfig" ]; + pyroaring = [ "setuptools" ]; + roaring64 = [ "poetry" ]; }; in lib.mapAttrs @@ -34,6 +37,11 @@ poetry2nix.mkPoetryEnv { substituteInPlace setup.py --replace "setup()" "setup(version=\"1.3\")" ''; }; + rocksdb = super.rocksdb.overridePythonAttrs ( + old: { + buildInputs = (old.buildInputs or [ ]) ++ [ rocksdb ]; + } + ); }) ]); } diff --git a/scripts/cronos-devnet.yaml b/scripts/cronos-devnet.yaml index 0b9b865e1a..ec2f3c9fe0 100644 --- a/scripts/cronos-devnet.yaml +++ b/scripts/cronos-devnet.yaml @@ -1,7 +1,7 @@ dotenv: .env cronos_777-1: cmd: cronosd - start-flags: "--trace" + start-flags: "--trace --streamers versiondb,file" app-config: minimum-gas-prices: 0basetcro index-events: diff --git a/versiondb/backend_test_utils.go b/versiondb/backend_test_utils.go new file mode 100644 index 0000000000..a5cff0cb00 --- /dev/null +++ b/versiondb/backend_test_utils.go @@ -0,0 +1,276 @@ +package versiondb + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" + + "github.com/cosmos/cosmos-sdk/store/types" +) + +var ( + key1 = []byte("key1") + key2 = []byte("key2") + value1 = []byte("value1") + value2 = []byte("value2") + value3 = []byte("value3") + key1_subkey = []byte("key1/subkey") +) + +func SetupTestDB(t *testing.T, store VersionStore) { + changeSets := [][]types.StoreKVPair{ + { + {StoreKey: "evm", Key: []byte("delete-in-block2"), Value: []byte("1")}, + {StoreKey: "evm", Key: []byte("re-add-in-block3"), Value: []byte("1")}, + {StoreKey: "evm", Key: []byte("z-genesis-only"), Value: []byte("2")}, + {StoreKey: "evm", Key: []byte("modify-in-block2"), Value: []byte("1")}, + {StoreKey: "staking", Key: []byte("key1"), Value: []byte("value1")}, + {StoreKey: "staking", Key: []byte("key1/subkey"), Value: []byte("value1")}, + }, + { + {StoreKey: "evm", Key: []byte("re-add-in-block3"), Delete: true}, + {StoreKey: "evm", Key: []byte("add-in-block1"), Value: []byte("1")}, + {StoreKey: "staking", Key: []byte("key1"), Delete: true}, + }, + { + {StoreKey: "evm", Key: []byte("add-in-block2"), Value: []byte("1")}, + {StoreKey: "evm", Key: []byte("delete-in-block2"), Delete: true}, + {StoreKey: "evm", Key: []byte("modify-in-block2"), Value: []byte("2")}, + {StoreKey: "evm", Key: []byte("key2"), Delete: true}, + {StoreKey: "staking", Key: []byte("key1"), Value: []byte("value2")}, + }, + { + {StoreKey: "evm", Key: []byte("re-add-in-block3"), Value: []byte("2")}, + }, + { + {StoreKey: "evm", Key: []byte("re-add-in-block3"), Delete: true}, + }, + } + for i, changeSet := range changeSets { + require.NoError(t, store.PutAtVersion(int64(i), changeSet)) + } +} + +func Run(t *testing.T, storeCreator func() VersionStore) { + testBasics(t, storeCreator()) + testIterator(t, storeCreator()) + testHeightInFuture(t, storeCreator()) + + // test delete in genesis + store := storeCreator() + err := store.PutAtVersion(0, []types.StoreKVPair{ + {StoreKey: "evm", Key: []byte{1}, Delete: true}, + }) + require.Error(t, err) +} + +func testBasics(t *testing.T, store VersionStore) { + var v int64 + + SetupTestDB(t, store) + + value, err := store.GetAtVersion("evm", []byte("z-genesis-only"), nil) + require.NoError(t, err) + require.Equal(t, value, []byte("2")) + + v = 4 + ok, err := store.HasAtVersion("evm", []byte("z-genesis-only"), &v) + require.NoError(t, err) + require.True(t, ok) + value, err = store.GetAtVersion("evm", []byte("z-genesis-only"), &v) + require.NoError(t, err) + require.Equal(t, value, []byte("2")) + + value, err = store.GetAtVersion("evm", []byte("re-add-in-block3"), nil) + require.NoError(t, err) + require.Empty(t, value) + + ok, err = store.HasAtVersion("staking", key1, nil) + require.NoError(t, err) + require.True(t, ok) + + value, err = store.GetAtVersion("staking", key1, nil) + require.NoError(t, err) + require.Equal(t, value, []byte("value2")) + + v = 2 + value, err = store.GetAtVersion("staking", key1, &v) + require.NoError(t, err) + require.Equal(t, value, []byte("value2")) + + ok, err = store.HasAtVersion("staking", key1, &v) + require.NoError(t, err) + require.True(t, ok) + + v = 0 + value, err = store.GetAtVersion("staking", key1, &v) + require.NoError(t, err) + require.Equal(t, value, []byte("value1")) + + v = 1 + value, err = store.GetAtVersion("staking", key1, &v) + require.NoError(t, err) + require.Empty(t, value) + + ok, err = store.HasAtVersion("staking", key1, &v) + require.NoError(t, err) + require.False(t, ok) + + v = 0 + value, err = store.GetAtVersion("staking", key1, &v) + require.NoError(t, err) + require.Equal(t, value1, value) + value, err = store.GetAtVersion("staking", key1_subkey, &v) + require.NoError(t, err) + require.Equal(t, value1, value) +} + +type KVPair struct { + Key []byte + Value []byte +} + +func testIterator(t *testing.T, store VersionStore) { + SetupTestDB(t, store) + + expItems := [][]KVPair{ + { + KVPair{[]byte("delete-in-block2"), []byte("1")}, + KVPair{[]byte("modify-in-block2"), []byte("1")}, + KVPair{[]byte("re-add-in-block3"), []byte("1")}, + KVPair{[]byte("z-genesis-only"), []byte("2")}, + }, + { + KVPair{[]byte("add-in-block1"), []byte("1")}, + KVPair{[]byte("delete-in-block2"), []byte("1")}, + KVPair{[]byte("modify-in-block2"), []byte("1")}, + KVPair{[]byte("z-genesis-only"), []byte("2")}, + }, + { + KVPair{[]byte("add-in-block1"), []byte("1")}, + KVPair{[]byte("add-in-block2"), []byte("1")}, + KVPair{[]byte("modify-in-block2"), []byte("2")}, + KVPair{[]byte("z-genesis-only"), []byte("2")}, + }, + { + KVPair{[]byte("add-in-block1"), []byte("1")}, + KVPair{[]byte("add-in-block2"), []byte("1")}, + KVPair{[]byte("modify-in-block2"), []byte("2")}, + KVPair{[]byte("re-add-in-block3"), []byte("2")}, + KVPair{[]byte("z-genesis-only"), []byte("2")}, + }, + { + KVPair{[]byte("add-in-block1"), []byte("1")}, + KVPair{[]byte("add-in-block2"), []byte("1")}, + KVPair{[]byte("modify-in-block2"), []byte("2")}, + KVPair{[]byte("z-genesis-only"), []byte("2")}, + }, + } + for i, exp := range expItems { + t.Run(fmt.Sprintf("block-%d", i), func(t *testing.T) { + v := int64(i) + it, err := store.IteratorAtVersion("evm", nil, nil, &v) + require.NoError(t, err) + require.Equal(t, exp, consumeIterator(it)) + + it, err = store.ReverseIteratorAtVersion("evm", nil, nil, &v) + require.NoError(t, err) + actual := consumeIterator(it) + require.Equal(t, len(exp), len(actual)) + require.Equal(t, reversed(exp), actual) + }) + } + + it, err := store.IteratorAtVersion("evm", nil, nil, nil) + require.Equal(t, expItems[len(expItems)-1], consumeIterator(it)) + + it, err = store.ReverseIteratorAtVersion("evm", nil, nil, nil) + require.Equal(t, reversed(expItems[len(expItems)-1]), consumeIterator(it)) + + // with start parameter + v := int64(2) + it, err = store.IteratorAtVersion("evm", []byte("\xff"), nil, &v) + require.NoError(t, err) + require.Empty(t, consumeIterator(it)) + it, err = store.ReverseIteratorAtVersion("evm", nil, []byte("\x00"), &v) + require.NoError(t, err) + require.Empty(t, consumeIterator(it)) + + it, err = store.IteratorAtVersion("evm", []byte("modify-in-block2"), nil, &v) + require.NoError(t, err) + require.Equal(t, expItems[2][len(expItems[2])-2:], consumeIterator(it)) + + it, err = store.ReverseIteratorAtVersion("evm", nil, []byte("mp"), &v) + require.NoError(t, err) + require.Equal(t, + reversed(expItems[2][:len(expItems[2])-1]), + consumeIterator(it), + ) + + it, err = store.ReverseIteratorAtVersion("evm", nil, []byte("modify-in-block3"), &v) + require.NoError(t, err) + require.Equal(t, + reversed(expItems[2][:len(expItems[2])-1]), + consumeIterator(it), + ) + + // delete the last key, cover some edge cases + v = int64(len(expItems)) + err = store.PutAtVersion( + v, + []types.StoreKVPair{ + {StoreKey: "evm", Key: []byte("z-genesis-only"), Delete: true}, + }, + ) + require.NoError(t, err) + it, err = store.IteratorAtVersion("evm", nil, nil, &v) + require.NoError(t, err) + require.Equal(t, + expItems[v-1][:len(expItems[v-1])-1], + consumeIterator(it), + ) + v -= 1 + it, err = store.IteratorAtVersion("evm", nil, nil, &v) + require.NoError(t, err) + require.Equal(t, + expItems[v], + consumeIterator(it), + ) +} + +func testHeightInFuture(t *testing.T, store VersionStore) { + SetupTestDB(t, store) + + latest, err := store.GetLatestVersion() + require.NoError(t, err) + + v := latest + 1 + _, err = store.GetAtVersion("staking", key1, &v) + require.Error(t, err) + _, err = store.HasAtVersion("staking", key1, &v) + require.Error(t, err) + _, err = store.IteratorAtVersion("staking", nil, nil, &v) + require.Error(t, err) + _, err = store.ReverseIteratorAtVersion("staking", nil, nil, &v) + require.Error(t, err) +} + +func consumeIterator(it dbm.Iterator) []KVPair { + var result []KVPair + for ; it.Valid(); it.Next() { + result = append(result, KVPair{it.Key(), it.Value()}) + } + it.Close() + return result +} + +// reversed clone and reverse the slice +func reversed[S ~[]E, E any](s S) []E { + r := make([]E, len(s)) + for i, j := 0, len(s)-1; i <= j; i, j = i+1, j-1 { + r[i], r[j] = s[j], s[i] + } + return r +} diff --git a/versiondb/dbutils.go b/versiondb/dbutils.go new file mode 100644 index 0000000000..2334d0f2c1 --- /dev/null +++ b/versiondb/dbutils.go @@ -0,0 +1,84 @@ +package versiondb + +import ( + "encoding/binary" + "sort" + + "github.com/RoaringBitmap/roaring/roaring64" +) + +var ChunkLimit = uint64(1950) // threshold beyond which MDBX overflow pages appear: 4096 / 2 - (keySize + 8) + +// CutLeft - cut from bitmap `targetSize` bytes from left +// removing lft part from `bm` +// returns nil on zero cardinality +func CutLeft64(bm *roaring64.Bitmap, sizeLimit uint64) *roaring64.Bitmap { + if bm.GetCardinality() == 0 { + return nil + } + + sz := bm.GetSerializedSizeInBytes() + if sz <= sizeLimit { + lft := roaring64.New() + lft.AddRange(bm.Minimum(), bm.Maximum()+1) + lft.And(bm) + lft.RunOptimize() + bm.Clear() + return lft + } + + from := bm.Minimum() + minMax := bm.Maximum() - bm.Minimum() + to := sort.Search(int(minMax), func(i int) bool { // can be optimized to avoid "too small steps", but let's leave it for readability + lft := roaring64.New() // bitmap.Clear() method intentionally not used here, because then serialized size of bitmap getting bigger + lft.AddRange(from, from+uint64(i)+1) + lft.And(bm) + lft.RunOptimize() + return lft.GetSerializedSizeInBytes() > sizeLimit + }) + + lft := roaring64.New() + lft.AddRange(from, from+uint64(to)) // no +1 because sort.Search returns element which is just higher threshold - but we need lower + lft.And(bm) + bm.RemoveRange(from, from+uint64(to)) + lft.RunOptimize() + return lft +} + +func WalkChunks64(bm *roaring64.Bitmap, sizeLimit uint64, f func(chunk *roaring64.Bitmap, isLast bool) error) error { + for bm.GetCardinality() > 0 { + if err := f(CutLeft64(bm, sizeLimit), bm.GetCardinality() == 0); err != nil { + return err + } + } + return nil +} + +func WalkChunkWithKeys64(k []byte, m *roaring64.Bitmap, sizeLimit uint64, f func(chunkKey []byte, chunk *roaring64.Bitmap) error) error { + return WalkChunks64(m, sizeLimit, func(chunk *roaring64.Bitmap, isLast bool) error { + chunkKey := make([]byte, len(k)+8) + copy(chunkKey, k) + if isLast { + binary.BigEndian.PutUint64(chunkKey[len(k):], ^uint64(0)) + } else { + binary.BigEndian.PutUint64(chunkKey[len(k):], chunk.Maximum()) + } + return f(chunkKey, chunk) + }) +} + +// SeekInBitmap64 - returns value in bitmap which is >= n +func SeekInBitmap64(m *roaring64.Bitmap, n uint64) (found uint64, ok bool) { + if m == nil || m.IsEmpty() { + return 0, false + } + if n == 0 { + return m.Minimum(), true + } + searchRank := m.Rank(n - 1) + if searchRank >= m.GetCardinality() { + return 0, false + } + found, _ = m.Select(searchRank) + return found, true +} diff --git a/versiondb/multistore.go b/versiondb/multistore.go new file mode 100644 index 0000000000..4871558fd9 --- /dev/null +++ b/versiondb/multistore.go @@ -0,0 +1,135 @@ +package versiondb + +import ( + "io" + "sync" + + "github.com/cosmos/cosmos-sdk/store/cachemulti" + "github.com/cosmos/cosmos-sdk/store/mem" + "github.com/cosmos/cosmos-sdk/store/transient" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +var _ sdk.MultiStore = (*MultiStore)(nil) + +type MultiStore struct { + versionDB VersionStore + storeKeys []types.StoreKey + + // transient or memory stores + transientStores map[types.StoreKey]types.KVStore + + traceWriter io.Writer + traceContext types.TraceContext + traceContextMutex sync.Mutex +} + +func NewMultiStore(versionDB VersionStore, storeKeys []types.StoreKey) *MultiStore { + return &MultiStore{versionDB: versionDB, storeKeys: storeKeys, transientStores: make(map[types.StoreKey]types.KVStore)} +} + +func (s *MultiStore) GetStoreType() types.StoreType { + return types.StoreTypeMulti +} + +func (s *MultiStore) cacheMultiStore(version *int64) sdk.CacheMultiStore { + stores := make(map[types.StoreKey]types.CacheWrapper, len(s.transientStores)+len(s.storeKeys)) + for k, v := range s.transientStores { + stores[k] = v + } + for _, k := range s.storeKeys { + stores[k] = NewKVStore(s.versionDB, k, version) + } + return cachemulti.NewStore(nil, stores, nil, s.traceWriter, s.getTracingContext()) +} + +func (s *MultiStore) CacheMultiStore() sdk.CacheMultiStore { + return s.cacheMultiStore(nil) +} + +func (s *MultiStore) CacheMultiStoreWithVersion(version int64) (sdk.CacheMultiStore, error) { + return s.cacheMultiStore(&version), nil +} + +// CacheWrap implements CacheWrapper/MultiStore/CommitStore. +func (s *MultiStore) CacheWrap() types.CacheWrap { + return s.CacheMultiStore().(types.CacheWrap) +} + +// CacheWrapWithTrace implements the CacheWrapper interface. +func (s *MultiStore) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap { + return s.CacheWrap() +} + +func (s *MultiStore) GetStore(storeKey types.StoreKey) sdk.Store { + return s.GetKVStore(storeKey) +} + +func (s *MultiStore) GetKVStore(storeKey types.StoreKey) sdk.KVStore { + store, ok := s.transientStores[storeKey] + if ok { + return store + } + return NewKVStore(s.versionDB, storeKey, nil) +} + +func (s *MultiStore) MountTransientStores(keys map[string]*types.TransientStoreKey) { + for _, key := range keys { + s.transientStores[key] = transient.NewStore() + } +} + +func (s *MultiStore) MountMemoryStores(keys map[string]*types.MemoryStoreKey) { + for _, key := range keys { + s.transientStores[key] = mem.NewStore() + } +} + +// SetTracer sets the tracer for the MultiStore that the underlying +// stores will utilize to trace operations. A MultiStore is returned. +func (s *MultiStore) SetTracer(w io.Writer) types.MultiStore { + s.traceWriter = w + return s +} + +// SetTracingContext updates the tracing context for the MultiStore by merging +// the given context with the existing context by key. Any existing keys will +// be overwritten. It is implied that the caller should update the context when +// necessary between tracing operations. It returns a modified MultiStore. +func (s *MultiStore) SetTracingContext(tc types.TraceContext) types.MultiStore { + s.traceContextMutex.Lock() + defer s.traceContextMutex.Unlock() + s.traceContext = s.traceContext.Merge(tc) + + return s +} + +func (s *MultiStore) getTracingContext() types.TraceContext { + s.traceContextMutex.Lock() + defer s.traceContextMutex.Unlock() + + if s.traceContext == nil { + return nil + } + + ctx := types.TraceContext{} + for k, v := range s.traceContext { + ctx[k] = v + } + + return ctx +} + +// TracingEnabled returns if tracing is enabled for the MultiStore. +func (s *MultiStore) TracingEnabled() bool { + return s.traceWriter != nil +} + +func (s *MultiStore) LatestVersion() int64 { + version, err := s.versionDB.GetLatestVersion() + if err != nil { + panic(err) + } + return version +} diff --git a/versiondb/store.go b/versiondb/store.go new file mode 100644 index 0000000000..906423fb9e --- /dev/null +++ b/versiondb/store.go @@ -0,0 +1,94 @@ +package versiondb + +import ( + "io" + "time" + + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/listenkv" + "github.com/cosmos/cosmos-sdk/store/tracekv" + "github.com/cosmos/cosmos-sdk/store/types" + "github.com/cosmos/cosmos-sdk/telemetry" +) + +var _ types.KVStore = (*Store)(nil) + +// Store Implements types.KVStore +type Store struct { + store VersionStore + storeKey types.StoreKey + version *int64 +} + +func NewKVStore(store VersionStore, storeKey types.StoreKey, version *int64) *Store { + return &Store{store, storeKey, version} +} + +// Implements Store. +func (st *Store) GetStoreType() types.StoreType { + // FIXME + return types.StoreTypeIAVL +} + +// Implements Store. +func (st *Store) CacheWrap() types.CacheWrap { + return cachekv.NewStore(st) +} + +// CacheWrapWithTrace implements the Store interface. +func (st *Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.CacheWrap { + return cachekv.NewStore(tracekv.NewStore(st, w, tc)) +} + +// CacheWrapWithListeners implements the CacheWrapper interface. +func (st *Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap { + return cachekv.NewStore(listenkv.NewStore(st, storeKey, listeners)) +} + +// Implements types.KVStore. +func (st *Store) Get(key []byte) []byte { + defer telemetry.MeasureSince(time.Now(), "store", "iavl", "get") + value, err := st.store.GetAtVersion(st.storeKey.Name(), key, st.version) + if err != nil { + panic(err) + } + return value +} + +// Implements types.KVStore. +func (st *Store) Has(key []byte) (exists bool) { + defer telemetry.MeasureSince(time.Now(), "store", "iavl", "has") + has, err := st.store.HasAtVersion(st.storeKey.Name(), key, st.version) + if err != nil { + panic(err) + } + return has +} + +// Implements types.KVStore. +func (st *Store) Iterator(start, end []byte) types.Iterator { + itr, err := st.store.IteratorAtVersion(st.storeKey.Name(), start, end, st.version) + if err != nil { + panic(err) + } + return itr +} + +// Implements types.KVStore. +func (st *Store) ReverseIterator(start, end []byte) types.Iterator { + itr, err := st.store.ReverseIteratorAtVersion(st.storeKey.Name(), start, end, st.version) + if err != nil { + panic(err) + } + return itr +} + +// Implements types.KVStore. +func (st *Store) Set(key, value []byte) { + panic("write operation is not supported") +} + +// Implements types.KVStore. +func (st *Store) Delete(key []byte) { + panic("write operation is not supported") +} diff --git a/versiondb/streaming_service.go b/versiondb/streaming_service.go new file mode 100644 index 0000000000..e56aef0e18 --- /dev/null +++ b/versiondb/streaming_service.go @@ -0,0 +1,84 @@ +package versiondb + +import ( + "sort" + "strings" + "sync" + + abci "github.com/tendermint/tendermint/abci/types" + + "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +var _ baseapp.StreamingService = &StreamingService{} + +// StreamingService is a concrete implementation of StreamingService that accumulate the state changes in current block, +// writes the ordered changeset out to version storage. +type StreamingService struct { + listeners []*types.MemoryListener // the listeners that will be initialized with BaseApp + versionStore VersionStore + currentBlockNumber int64 // the current block number +} + +// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys +func NewStreamingService(versionStore VersionStore, storeKeys []types.StoreKey) *StreamingService { + // sort by the storeKeys first + sort.SliceStable(storeKeys, func(i, j int) bool { + return strings.Compare(storeKeys[i].Name(), storeKeys[j].Name()) < 0 + }) + + listeners := make([]*types.MemoryListener, len(storeKeys)) + for i, key := range storeKeys { + listeners[i] = types.NewMemoryListener(key) + } + return &StreamingService{listeners, versionStore, 0} +} + +// Listeners satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener { + listeners := make(map[types.StoreKey][]types.WriteListener, len(fss.listeners)) + for _, listener := range fss.listeners { + listeners[listener.StoreKey()] = []types.WriteListener{listener} + } + return listeners +} + +// ListenBeginBlock satisfies the baseapp.ABCIListener interface +// It sets the currentBlockNumber. +func (fss *StreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { + fss.currentBlockNumber = req.GetHeader().Height + return nil +} + +// ListenDeliverTx satisfies the baseapp.ABCIListener interface +func (fss *StreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { + return nil +} + +// ListenEndBlock satisfies the baseapp.ABCIListener interface +// It merge the state caches of all the listeners together, and write out to the versionStore. +func (fss *StreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { + return nil +} + +func (fss *StreamingService) ListenCommit(ctx sdk.Context, res abci.ResponseCommit) error { + // concat the state caches + var changeSet []types.StoreKVPair + for _, listener := range fss.listeners { + changeSet = append(changeSet, listener.PopStateCache()...) + } + + return fss.versionStore.PutAtVersion(fss.currentBlockNumber, changeSet) +} + +// Stream satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { + return nil +} + +// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Close() error { + return nil +} diff --git a/versiondb/sync.go b/versiondb/sync.go new file mode 100644 index 0000000000..4c89590e4c --- /dev/null +++ b/versiondb/sync.go @@ -0,0 +1,31 @@ +package versiondb + +import ( + "bufio" + "io" + + protoio "github.com/gogo/protobuf/io" + + "github.com/cosmos/cosmos-sdk/store/types" +) + +const maxItemSize = 64000000 // SDK has no key/value size limit, so we set an arbitrary limit + +// ReadFileStreamer parse a binary stream dumped by file streamer to changeset, +// which can be feeded to version store. +func ReadFileStreamer(input *bufio.Reader) ([]types.StoreKVPair, error) { + var changeSet []types.StoreKVPair + reader := protoio.NewDelimitedReader(input, maxItemSize) + for { + var msg types.StoreKVPair + err := reader.ReadMsg(&msg) + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + changeSet = append(changeSet, msg) + } + return changeSet, nil +} diff --git a/versiondb/sync_test.go b/versiondb/sync_test.go new file mode 100644 index 0000000000..85498ee07f --- /dev/null +++ b/versiondb/sync_test.go @@ -0,0 +1,340 @@ +package versiondb + +import ( + "bufio" + "bytes" + "encoding/hex" + "strings" + "testing" + + "github.com/cosmos/cosmos-sdk/store/types" + "github.com/stretchr/testify/require" +) + +const data = ` +a2380aff030a20ff2dd95def59a0c5265f30768a1c6fbdab519c96fb98df +33d524a94575bac1e91292030a02080b120c63726f6e6f735f3737372d31 +1802220c08d9a39a9a0610b8d89eca022a480a202d8f46f61152696c6dc8 +0c6367c7ddc926dc3ed7ab23bd37fb7ca83eb00ba207122408011220df77 +0b6cb0a9694f550ae515d5fb0ebc08902e0fa47ceec816928e74fd8fee44 +322004943bc3709104b52f30f83ca3d30d8bf57551edd48797aef5cee50b +b532c15c3a20e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934c +a495991b7852b8554220f537a6e0561fa0edd12b30ec9b6479e659f6f1fa +1587e69556201bfaf4cf97404a20f537a6e0561fa0edd12b30ec9b6479e6 +59f6f1fa1587e69556201bfaf4cf97405220252fe7cf36dd1bb85dafc47a +08961df0cfd8c027defa5e01e958be121599db9d5a209048462db52bb809 +6c92f8733fcf26455a000a550d9f3748dddadcc2267917146220e3b0c442 +98fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b8556a20 +e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852 +b8557214ae7ccd8d599769209074b81d0c2f3f28624742b71a4612210a1d +0a142861b93c776d100695688250c5b1ba4d44ef2a791880a094a58d1d10 +0112210a1d0a14ae7ccd8d599769209074b81d0c2f3f28624742b71880a0 +94a58d1d100112ae100a630a0d636f696e5f726563656976656412360a08 +7265636569766572122a637263316d33683330776c767366386c6c727578 +7470756b64767379306b6d326b756d386c3061773437121a0a06616d6f75 +6e74121034313139343530383537377374616b650a5c0a08636f696e6261 +736512340a066d696e746572122a637263316d33683330776c767366386c +6c7275787470756b64767379306b6d326b756d386c3061773437121a0a06 +616d6f756e74121034313139343530383537377374616b650a5f0a0a636f +696e5f7370656e7412350a077370656e646572122a637263316d33683330 +776c767366386c6c7275787470756b64767379306b6d326b756d386c3061 +773437121a0a06616d6f756e74121034313139343530383537377374616b +650a630a0d636f696e5f726563656976656412360a087265636569766572 +122a637263313778706676616b6d32616d67393632796c73366638347a33 +6b656c6c3863356c3838656b6572121a0a06616d6f756e74121034313139 +343530383537377374616b650a95010a087472616e7366657212370a0972 +6563697069656e74122a637263313778706676616b6d32616d6739363279 +6c73366638347a336b656c6c3863356c3838656b657212340a0673656e64 +6572122a637263316d33683330776c767366386c6c7275787470756b6476 +7379306b6d326b756d386c3061773437121a0a06616d6f756e7412103431 +3139343530383537377374616b650a3f0a076d65737361676512340a0673 +656e646572122a637263316d33683330776c767366386c6c727578747075 +6b64767379306b6d326b756d386c30617734370aa2010a046d696e741224 +0a0c626f6e6465645f726174696f1214302e393939393939393739343032 +37343439353212210a09696e666c6174696f6e1214302e31323939393939 +3739373130313635333031123a0a11616e6e75616c5f70726f766973696f +6e7312253235393939393936343737353631363138382e37363031383234 +363033383333383838343312150a06616d6f756e74120b34313139343530 +383537370a5f0a0a636f696e5f7370656e7412350a077370656e64657212 +2a637263313778706676616b6d32616d67393632796c73366638347a336b +656c6c3863356c3838656b6572121a0a06616d6f756e7412103832333839 +3031393532307374616b650a630a0d636f696e5f72656365697665641236 +0a087265636569766572122a637263316a7636357333677271663676366a +6c33647034743663397439726b3939636438737037326d70121a0a06616d +6f756e74121038323338393031393532307374616b650a95010a08747261 +6e7366657212370a09726563697069656e74122a637263316a7636357333 +677271663676366a6c33647034743663397439726b393963643873703732 +6d7012340a0673656e646572122a637263313778706676616b6d32616d67 +393632796c73366638347a336b656c6c3863356c3838656b6572121a0a06 +616d6f756e74121038323338393031393532307374616b650a3f0a076d65 +737361676512340a0673656e646572122a637263313778706676616b6d32 +616d67393632796c73366638347a336b656c6c3863356c3838656b65720a +7f0a0f70726f706f7365725f726577617264122c0a06616d6f756e741222 +343131393435303937362e30303030303030303030303030303030303073 +74616b65123e0a0976616c696461746f72123163726376616c6f70657231 +326c756b75367578656868616b303270793472637a36357a753073776837 +776a36756c726c670a790a0a636f6d6d697373696f6e122b0a06616d6f75 +6e7412213431313934353039372e36303030303030303030303030303030 +30307374616b65123e0a0976616c696461746f72123163726376616c6f70 +657231326c756b75367578656868616b303270793472637a36357a753073 +776837776a36756c726c670a770a0772657761726473122c0a06616d6f75 +6e741222343131393435303937362e303030303030303030303030303030 +3030307374616b65123e0a0976616c696461746f72123163726376616c6f +70657231326c756b75367578656868616b303270793472637a36357a7530 +73776837776a36756c726c670a7a0a0a636f6d6d697373696f6e122c0a06 +616d6f756e741222333833313038393430372e3638303030303030303030 +303030303030307374616b65123e0a0976616c696461746f721231637263 +76616c6f70657231326c756b75367578656868616b303270793472637a36 +357a753073776837776a36756c726c670a780a0772657761726473122d0a +06616d6f756e74122333383331303839343037362e383030303030303030 +3030303030303030307374616b65123e0a0976616c696461746f72123163 +726376616c6f70657231326c756b75367578656868616b30327079347263 +7a36357a753073776837776a36756c726c670a7a0a0a636f6d6d69737369 +6f6e122c0a06616d6f756e741222333833313038393430372e3638303030 +303030303030303030303030307374616b65123e0a0976616c696461746f +72123163726376616c6f70657231387a367133386d68767473767972356d +616b38666a38733867346777376b6a6a7030653064680a780a0772657761 +726473122d0a06616d6f756e74122333383331303839343037362e383030 +3030303030303030303030303030307374616b65123e0a0976616c696461 +746f72123163726376616c6f70657231387a367133386d68767473767972 +356d616b38666a38733867346777376b6a6a7030653064680a250a0a6665 +655f6d61726b657412170a08626173655f666565120b3736353632353030 +3030301aa9100aa4040aa1040afa020aba020a2a2f636f736d6f732e7374 +616b696e672e763162657461312e4d736743726561746556616c69646174 +6f72128b020a070a056e6f646530123b0a12313030303030303030303030 +30303030303012123230303030303030303030303030303030301a113130 +3030303030303030303030303030301a0131222a63726331326c756b7536 +7578656868616b303270793472637a36357a753073776837776a73727730 +70702a3163726376616c6f70657231326c756b75367578656868616b3032 +70793472637a36357a753073776837776a36756c726c6732430a1d2f636f +736d6f732e63727970746f2e656432353531392e5075624b657912220a20 +9b86839433f4229b7b7b51554a25ed32549126eb80fe53497ae336a1e67b +66843a1c0a057374616b6512133130303030303030303030303030303030 +3030123b3439643130313933363334303730393664303337396438643863 +3833336438626633396334383866403139322e3136382e302e35343a3236 +363536125f0a570a4f0a282f65746865726d696e742e63727970746f2e76 +312e657468736563703235366b312e5075624b657912230a21026e710a62 +a342de0ed4d7c4532dcbcbbafbf19652ed67b237efab70e8b207efac1204 +0a020801120410c09a0c1a4119ffd1b342c44e183b8113561595186e1f72 +c7273ac4e9ccb04c2b8a4d31b3780eff63cb490102ed39f7482084b4ddb8 +4d672ee2607262a312724f0e5b2b79aa0012ff0b123612340a322f636f73 +6d6f732e7374616b696e672e763162657461312e4d736743726561746556 +616c696461746f72526573706f6e73651ae0055b7b226d73675f696e6465 +78223a302c226576656e7473223a5b7b2274797065223a22636f696e5f72 +65636569766564222c2261747472696275746573223a5b7b226b6579223a +227265636569766572222c2276616c7565223a22637263317479676d7333 +78686873337976343837706878336477346139356a6e3774376c6b393067 +6161227d2c7b226b6579223a22616d6f756e74222c2276616c7565223a22 +313030303030303030303030303030303030307374616b65227d5d7d2c7b +2274797065223a22636f696e5f7370656e74222c22617474726962757465 +73223a5b7b226b6579223a227370656e646572222c2276616c7565223a22 +63726331326c756b75367578656868616b303270793472637a36357a7530 +73776837776a737277307070227d2c7b226b6579223a22616d6f756e7422 +2c2276616c7565223a223130303030303030303030303030303030303073 +74616b65227d5d7d2c7b2274797065223a226372656174655f76616c6964 +61746f72222c2261747472696275746573223a5b7b226b6579223a227661 +6c696461746f72222c2276616c7565223a2263726376616c6f7065723132 +6c756b75367578656868616b303270793472637a36357a75307377683777 +6a36756c726c67227d2c7b226b6579223a22616d6f756e74222c2276616c +7565223a22313030303030303030303030303030303030307374616b6522 +7d5d7d2c7b2274797065223a226d657373616765222c2261747472696275 +746573223a5b7b226b6579223a22616374696f6e222c2276616c7565223a +222f636f736d6f732e7374616b696e672e763162657461312e4d73674372 +6561746556616c696461746f72227d2c7b226b6579223a226d6f64756c65 +222c2276616c7565223a227374616b696e67227d2c7b226b6579223a2273 +656e646572222c2276616c7565223a2263726331326c756b753675786568 +68616b303270793472637a36357a753073776837776a737277307070227d +5d7d5d7d5d28ffffffffffffffffff0130bca10d3a440a02747812050a03 +66656512370a096665655f7061796572122a63726331326c756b75367578 +656868616b303270793472637a36357a753073776837776a737277307070 +3a3d0a02747812370a076163635f736571122c63726331326c756b753675 +78656868616b303270793472637a36357a753073776837776a7372773070 +702f303a6b0a02747812650a097369676e6174757265125847662f527330 +4c455468673767524e57465a55596268397978796336784f6e4d73457772 +696b30787333674f2f32504c5351454337546e3353434345744e32345457 +6375346d427959714d53636b384f577974357167413d3a3f0a076d657373 +61676512340a06616374696f6e122a2f636f736d6f732e7374616b696e67 +2e763162657461312e4d736743726561746556616c696461746f723a670a +0a636f696e5f7370656e7412350a077370656e646572122a63726331326c +756b75367578656868616b303270793472637a36357a753073776837776a +73727730707012220a06616d6f756e741218313030303030303030303030 +303030303030307374616b653a6b0a0d636f696e5f726563656976656412 +360a087265636569766572122a637263317479676d733378686873337976 +343837706878336477346139356a6e3774376c6b393067616112220a0661 +6d6f756e741218313030303030303030303030303030303030307374616b +653a760a106372656174655f76616c696461746f72123e0a0976616c6964 +61746f72123163726376616c6f70657231326c756b75367578656868616b +303270793472637a36357a753073776837776a36756c726c6712220a0661 +6d6f756e741218313030303030303030303030303030303030307374616b +653a520a076d65737361676512110a066d6f64756c6512077374616b696e +6712340a0673656e646572122a63726331326c756b75367578656868616b +303270793472637a36357a753073776837776a7372773070701aa9100aa4 +040aa1040afa020aba020a2a2f636f736d6f732e7374616b696e672e7631 +62657461312e4d736743726561746556616c696461746f72128b020a070a +056e6f646531123b0a123130303030303030303030303030303030301212 +3230303030303030303030303030303030301a1131303030303030303030 +303030303030301a0131222a63726331387a367133386d68767473767972 +356d616b38666a38733867346777376b6a6a747367726e372a3163726376 +616c6f70657231387a367133386d68767473767972356d616b38666a3873 +3867346777376b6a6a70306530646832430a1d2f636f736d6f732e637279 +70746f2e656432353531392e5075624b657912220a2072b50cf0ed1863ff +c937af99b6ad779a2c223e59459eab7768bda7c2da6f836e3a1c0a057374 +616b65121331303030303030303030303030303030303030123b35396566 +333139663464383334396466626532613233653131356163353835356430 +303938613065403139322e3136382e302e35343a3236363536125f0a570a +4f0a282f65746865726d696e742e63727970746f2e76312e657468736563 +703235366b312e5075624b657912230a210242785a75074452d62a6ac222 +70ffb8fb01c9375d0ba72887ae800dc619315d1b12040a020801120410c0 +9a0c1a41e7019fd760970e02f8967aa0f9820c0b98de32d8e72601aa34fe +60df52356d19591eb2bd8516037d2c52c22170ca533abf72d50d4c7f770d +1d5e045df51ff89c0112ff0b123612340a322f636f736d6f732e7374616b +696e672e763162657461312e4d736743726561746556616c696461746f72 +526573706f6e73651ae0055b7b226d73675f696e646578223a302c226576 +656e7473223a5b7b2274797065223a22636f696e5f726563656976656422 +2c2261747472696275746573223a5b7b226b6579223a2272656365697665 +72222c2276616c7565223a22637263317479676d73337868687333797634 +3837706878336477346139356a6e3774376c6b3930676161227d2c7b226b +6579223a22616d6f756e74222c2276616c7565223a223130303030303030 +30303030303030303030307374616b65227d5d7d2c7b2274797065223a22 +636f696e5f7370656e74222c2261747472696275746573223a5b7b226b65 +79223a227370656e646572222c2276616c7565223a2263726331387a3671 +33386d68767473767972356d616b38666a38733867346777376b6a6a7473 +67726e37227d2c7b226b6579223a22616d6f756e74222c2276616c756522 +3a22313030303030303030303030303030303030307374616b65227d5d7d +2c7b2274797065223a226372656174655f76616c696461746f72222c2261 +747472696275746573223a5b7b226b6579223a2276616c696461746f7222 +2c2276616c7565223a2263726376616c6f70657231387a367133386d6876 +7473767972356d616b38666a38733867346777376b6a6a70306530646822 +7d2c7b226b6579223a22616d6f756e74222c2276616c7565223a22313030 +303030303030303030303030303030307374616b65227d5d7d2c7b227479 +7065223a226d657373616765222c2261747472696275746573223a5b7b22 +6b6579223a22616374696f6e222c2276616c7565223a222f636f736d6f73 +2e7374616b696e672e763162657461312e4d736743726561746556616c69 +6461746f72227d2c7b226b6579223a226d6f64756c65222c2276616c7565 +223a227374616b696e67227d2c7b226b6579223a2273656e646572222c22 +76616c7565223a2263726331387a367133386d68767473767972356d616b +38666a38733867346777376b6a6a747367726e37227d5d7d5d7d5d28ffff +ffffffffffffff0130fc8c0d3a440a02747812050a0366656512370a0966 +65655f7061796572122a63726331387a367133386d68767473767972356d +616b38666a38733867346777376b6a6a747367726e373a3d0a0274781237 +0a076163635f736571122c63726331387a367133386d6876747376797235 +6d616b38666a38733867346777376b6a6a747367726e372f303a6b0a0274 +7812650a097369676e61747572651258357747663132435844674c346c6e +71672b59494d43356a654d746a6e4a6747714e5035673331493162526c5a +48724b39685259446653785377694677796c4d3676334c564455782f6477 +30645867526439522f346e41453d3a3f0a076d65737361676512340a0661 +6374696f6e122a2f636f736d6f732e7374616b696e672e76316265746131 +2e4d736743726561746556616c696461746f723a670a0a636f696e5f7370 +656e7412350a077370656e646572122a63726331387a367133386d687674 +73767972356d616b38666a38733867346777376b6a6a747367726e371222 +0a06616d6f756e7412183130303030303030303030303030303030303073 +74616b653a6b0a0d636f696e5f726563656976656412360a087265636569 +766572122a637263317479676d7333786868733379763438377068783364 +77346139356a6e3774376c6b393067616112220a06616d6f756e74121831 +3030303030303030303030303030303030307374616b653a760a10637265 +6174655f76616c696461746f72123e0a0976616c696461746f7212316372 +6376616c6f70657231387a367133386d68767473767972356d616b38666a +38733867346777376b6a6a70306530646812220a06616d6f756e74121831 +3030303030303030303030303030303030307374616b653a520a076d6573 +7361676512110a066d6f64756c6512077374616b696e6712340a0673656e +646572122a63726331387a367133386d68767473767972356d616b38666a +38733867346777376b6a6a747367726e37220208022aec0212260a090880 +804010e0aeee26120e08a08d0612040880c60a188080401a090a07656432 +353531391a9a020a0b626c6f636b5f626c6f6f6d128a020a05626c6f6f6d +128002000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000000000000000000000000000 +000000000000000000000000000000000000001a250a09626c6f636b5f67 +6173120b0a06686569676874120132120b0a06616d6f756e741201303222 +1220b995a75e975242574ff1730feaf1e9255743e86ff9d949b04e2906ad +fd10f824230a0462616e6b1a06007374616b652213323030303030303038 +32333839303139353230300a0462616e6b1a1b021493354845030274cd4b +f1686abd60ab28ec52e1a77374616b65220b383233383930313935323025 +0a0462616e6b10011a1b0214dc6f17bbec824fff8f86587966b2047db6ab +73677374616b65250a0462616e6b10011a1b0214f1829676db577682e944 +fc3493d451b67ff3e29f7374616b65270a0462616e6b1a1c037374616b65 +001493354845030274cd4bf1686abd60ab28ec52e1a7220100260a046261 +6e6b10011a1c037374616b650014dc6f17bbec824fff8f86587966b2047d +b6ab7367260a0462616e6b10011a1c037374616b650014f1829676db5776 +82e944fc3493d451b67ff3e29f3a0a0c646973747269627574696f6e1a01 +0022270a250a057374616b65121c31363437373830333930343030303030 +303030303030303030303030290a0c646973747269627574696f6e1a0101 +22160a14ae7ccd8d599769209074b81d0c2f3f28624742b7500a0c646973 +747269627574696f6e1a16021438b4089f7762e0c20e9bed8e991e074550 +ef5a5222280a260a057374616b65121d3338333130383934303736383030 +303030303030303030303030303030500a0c646973747269627574696f6e +1a16021457f96e6b86cdefdb3d412547816a82e3e0ebf9d222280a260a05 +7374616b65121d3432343330333435303532383030303030303030303030 +303030303030520a0c646973747269627574696f6e1a16061438b4089f77 +62e0c20e9bed8e991e074550ef5a52222a0a260a057374616b65121d3334 +343739383034363639313230303030303030303030303030303030100252 +0a0c646973747269627574696f6e1a16061457f96e6b86cdefdb3d412547 +816a82e3e0ebf9d2222a0a260a057374616b65121d333831383733313035 +343735323030303030303030303030303030303010024f0a0c6469737472 +69627574696f6e1a16071438b4089f7762e0c20e9bed8e991e074550ef5a +5222270a250a057374616b65121c33383331303839343037363830303030 +3030303030303030303030304f0a0c646973747269627574696f6e1a1607 +1457f96e6b86cdefdb3d412547816a82e3e0ebf9d222270a250a05737461 +6b65121c3432343330333435303532383030303030303030303030303030 +3030180a096665656d61726b65741a010122080000000000000000450a04 +6d696e741a0100223a0a1231323939393939373937313031363533303112 +243235393939393936343737353631363138383736303138323436303338 +333338383834332a0a06706172616d731a116665656d61726b65742f4261 +7365466565220d223736353632353030303030225b0a08736c617368696e +671a1601142861b93c776d100695688250c5b1ba4d44ef2a7922370a3163 +726376616c636f6e73313970736d6a307268643567716439746773666776 +7476643666347a7737326e656674666d6730180122005b0a08736c617368 +696e671a160114ae7ccd8d599769209074b81d0c2f3f28624742b722370a +3163726376616c636f6e7331346537766d7232656a61356a707972356871 +77736374656c397033797773346874333468797a18012200cd070a077374 +616b696e671a02503222bd070a92030a02080b120c63726f6e6f735f3737 +372d311802220c08d9a39a9a0610b8d89eca022a480a202d8f46f6115269 +6c6dc80c6367c7ddc926dc3ed7ab23bd37fb7ca83eb00ba2071224080112 +20df770b6cb0a9694f550ae515d5fb0ebc08902e0fa47ceec816928e74fd +8fee44322004943bc3709104b52f30f83ca3d30d8bf57551edd48797aef5 +cee50bb532c15c3a20e3b0c44298fc1c149afbf4c8996fb92427ae41e464 +9b934ca495991b7852b8554220f537a6e0561fa0edd12b30ec9b6479e659 +f6f1fa1587e69556201bfaf4cf97404a20f537a6e0561fa0edd12b30ec9b +6479e659f6f1fa1587e69556201bfaf4cf97405220252fe7cf36dd1bb85d +afc47a08961df0cfd8c027defa5e01e958be121599db9d5a209048462db5 +2bb8096c92f8733fcf26455a000a550d9f3748dddadcc2267917146220e3 +b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b8 +556a20e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca49599 +1b7852b8557214ae7ccd8d599769209074b81d0c2f3f28624742b7129102 +0a3163726376616c6f70657231326c756b75367578656868616b30327079 +3472637a36357a753073776837776a36756c726c6712430a1d2f636f736d +6f732e63727970746f2e656432353531392e5075624b657912220a209b86 +839433f4229b7b7b51554a25ed32549126eb80fe53497ae336a1e67b6684 +20032a133130303030303030303030303030303030303032253130303030 +303030303030303030303030303030303030303030303030303030303030 +30303a070a056e6f6465304a00524b0a3b0a123130303030303030303030 +3030303030303012123230303030303030303030303030303030301a1131 +30303030303030303030303030303030120c08d1a39a9a0610a89eb0a602 +5a01311291020a3163726376616c6f70657231387a367133386d68767473 +767972356d616b38666a38733867346777376b6a6a70306530646812430a +1d2f636f736d6f732e63727970746f2e656432353531392e5075624b6579 +12220a2072b50cf0ed1863ffc937af99b6ad779a2c223e59459eab7768bd +a7c2da6f836e20032a133130303030303030303030303030303030303032 +253130303030303030303030303030303030303030303030303030303030 +30303030303030303a070a056e6f6465314a00524b0a3b0a123130303030 +303030303030303030303030301212323030303030303030303030303030 +3030301a113130303030303030303030303030303030120c08d1a39a9a06 +10a89eb0a6025a0131 +` + +func TestReadFileStreamer(t *testing.T) { + buf, err := hex.DecodeString(strings.Replace(data, "\n", "", -1)) + require.NoError(t, err) + + changeSet, err := ReadFileStreamer(bufio.NewReader(bytes.NewReader(buf))) + require.NoError(t, err) + + require.Equal(t, 21, len(changeSet)) + expItem := types.StoreKVPair{StoreKey: "bank", Delete: false, Key: []uint8{0x0, 0x73, 0x74, 0x61, 0x6b, 0x65}, Value: []uint8{0x32, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x38, 0x32, 0x33, 0x38, 0x39, 0x30, 0x31, 0x39, 0x35, 0x32, 0x30}} + require.Equal(t, expItem, changeSet[0]) +} diff --git a/versiondb/tmdb/history.go b/versiondb/tmdb/history.go new file mode 100644 index 0000000000..0b0438b3ec --- /dev/null +++ b/versiondb/tmdb/history.go @@ -0,0 +1,67 @@ +package tmdb + +import ( + "bytes" + + "github.com/RoaringBitmap/roaring/roaring64" + + "github.com/crypto-org-chain/cronos/versiondb" + dbm "github.com/tendermint/tm-db" +) + +// GetHistoryIndex returns the history index bitmap. +func GetHistoryIndex(db dbm.DB, key []byte) (*roaring64.Bitmap, error) { + // try to seek the first chunk whose maximum is bigger or equal to the target height. + bz, err := db.Get(key) + if err != nil { + return nil, err + } + if len(bz) == 0 { + return nil, nil + } + m := roaring64.New() + _, err = m.ReadFrom(bytes.NewReader(bz)) + if err != nil { + return nil, err + } + return m, nil +} + +// SeekHistoryIndex locate the minimal version that changed the key and is larger than the target version, +// using the returned version can find the value for the target version in changeset table. +// If not found, return -1 +func SeekHistoryIndex(db dbm.DB, key []byte, version uint64) (int64, error) { + m, err := GetHistoryIndex(db, key) + if err != nil { + return -1, err + } + found, ok := versiondb.SeekInBitmap64(m, version+1) + if !ok { + return -1, nil + } + return int64(found), nil +} + +// WriteHistoryIndex set the block height to the history bitmap. +// it try to set to the last chunk, if the last chunk exceeds chunk limit, split it. +func WriteHistoryIndex(db dbm.DB, batch dbm.Batch, key []byte, height uint64) error { + bz, err := db.Get(key) + if err != nil { + return err + } + + m := roaring64.New() + if len(bz) > 0 { + _, err = m.ReadFrom(bytes.NewReader(bz)) + if err != nil { + return err + } + } + m.Add(height) + m.RunOptimize() + bz, err = m.ToBytes() + if err != nil { + return err + } + return batch.Set(key, bz) +} diff --git a/versiondb/tmdb/iterator.go b/versiondb/tmdb/iterator.go new file mode 100644 index 0000000000..bdbc9a6e6d --- /dev/null +++ b/versiondb/tmdb/iterator.go @@ -0,0 +1,183 @@ +package tmdb + +import ( + "bytes" + + "github.com/RoaringBitmap/roaring/roaring64" + "github.com/cosmos/cosmos-sdk/store/types" + "github.com/crypto-org-chain/cronos/versiondb" + dbm "github.com/tendermint/tm-db" +) + +type Iterator struct { + storeKey string + version int64 + + start, end []byte + + plain, history types.Iterator + changesetDB dbm.DB + + key, value []byte + + reverse bool + status int + err error +} + +var _ types.Iterator = (*Iterator)(nil) + +func NewIterator(storeKey string, version int64, plainDB, historyDB types.KVStore, changesetDB dbm.DB, start, end []byte, reverse bool) (types.Iterator, error) { + var plain, history types.Iterator + + if reverse { + plain = plainDB.ReverseIterator(start, end) + } else { + plain = plainDB.Iterator(start, end) + } + + if reverse { + history = historyDB.ReverseIterator(start, end) + } else { + history = historyDB.Iterator(start, end) + } + iter := &Iterator{ + storeKey: storeKey, version: version, + reverse: reverse, + start: start, end: end, + plain: plain, history: history, + changesetDB: changesetDB, + } + iter.err = iter.resolve() + return iter, nil +} + +// Domain implements types.Iterator. +func (iter *Iterator) Domain() ([]byte, []byte) { + return iter.start, iter.end +} + +func (iter *Iterator) Valid() bool { + return iter.err == nil && len(iter.key) > 0 +} + +func (iter *Iterator) Next() { + switch iter.status { + case -2: + return + case 0: + iter.plain.Next() + iter.history.Next() + case 1: + iter.history.Next() + case -1: + iter.plain.Next() + } + iter.err = iter.resolve() +} + +func (iter *Iterator) Key() []byte { + return iter.key +} + +func (iter *Iterator) Value() []byte { + return iter.value +} + +func (iter *Iterator) Close() error { + err1 := iter.plain.Close() + err2 := iter.history.Close() + if err1 != nil { + return err1 + } + if err2 != nil { + return err2 + } + return nil +} + +func (iter *Iterator) Error() error { + return iter.err +} + +func (iter *Iterator) getFromHistory(key []byte, bz []byte, getLatestValue func() []byte) ([]byte, error) { + m := roaring64.New() + _, err := m.ReadFrom(bytes.NewReader(bz)) + if err != nil { + return nil, err + } + found, ok := versiondb.SeekInBitmap64(m, uint64(iter.version)+1) + if !ok { + // not changed, use the latest one + return getLatestValue(), nil + } + changesetKey := ChangesetKey(found, prependStoreKey(iter.storeKey, key)) + return iter.changesetDB.Get(changesetKey) +} + +func (iter *Iterator) resolve() (err error) { + for { + var pkey, hkey []byte + if iter.plain.Valid() { + pkey = iter.plain.Key() + } + if iter.history.Valid() { + hkey = iter.history.Key() + } + + iter.status = compareKey(pkey, hkey, iter.reverse) + switch iter.status { + case -2: + // end of iteration + iter.key = nil + iter.value = nil + return nil + case 0: + // find the historial value, or fallback to latest one. + iter.key = hkey + iter.value, err = iter.getFromHistory(hkey, iter.history.Value(), func() []byte { + return iter.plain.Value() + }) + if len(iter.value) > 0 { + return + } + iter.plain.Next() + iter.history.Next() + case 1: + // plain state exhausted or history cursor lag behind + // the key is deleted in plain state, use the history state. + iter.key = hkey + iter.value, err = iter.getFromHistory(hkey, iter.history.Value(), func() []byte { + return nil + }) + if len(iter.value) > 0 { + return + } + iter.history.Next() + case -1: + // history state exhausted or plain cursor lag behind + // the key don't exist in history state, use the plain state value. + iter.key = pkey + iter.value = iter.plain.Value() + return + } + } +} + +// compareKey is similar to bytes.Compare, but it treat empty slice as biggest value. +func compareKey(k1, k2 []byte, reverse bool) int { + switch { + case len(k1) == 0 && len(k2) == 0: + return -2 + case len(k1) == 0: + return 1 + case len(k2) == 0: + return -1 + default: + result := bytes.Compare(k1, k2) + if reverse { + result = -result + } + return result + } +} diff --git a/versiondb/tmdb/store.go b/versiondb/tmdb/store.go new file mode 100644 index 0000000000..d86b2eddbe --- /dev/null +++ b/versiondb/tmdb/store.go @@ -0,0 +1,266 @@ +package tmdb + +import ( + "bytes" + "errors" + "fmt" + + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/prefix" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" + gogotypes "github.com/cosmos/gogoproto/types" + "github.com/crypto-org-chain/cronos/versiondb" + dbm "github.com/tendermint/tm-db" +) + +const latestVersionKey = "s/latest" + +var _ versiondb.VersionStore = (*Store)(nil) + +// Store implements `VersionStore`. +type Store struct { + // latest key-value pairs + plainDB dbm.DB + // history bitmap index of keys + historyDB dbm.DB + // changesets of each blocks + changesetDB dbm.DB +} + +func NewStore(plainDB, historyDB, changesetDB dbm.DB) *Store { + return &Store{plainDB, historyDB, changesetDB} +} + +// PutAtVersion implements VersionStore interface +// TODO reduce allocation within iterations. +func (s *Store) PutAtVersion(version int64, changeSet []types.StoreKVPair) error { + plainBatch := s.plainDB.NewBatch() + defer plainBatch.Close() + historyBatch := s.historyDB.NewBatch() + defer historyBatch.Close() + changesetBatch := s.changesetDB.NewBatch() + defer changesetBatch.Close() + + for _, pair := range changeSet { + key := prependStoreKey(pair.StoreKey, pair.Key) + + if version == 0 { + // genesis state is written into plain state directly + if pair.Delete { + return errors.New("can't delete at genesis") + } + if err := plainBatch.Set(key, pair.Value); err != nil { + return err + } + continue + } + + original, err := s.plainDB.Get(key) + if err != nil { + return err + } + if bytes.Equal(original, pair.Value) { + // do nothing if the value is not changed + continue + } + + // write history index + if err := WriteHistoryIndex(s.historyDB, historyBatch, key, uint64(version)); err != nil { + return err + } + + // write the old value to changeset + if len(original) > 0 { + changesetKey := append(sdk.Uint64ToBigEndian(uint64(version)), key...) + if err := changesetBatch.Set(changesetKey, original); err != nil { + return err + } + } + + // write the new value to plain state + if pair.Delete { + if err := plainBatch.Delete(key); err != nil { + return err + } + } else { + if err := plainBatch.Set(key, pair.Value); err != nil { + return err + } + } + } + + // write latest version to plain state + if err := s.setLatestVersion(plainBatch, version); err != nil { + return err + } + + if err := changesetBatch.WriteSync(); err != nil { + return err + } + if err := historyBatch.WriteSync(); err != nil { + return err + } + return plainBatch.WriteSync() +} + +// GetAtVersion implements VersionStore interface +func (s *Store) GetAtVersion(storeKey string, key []byte, version *int64) ([]byte, error) { + rawKey := prependStoreKey(storeKey, key) + if version == nil { + return s.plainDB.Get(rawKey) + } + + height := *version + + // optimize for latest version + latest, err := s.GetLatestVersion() + if err != nil { + return nil, err + } + if height > latest { + return nil, fmt.Errorf("height %d is in the future", height) + } + if latest == height { + return s.plainDB.Get(rawKey) + } + + found, err := SeekHistoryIndex(s.historyDB, rawKey, uint64(height)) + if err != nil { + return nil, err + } + if found < 0 { + // there's no change records found after the target version, query the latest state. + return s.plainDB.Get(rawKey) + } + // get from changeset + changesetKey := ChangesetKey(uint64(found), rawKey) + return s.changesetDB.Get(changesetKey) +} + +// HasAtVersion implements VersionStore interface +func (s *Store) HasAtVersion(storeKey string, key []byte, version *int64) (bool, error) { + rawKey := prependStoreKey(storeKey, key) + if version == nil { + return s.plainDB.Has(rawKey) + } + + height := *version + + // optimize for latest version + latest, err := s.GetLatestVersion() + if err != nil { + return false, err + } + if height > latest { + return false, fmt.Errorf("height %d is in the future", height) + } + if latest == height { + return s.plainDB.Has(rawKey) + } + + found, err := SeekHistoryIndex(s.historyDB, rawKey, uint64(height)) + if err != nil { + return false, err + } + if found < 0 { + // there's no change records after the target version, query the latest state. + return s.plainDB.Has(rawKey) + } + // get from changeset + changesetKey := ChangesetKey(uint64(found), rawKey) + return s.changesetDB.Has(changesetKey) +} + +// IteratorAtVersion implements VersionStore interface +func (s *Store) IteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) { + storePrefix := StoreKeyPrefix(storeKey) + prefixPlain := prefix.NewStore(dbadapter.Store{DB: s.plainDB}, storePrefix) + if version == nil { + return prefixPlain.Iterator(start, end), nil + } + + // optimize for latest version + height := *version + latest, err := s.GetLatestVersion() + if err != nil { + return nil, err + } + if height > latest { + return nil, fmt.Errorf("height %d is in the future", height) + } + if latest == height { + return prefixPlain.Iterator(start, end), nil + } + + prefixHistory := prefix.NewStore(dbadapter.Store{DB: s.historyDB}, storePrefix) + return NewIterator(storeKey, height, prefixPlain, prefixHistory, s.changesetDB, start, end, false) +} + +// ReverseIteratorAtVersion implements VersionStore interface +func (s *Store) ReverseIteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) { + storePrefix := StoreKeyPrefix(storeKey) + prefixPlain := prefix.NewStore(dbadapter.Store{DB: s.plainDB}, storePrefix) + if version == nil { + return prefixPlain.ReverseIterator(start, end), nil + } + + // optimize for latest version + height := *version + latest, err := s.GetLatestVersion() + if err != nil { + return nil, err + } + if height > latest { + return nil, fmt.Errorf("height %d is in the future", height) + } + if latest == height { + return prefixPlain.ReverseIterator(start, end), nil + } + + prefixHistory := prefix.NewStore(dbadapter.Store{DB: s.historyDB}, storePrefix) + return NewIterator(storeKey, height, prefixPlain, prefixHistory, s.changesetDB, start, end, true) +} + +// GetLatestVersion returns the latest version stored in plain state, +// it's committed after the changesets, so the data for this version is guaranteed to be persisted. +// returns -1 if the key don't exists. +func (s *Store) GetLatestVersion() (int64, error) { + bz, err := s.plainDB.Get([]byte(latestVersionKey)) + if err != nil { + return -1, err + } else if bz == nil { + return -1, nil + } + + var latestVersion int64 + + if err := gogotypes.StdInt64Unmarshal(&latestVersion, bz); err != nil { + return -1, err + } + + return latestVersion, nil +} + +func (s *Store) setLatestVersion(plainBatch dbm.Batch, version int64) error { + // write latest version to plain state + bz, err := gogotypes.StdInt64Marshal(version) + if err != nil { + return err + } + return plainBatch.Set([]byte(latestVersionKey), bz) +} + +// ChangesetKey build key changeset db +func ChangesetKey(version uint64, key []byte) []byte { + return append(sdk.Uint64ToBigEndian(version), key...) +} + +func StoreKeyPrefix(storeKey string) []byte { + return []byte("s/k:" + storeKey + "/") +} + +// prependStoreKey prepends storeKey to the key +func prependStoreKey(storeKey string, key []byte) []byte { + return append(StoreKeyPrefix(storeKey), key...) +} diff --git a/versiondb/tmdb/store_test.go b/versiondb/tmdb/store_test.go new file mode 100644 index 0000000000..24453198ff --- /dev/null +++ b/versiondb/tmdb/store_test.go @@ -0,0 +1,14 @@ +package tmdb + +import ( + "testing" + + "github.com/crypto-org-chain/cronos/versiondb" + dbm "github.com/tendermint/tm-db" +) + +func TestTMDB(t *testing.T) { + versiondb.Run(t, func() versiondb.VersionStore { + return NewStore(dbm.NewMemDB(), dbm.NewMemDB(), dbm.NewMemDB()) + }) +} diff --git a/versiondb/types.go b/versiondb/types.go new file mode 100644 index 0000000000..fab4a6a21d --- /dev/null +++ b/versiondb/types.go @@ -0,0 +1,21 @@ +package versiondb + +import ( + "github.com/cosmos/cosmos-sdk/store/types" +) + +// VersionStore is a versioned storage of a flat key-value pairs. +// it don't need to support merkle proof, so could be implemented in a much more efficient way. +// `nil` version means the latest version. +type VersionStore interface { + GetAtVersion(storeKey string, key []byte, version *int64) ([]byte, error) + HasAtVersion(storeKey string, key []byte, version *int64) (bool, error) + IteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) + ReverseIteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) + GetLatestVersion() (int64, error) + + // Persist the change set of a block, + // the `changeSet` should be ordered by (storeKey, key), + // the version should be latest version plus one. + PutAtVersion(version int64, changeSet []types.StoreKVPair) error +}