Skip to content

Commit

Permalink
fix: rootmulti's Restore don't return the next unknown item as expect…
Browse files Browse the repository at this point in the history
…ed (#11286)

## Description
Solution:
- return the next unknown item and add a unit test to ensure that.

---

### Author Checklist

*All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.*

I have...

- [ ] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] added `!` to the type prefix if API or client breaking change
- [ ] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [ ] provided a link to the relevant issue or specification
- [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules)
- [ ] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [ ] added a changelog entry to `CHANGELOG.md`
- [ ] included comments for [documenting Go code](https://blog.golang.org/godoc)
- [ ] updated the relevant documentation or specification
- [ ] reviewed "Files changed" and left comments if necessary
- [ ] confirmed all CI checks have passed

### Reviewers Checklist

*All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.*

I have...

- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] confirmed `!` in the type prefix if API or client breaking change
- [ ] confirmed all author checklist items have been addressed
- [ ] reviewed state machine logic
- [ ] reviewed API design and naming
- [ ] reviewed documentation is accurate
- [ ] reviewed tests and test coverage
- [ ] manually tested (if applicable)

(cherry picked from commit 41bd879)

# Conflicts:
#	store/rootmulti/snapshot_test.go
#	store/rootmulti/store.go
#	store/types/snapshot.pb.go
  • Loading branch information
yihuang authored and mergify-bot committed Feb 28, 2022
1 parent 12fffc1 commit 970fa71
Show file tree
Hide file tree
Showing 2 changed files with 323 additions and 0 deletions.
308 changes: 308 additions & 0 deletions store/rootmulti/snapshot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
package rootmulti_test

import (
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"math/rand"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/store/iavl"
"github.com/cosmos/cosmos-sdk/store/rootmulti"
"github.com/cosmos/cosmos-sdk/store/types"
dbm "github.com/tendermint/tm-db"
)

func newMultiStoreWithGeneratedData(db dbm.DB, stores uint8, storeKeys uint64) *rootmulti.Store {
multiStore := rootmulti.NewStore(db)
r := rand.New(rand.NewSource(49872768940)) // Fixed seed for deterministic tests

keys := []*types.KVStoreKey{}
for i := uint8(0); i < stores; i++ {
key := types.NewKVStoreKey(fmt.Sprintf("store%v", i))
multiStore.MountStoreWithDB(key, types.StoreTypeIAVL, nil)
keys = append(keys, key)
}
multiStore.LoadLatestVersion()

for _, key := range keys {
store := multiStore.GetCommitKVStore(key).(*iavl.Store)
for i := uint64(0); i < storeKeys; i++ {
k := make([]byte, 8)
v := make([]byte, 1024)
binary.BigEndian.PutUint64(k, i)
_, err := r.Read(v)
if err != nil {
panic(err)
}
store.Set(k, v)
}
}

multiStore.Commit()
multiStore.LoadLatestVersion()

return multiStore
}

func newMultiStoreWithMixedMounts(db dbm.DB) *rootmulti.Store {
store := rootmulti.NewStore(db)
store.MountStoreWithDB(types.NewKVStoreKey("iavl1"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewKVStoreKey("iavl2"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewKVStoreKey("iavl3"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewTransientStoreKey("trans1"), types.StoreTypeTransient, nil)
store.LoadLatestVersion()

return store
}

func newMultiStoreWithMixedMountsAndBasicData(db dbm.DB) *rootmulti.Store {
store := newMultiStoreWithMixedMounts(db)
store1 := store.GetStoreByName("iavl1").(types.CommitKVStore)
store2 := store.GetStoreByName("iavl2").(types.CommitKVStore)
trans1 := store.GetStoreByName("trans1").(types.KVStore)

store1.Set([]byte("a"), []byte{1})
store1.Set([]byte("b"), []byte{1})
store2.Set([]byte("X"), []byte{255})
store2.Set([]byte("A"), []byte{101})
trans1.Set([]byte("x1"), []byte{91})
store.Commit()

store1.Set([]byte("b"), []byte{2})
store1.Set([]byte("c"), []byte{3})
store2.Set([]byte("B"), []byte{102})
store.Commit()

store2.Set([]byte("C"), []byte{103})
store2.Delete([]byte("X"))
trans1.Set([]byte("x2"), []byte{92})
store.Commit()

return store
}

func assertStoresEqual(t *testing.T, expect, actual types.CommitKVStore, msgAndArgs ...interface{}) {
assert.Equal(t, expect.LastCommitID(), actual.LastCommitID())
expectIter := expect.Iterator(nil, nil)
expectMap := map[string][]byte{}
for ; expectIter.Valid(); expectIter.Next() {
expectMap[string(expectIter.Key())] = expectIter.Value()
}
require.NoError(t, expectIter.Error())

actualIter := expect.Iterator(nil, nil)
actualMap := map[string][]byte{}
for ; actualIter.Valid(); actualIter.Next() {
actualMap[string(actualIter.Key())] = actualIter.Value()
}
require.NoError(t, actualIter.Error())

assert.Equal(t, expectMap, actualMap, msgAndArgs...)
}

func TestMultistoreSnapshot_Checksum(t *testing.T) {
// Chunks from different nodes must fit together, so all nodes must produce identical chunks.
// This checksum test makes sure that the byte stream remains identical. If the test fails
// without having changed the data (e.g. because the Protobuf or zlib encoding changes),
// snapshottypes.CurrentFormat must be bumped.
store := newMultiStoreWithGeneratedData(dbm.NewMemDB(), 5, 10000)
version := uint64(store.LastCommitID().Version)

testcases := []struct {
format uint32
chunkHashes []string
}{
{1, []string{
"503e5b51b657055b77e88169fadae543619368744ad15f1de0736c0a20482f24",
"e1a0daaa738eeb43e778aefd2805e3dd720798288a410b06da4b8459c4d8f72e",
"aa048b4ee0f484965d7b3b06822cf0772cdcaad02f3b1b9055e69f2cb365ef3c",
"7921eaa3ed4921341e504d9308a9877986a879fe216a099c86e8db66fcba4c63",
"a4a864e6c02c9fca5837ec80dc84f650b25276ed7e4820cf7516ced9f9901b86",
"ca2879ac6e7205d257440131ba7e72bef784cd61642e32b847729e543c1928b9",
}},
}
for _, tc := range testcases {
tc := tc
t.Run(fmt.Sprintf("Format %v", tc.format), func(t *testing.T) {
ch := make(chan io.ReadCloser)
go func() {
streamWriter := snapshots.NewStreamWriter(ch)
defer streamWriter.Close()
require.NotNil(t, streamWriter)
err := store.Snapshot(version, streamWriter)
require.NoError(t, err)
}()
hashes := []string{}
hasher := sha256.New()
for chunk := range ch {
hasher.Reset()
_, err := io.Copy(hasher, chunk)
require.NoError(t, err)
hashes = append(hashes, hex.EncodeToString(hasher.Sum(nil)))
}
assert.Equal(t, tc.chunkHashes, hashes,
"Snapshot output for format %v has changed", tc.format)
})
}
}

func TestMultistoreSnapshot_Errors(t *testing.T) {
store := newMultiStoreWithMixedMountsAndBasicData(dbm.NewMemDB())

testcases := map[string]struct {
height uint64
expectType error
}{
"0 height": {0, nil},
"unknown height": {9, nil},
}
for name, tc := range testcases {
tc := tc
t.Run(name, func(t *testing.T) {
err := store.Snapshot(tc.height, nil)
require.Error(t, err)
if tc.expectType != nil {
assert.True(t, errors.Is(err, tc.expectType))
}
})
}
}

func TestMultistoreSnapshotRestore(t *testing.T) {
source := newMultiStoreWithMixedMountsAndBasicData(dbm.NewMemDB())
target := newMultiStoreWithMixedMounts(dbm.NewMemDB())
version := uint64(source.LastCommitID().Version)
require.EqualValues(t, 3, version)
dummyExtensionItem := snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_Extension{
Extension: &snapshottypes.SnapshotExtensionMeta{
Name: "test",
Format: 1,
},
},
}

chunks := make(chan io.ReadCloser, 100)
go func() {
streamWriter := snapshots.NewStreamWriter(chunks)
require.NotNil(t, streamWriter)
defer streamWriter.Close()
err := source.Snapshot(version, streamWriter)
require.NoError(t, err)
// write an extension metadata
err = streamWriter.WriteMsg(&dummyExtensionItem)
require.NoError(t, err)
}()

streamReader, err := snapshots.NewStreamReader(chunks)
require.NoError(t, err)
nextItem, err := target.Restore(version, snapshottypes.CurrentFormat, streamReader)
require.NoError(t, err)
require.Equal(t, *dummyExtensionItem.GetExtension(), *nextItem.GetExtension())

assert.Equal(t, source.LastCommitID(), target.LastCommitID())
for key, sourceStore := range source.GetStores() {
targetStore := target.GetStoreByName(key.Name()).(types.CommitKVStore)
switch sourceStore.GetStoreType() {
case types.StoreTypeTransient:
assert.False(t, targetStore.Iterator(nil, nil).Valid(),
"transient store %v not empty", key.Name())
default:
assertStoresEqual(t, sourceStore, targetStore, "store %q not equal", key.Name())
}
}
}

func benchmarkMultistoreSnapshot(b *testing.B, stores uint8, storeKeys uint64) {
b.Skip("Noisy with slow setup time, please see https://github.com/cosmos/cosmos-sdk/issues/8855.")

b.ReportAllocs()
b.StopTimer()
source := newMultiStoreWithGeneratedData(dbm.NewMemDB(), stores, storeKeys)
version := source.LastCommitID().Version
require.EqualValues(b, 1, version)
b.StartTimer()

for i := 0; i < b.N; i++ {
target := rootmulti.NewStore(dbm.NewMemDB())
for key := range source.GetStores() {
target.MountStoreWithDB(key, types.StoreTypeIAVL, nil)
}
err := target.LoadLatestVersion()
require.NoError(b, err)
require.EqualValues(b, 0, target.LastCommitID().Version)

chunks := make(chan io.ReadCloser)
go func() {
streamWriter := snapshots.NewStreamWriter(chunks)
require.NotNil(b, streamWriter)
err := source.Snapshot(uint64(version), streamWriter)
require.NoError(b, err)
}()
for reader := range chunks {
_, err := io.Copy(io.Discard, reader)
require.NoError(b, err)
err = reader.Close()
require.NoError(b, err)
}
}
}

func benchmarkMultistoreSnapshotRestore(b *testing.B, stores uint8, storeKeys uint64) {
b.Skip("Noisy with slow setup time, please see https://github.com/cosmos/cosmos-sdk/issues/8855.")

b.ReportAllocs()
b.StopTimer()
source := newMultiStoreWithGeneratedData(dbm.NewMemDB(), stores, storeKeys)
version := uint64(source.LastCommitID().Version)
require.EqualValues(b, 1, version)
b.StartTimer()

for i := 0; i < b.N; i++ {
target := rootmulti.NewStore(dbm.NewMemDB())
for key := range source.GetStores() {
target.MountStoreWithDB(key, types.StoreTypeIAVL, nil)
}
err := target.LoadLatestVersion()
require.NoError(b, err)
require.EqualValues(b, 0, target.LastCommitID().Version)

chunks := make(chan io.ReadCloser)
go func() {
writer := snapshots.NewStreamWriter(chunks)
require.NotNil(b, writer)
err := source.Snapshot(version, writer)
require.NoError(b, err)
}()
reader, err := snapshots.NewStreamReader(chunks)
require.NoError(b, err)
_, err = target.Restore(version, snapshottypes.CurrentFormat, reader)
require.NoError(b, err)
require.Equal(b, source.LastCommitID(), target.LastCommitID())
}
}

func BenchmarkMultistoreSnapshot100K(b *testing.B) {
benchmarkMultistoreSnapshot(b, 10, 10000)
}

func BenchmarkMultistoreSnapshot1M(b *testing.B) {
benchmarkMultistoreSnapshot(b, 10, 100000)
}

func BenchmarkMultistoreSnapshotRestore100K(b *testing.B) {
benchmarkMultistoreSnapshotRestore(b, 10, 10000)
}

func BenchmarkMultistoreSnapshotRestore1M(b *testing.B) {
benchmarkMultistoreSnapshotRestore(b, 10, 100000)
}
15 changes: 15 additions & 0 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,9 +811,16 @@ func (rs *Store) Restore(
// a SnapshotStoreItem, telling us which store to import into. The following items will contain
// SnapshotNodeItem (i.e. ExportNode) until we reach the next SnapshotStoreItem or EOF.
var importer *iavltree.Importer
var snapshotItem snapshottypes.SnapshotItem
loop:
for {
<<<<<<< HEAD
item := &types.SnapshotItem{}
err := protoReader.ReadMsg(item)
=======
snapshotItem = snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(&snapshotItem)
>>>>>>> 41bd879ac (fix: rootmulti's Restore don't return the next unknown item as expected (#11286))
if err == io.EOF {
break
} else if err != nil {
Expand Down Expand Up @@ -867,7 +874,11 @@ func (rs *Store) Restore(
}

default:
<<<<<<< HEAD
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", item)
=======
break loop
>>>>>>> 41bd879ac (fix: rootmulti's Restore don't return the next unknown item as expected (#11286))
}
}

Expand All @@ -880,7 +891,11 @@ func (rs *Store) Restore(
}

flushMetadata(rs.db, int64(height), rs.buildCommitInfo(int64(height)), []int64{})
<<<<<<< HEAD
return rs.LoadLatestVersion()
=======
return snapshotItem, rs.LoadLatestVersion()
>>>>>>> 41bd879ac (fix: rootmulti's Restore don't return the next unknown item as expected (#11286))
}

func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID, params storeParams) (types.CommitKVStore, error) {
Expand Down

0 comments on commit 970fa71

Please sign in to comment.