From 42da76f903f8d3a4138ce2ed6426c7d0230bc7cc Mon Sep 17 00:00:00 2001 From: Raneet Debnath <35629432+Raneet10@users.noreply.github.com> Date: Sun, 17 Oct 2021 23:33:08 +0530 Subject: [PATCH] Add batching for KVstore (#149) * add batching to KVStore * add batch for PrefixKV --- block/manager_test.go | 4 ++-- da/mock/mock_test.go | 6 +++--- node/integration_test.go | 2 +- node/node.go | 4 ++-- store/badger.go | 38 ++++++++++++++++++++++++++++++++++++++ store/badger_test.go | 6 +++--- store/kv.go | 13 +++++++++++-- store/prefix.go | 30 +++++++++++++++++++++++++++++- store/prefix_test.go | 35 ++++++++++++++++++++++++++++++++++- store/store.go | 18 +++++++++++++----- store/store_test.go | 4 ++-- 11 files changed, 138 insertions(+), 22 deletions(-) diff --git a/block/manager_test.go b/block/manager_test.go index 5d37de6f8b1..6d734b798b6 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -29,9 +29,9 @@ func TestInitialState(t *testing.T) { LastBlockHeight: 128, } - emptyStore := store.New(store.NewInMemoryKVStore()) + emptyStore := store.New(store.NewDefaultInMemoryKVStore()) - fullStore := store.New(store.NewInMemoryKVStore()) + fullStore := store.New(store.NewDefaultInMemoryKVStore()) err := fullStore.UpdateState(sampleState) require.NoError(t, err) diff --git a/da/mock/mock_test.go b/da/mock/mock_test.go index 64a21018730..9b9519f3c17 100644 --- a/da/mock/mock_test.go +++ b/da/mock/mock_test.go @@ -15,7 +15,7 @@ import ( func TestLifecycle(t *testing.T) { var da da.DataAvailabilityLayerClient = &MockDataAvailabilityLayerClient{} - dalcKV := store.NewInMemoryKVStore() + dalcKV := store.NewDefaultInMemoryKVStore() require := require.New(t) @@ -31,7 +31,7 @@ func TestLifecycle(t *testing.T) { func TestMockDALC(t *testing.T) { var dalc da.DataAvailabilityLayerClient = &MockDataAvailabilityLayerClient{} - dalcKV := store.NewInMemoryKVStore() + dalcKV := store.NewDefaultInMemoryKVStore() require := require.New(t) assert := assert.New(t) @@ -72,7 +72,7 @@ func TestRetrieve(t *testing.T) { var dalc da.DataAvailabilityLayerClient = mock var retriever da.BlockRetriever = mock - dalcKV := store.NewInMemoryKVStore() + dalcKV := store.NewDefaultInMemoryKVStore() require := require.New(t) assert := assert.New(t) diff --git a/node/integration_test.go b/node/integration_test.go index a41d986fcf3..0bc47e87807 100644 --- a/node/integration_test.go +++ b/node/integration_test.go @@ -156,7 +156,7 @@ func createNodes(num int, t *testing.T) ([]*Node, []*mocks.Application) { nodes := make([]*Node, num) apps := make([]*mocks.Application, num) dalc := &mockda.MockDataAvailabilityLayerClient{} - _ = dalc.Init(nil, store.NewInMemoryKVStore(), log.TestingLogger()) + _ = dalc.Init(nil, store.NewDefaultInMemoryKVStore(), log.TestingLogger()) _ = dalc.Start() nodes[0], apps[0] = createNode(0, true, dalc, keys, t) for i := 1; i < num; i++ { diff --git a/node/node.go b/node/node.go index 5a5fc5fd7b1..976a2804630 100644 --- a/node/node.go +++ b/node/node.go @@ -82,9 +82,9 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey var baseKV store.KVStore if conf.RootDir == "" && conf.DBPath == "" { // this is used for testing logger.Info("WARNING: working in in-memory mode") - baseKV = store.NewInMemoryKVStore() + baseKV = store.NewDefaultInMemoryKVStore() } else { - baseKV = store.NewKVStore(conf.RootDir, conf.DBPath, "optimint") + baseKV = store.NewDefaultKVStore(conf.RootDir, conf.DBPath, "optimint") } mainKV := store.NewPrefixKV(baseKV, mainPrefix) dalcKV := store.NewPrefixKV(baseKV, dalcPrefix) diff --git a/store/badger.go b/store/badger.go index 1209e11f5d6..519eaa87d73 100644 --- a/store/badger.go +++ b/store/badger.go @@ -7,6 +7,7 @@ import ( ) var _ KVStore = &BadgerKV{} +var _ Batch = &BadgerBatch{} var ( // ErrKeyNotFound is returned if key is not found in KVStore. @@ -53,3 +54,40 @@ func (b *BadgerKV) Delete(key []byte) error { } return txn.Commit() } + +// NewBatch creates new batch. +// Note: badger batches should be short lived as they use extra resources. +func (b *BadgerKV) NewBatch() Batch { + return &BadgerBatch{ + txn: b.db.NewTransaction(true), + } +} + +// BadgerBatch encapsulates badger transaction +type BadgerBatch struct { + txn *badger.Txn +} + +// Set accumulates key-value entries in a transaction +func (bb *BadgerBatch) Set(key, value []byte) error { + if err := bb.txn.Set(key, value); err != nil { + return err + } + + return nil +} + +// Delete removes the key and associated value from store +func (bb *BadgerBatch) Delete(key []byte) error { + return bb.txn.Delete(key) +} + +// Commit commits a transaction +func (bb *BadgerBatch) Commit() error { + return bb.txn.Commit() +} + +// Discard cancels a transaction +func (bb *BadgerBatch) Discard() { + bb.txn.Discard() +} diff --git a/store/badger_test.go b/store/badger_test.go index 4dc94bc276d..c2be027f9d5 100644 --- a/store/badger_test.go +++ b/store/badger_test.go @@ -8,7 +8,7 @@ import ( ) func TestGetErrors(t *testing.T) { - dalcKV := NewInMemoryKVStore() + dalcKV := NewDefaultInMemoryKVStore() tc := []struct { name string @@ -30,7 +30,7 @@ func TestGetErrors(t *testing.T) { } func TestSetErrors(t *testing.T) { - dalcKV := NewInMemoryKVStore() + dalcKV := NewDefaultInMemoryKVStore() tc := []struct { name string @@ -53,7 +53,7 @@ func TestSetErrors(t *testing.T) { } func TestDeleteErrors(t *testing.T) { - dalcKV := NewInMemoryKVStore() + dalcKV := NewDefaultInMemoryKVStore() tc := []struct { name string diff --git a/store/kv.go b/store/kv.go index 3884724dc54..35931f1e722 100644 --- a/store/kv.go +++ b/store/kv.go @@ -13,10 +13,19 @@ type KVStore interface { Get(key []byte) ([]byte, error) // Get gets the value for a key. Set(key []byte, value []byte) error // Set updates the value for a key. Delete(key []byte) error // Delete deletes a key. + NewBatch() Batch +} + +// Batch enables batching of transactions +type Batch interface { + Set(key, value []byte) error // Accumulates KV entries in a transaction + Delete(key []byte) error // Deletes the given key + Commit() error // Commits the transaction + Discard() // Discards the transaction } // NewInMemoryKVStore builds KVStore that works in-memory (without accessing disk). -func NewInMemoryKVStore() KVStore { +func NewDefaultInMemoryKVStore() KVStore { db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true)) if err != nil { panic(err) @@ -26,7 +35,7 @@ func NewInMemoryKVStore() KVStore { } } -func NewKVStore(rootDir, dbPath, dbName string) KVStore { +func NewDefaultKVStore(rootDir, dbPath, dbName string) KVStore { path := filepath.Join(rootify(rootDir, dbPath), dbName) db, err := badger.Open(badger.DefaultOptions(path)) if err != nil { diff --git a/store/prefix.go b/store/prefix.go index 1a37deb90cc..829ddefcf5e 100644 --- a/store/prefix.go +++ b/store/prefix.go @@ -1,5 +1,8 @@ package store +var _ KVStore = &PrefixKV{} +var _ Batch = &PrefixKVBatch{} + type PrefixKV struct { kv KVStore prefix []byte @@ -23,5 +26,30 @@ func (p *PrefixKV) Set(key []byte, value []byte) error { func (p *PrefixKV) Delete(key []byte) error { return p.kv.Delete(append(p.prefix, key...)) } +func (p *PrefixKV) NewBatch() Batch { + return &PrefixKVBatch{ + b: p.kv.NewBatch(), + prefix: p.prefix, + } +} -var _ KVStore = &PrefixKV{} +type PrefixKVBatch struct { + b Batch + prefix []byte +} + +func (pb *PrefixKVBatch) Set(key, value []byte) error { + return pb.b.Set(append(pb.prefix, key...), value) +} + +func (pb *PrefixKVBatch) Delete(key []byte) error { + return pb.b.Delete(append(pb.prefix, key...)) +} + +func (pb *PrefixKVBatch) Commit() error { + return pb.b.Commit() +} + +func (pb *PrefixKVBatch) Discard() { + pb.b.Discard() +} diff --git a/store/prefix_test.go b/store/prefix_test.go index bc96d5f9023..b95e40cf595 100644 --- a/store/prefix_test.go +++ b/store/prefix_test.go @@ -13,7 +13,7 @@ func TestPrefixKV(t *testing.T) { assert := assert.New(t) require := require.New(t) - base := NewInMemoryKVStore() + base := NewDefaultInMemoryKVStore() p1 := NewPrefixKV(base, []byte{1}) p2 := NewPrefixKV(base, []byte{2}) @@ -71,3 +71,36 @@ func TestPrefixKV(t *testing.T) { require.NoError(err) assert.Equal(val22, v) } + +func TestPrefixKVBatch(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + require := require.New(t) + + basekv := NewDefaultInMemoryKVStore() + prefixkv := NewPrefixKV(basekv, []byte("prefix1")) + prefixbatchkv1 := prefixkv.NewBatch() + + keys := [][]byte{[]byte("key1"), []byte("key2"), []byte("key3"), []byte("key4")} + values := [][]byte{[]byte("value1"), []byte("value2"), []byte("value3"), []byte("value4")} + + for i := 0; i < len(keys); i++ { + err := prefixbatchkv1.Set(keys[i], values[i]) + require.NoError(err) + } + + err := prefixbatchkv1.Commit() + require.NoError(err) + + for i := 0; i < len(keys); i++ { + vals, err := prefixkv.Get(keys[i]) + assert.Equal(vals, values[i]) + require.NoError(err) + } + + prefixbatchkv2 := prefixkv.NewBatch() + err = prefixbatchkv2.Delete([]byte("key1")) + require.NoError(err) + +} diff --git a/store/store.go b/store/store.go index b68ebacf1d5..d54887b65ee 100644 --- a/store/store.go +++ b/store/store.go @@ -33,7 +33,9 @@ var _ Store = &DefaultStore{} // New returns new, default store. func New(kv KVStore) Store { - return &DefaultStore{db: kv} + return &DefaultStore{ + db: kv, + } } // Height returns height of the highest block saved in the Store. @@ -59,12 +61,18 @@ func (s *DefaultStore) SaveBlock(block *types.Block, commit *types.Commit) error s.mtx.Lock() defer s.mtx.Unlock() - // TODO(tzdybal): use transaction for consistency of DB (https://github.com/celestiaorg/optimint/issues/80) - err = multierr.Append(err, s.db.Set(getBlockKey(hash), blockBlob)) - err = multierr.Append(err, s.db.Set(getCommitKey(hash), commitBlob)) - err = multierr.Append(err, s.db.Set(getIndexKey(block.Header.Height), hash[:])) + + bb := s.db.NewBatch() + err = multierr.Append(err, bb.Set(getBlockKey(hash), blockBlob)) + err = multierr.Append(err, bb.Set(getCommitKey(hash), commitBlob)) + err = multierr.Append(err, bb.Set(getIndexKey(block.Header.Height), hash[:])) if err != nil { + bb.Discard() + return err + } + + if err = bb.Commit(); err != nil { return err } diff --git a/store/store_test.go b/store/store_test.go index fde97d9e7b3..4c0b1276c04 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -38,7 +38,7 @@ func TestBlockstoreHeight(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { assert := assert.New(t) - bstore := New(NewInMemoryKVStore()) + bstore := New(NewDefaultInMemoryKVStore()) assert.Equal(uint64(0), bstore.Height()) for _, block := range c.blocks { @@ -80,7 +80,7 @@ func TestBlockstoreLoad(t *testing.T) { } }() - for _, kv := range []KVStore{NewInMemoryKVStore(), NewKVStore(tmpDir, "db", "test")} { + for _, kv := range []KVStore{NewDefaultInMemoryKVStore(), NewDefaultKVStore(tmpDir, "db", "test")} { for _, c := range cases { t.Run(c.name, func(t *testing.T) { assert := assert.New(t)