Skip to content

Commit

Permalink
feat!: Add hooks to allow app modules to add things to state-sync (co…
Browse files Browse the repository at this point in the history
…smos#10961)

## Description

Closes: cosmos#7340

- Support registering multiple snapshotters in snapshot manager.
- Append the extension snapshotters to existing snapshot stream.

~TODO: testing.~
- existing tests are fixed






---

### Author Checklist

*All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.*

I have...

- [ ] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] added `!` to the type prefix if API or client breaking change
- [ ] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [ ] provided a link to the relevant issue or specification
- [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules)
- [ ] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [ ] added a changelog entry to `CHANGELOG.md`
- [ ] included comments for [documenting Go code](https://blog.golang.org/godoc)
- [ ] updated the relevant documentation or specification
- [ ] reviewed "Files changed" and left comments if necessary
- [ ] confirmed all CI checks have passed

### Reviewers Checklist

*All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.*

I have...

- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] confirmed `!` in the type prefix if API or client breaking change
- [ ] confirmed all author checklist items have been addressed 
- [ ] reviewed state machine logic
- [ ] reviewed API design and naming
- [ ] reviewed documentation is accurate
- [ ] reviewed tests and test coverage
- [ ] manually tested (if applicable)
  • Loading branch information
yihuang authored Feb 24, 2022
1 parent c9a37fe commit 7e18e9f
Show file tree
Hide file tree
Showing 20 changed files with 5,960 additions and 722 deletions.
3,126 changes: 3,098 additions & 28 deletions api/cosmos/base/snapshots/v1beta1/snapshot.pulsar.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,22 @@ func DefaultStoreLoader(ms sdk.CommitMultiStore) error {
return ms.LoadLatestVersion()
}

// CommitMultiStore returns the root multi-store.
// App constructor can use this to access the `cms`.
// UNSAFE: only safe to use during app initialization.
func (app *BaseApp) CommitMultiStore() sdk.CommitMultiStore {
if app.sealed {
panic("cannot call CommitMultiStore() after baseapp is sealed")
}
return app.cms
}

// SnapshotManager returns the snapshot manager.
// application use this to register extra extension snapshotters.
func (app *BaseApp) SnapshotManager() *snapshots.Manager {
return app.snapshotManager
}

// LoadVersion loads the BaseApp application version. It will panic if called
// more than once on a running baseapp.
func (app *BaseApp) LoadVersion(version int64) error {
Expand Down
2 changes: 1 addition & 1 deletion baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (app *BaseApp) SetSnapshotStore(snapshotStore *snapshots.Store) {
app.snapshotManager = nil
return
}
app.snapshotManager = snapshots.NewManager(snapshotStore, app.cms)
app.snapshotManager = snapshots.NewManager(snapshotStore, app.cms, nil)
}

// SetSnapshotInterval sets the snapshot interval.
Expand Down
39 changes: 38 additions & 1 deletion proto/cosmos/base/snapshots/v1beta1/snapshot.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,41 @@ message Snapshot {
// Metadata contains SDK-specific snapshot metadata.
message Metadata {
repeated bytes chunk_hashes = 1; // SHA-256 chunk hashes
}
}

// SnapshotItem is an item contained in a rootmulti.Store snapshot.
message SnapshotItem {
// item is the specific type of snapshot item.
oneof item {
SnapshotStoreItem store = 1;
SnapshotIAVLItem iavl = 2 [(gogoproto.customname) = "IAVL"];
SnapshotExtensionMeta extension = 3;
SnapshotExtensionPayload extension_payload = 4;
}
}

// SnapshotStoreItem contains metadata about a snapshotted store.
message SnapshotStoreItem {
string name = 1;
}

// SnapshotIAVLItem is an exported IAVL node.
message SnapshotIAVLItem {
bytes key = 1;
bytes value = 2;
// version is block height
int64 version = 3;
// height is depth of the tree.
int32 height = 4;
}

// SnapshotExtensionMeta contains metadata about an external snapshotter.
message SnapshotExtensionMeta {
string name = 1;
uint32 format = 2;
}

// SnapshotExtensionPayload contains payloads of an external snapshotter.
message SnapshotExtensionPayload {
bytes payload = 1;
}
28 changes: 0 additions & 28 deletions proto/cosmos/base/store/v1beta1/snapshot.proto

This file was deleted.

8 changes: 5 additions & 3 deletions server/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package mock
import (
"io"

protoio "github.com/gogo/protobuf/io"
dbm "github.com/tendermint/tm-db"

snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)
Expand Down Expand Up @@ -122,13 +124,13 @@ func (ms multiStore) SetInitialVersion(version int64) error {
panic("not implemented")
}

func (ms multiStore) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
func (ms multiStore) Snapshot(height uint64, protoWriter protoio.Writer) error {
panic("not implemented")
}

func (ms multiStore) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}

Expand Down
File renamed without changes.
File renamed without changes.
105 changes: 74 additions & 31 deletions snapshots/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package snapshots_test

import (
"bufio"
"bytes"
"compress/zlib"
"crypto/sha256"
"errors"
"io"
"os"
"testing"
"time"

protoio "github.com/gogo/protobuf/io"
"github.com/stretchr/testify/require"
db "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/snapshots"
"github.com/cosmos/cosmos-sdk/snapshots/types"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

func checksums(slice [][]byte) [][]byte {
Expand Down Expand Up @@ -56,45 +61,85 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte {
return bodies
}

// snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks.
func snapshotItems(items [][]byte) [][]byte {
// copy the same parameters from the code
snapshotChunkSize := uint64(10e6)
snapshotBufferSize := int(snapshotChunkSize)

ch := make(chan io.ReadCloser)
go func() {
chunkWriter := snapshots.NewChunkWriter(ch, snapshotChunkSize)
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
zWriter, _ := zlib.NewWriterLevel(bufWriter, 7)
protoWriter := protoio.NewDelimitedWriter(zWriter)
for _, item := range items {
types.WriteExtensionItem(protoWriter, item)
}
protoWriter.Close()
zWriter.Close()
bufWriter.Flush()
chunkWriter.Close()
}()

var chunks [][]byte
for chunkBody := range ch {
chunk, err := io.ReadAll(chunkBody)
if err != nil {
panic(err)
}
chunks = append(chunks, chunk)
}
return chunks
}

type mockSnapshotter struct {
chunks [][]byte
items [][]byte
}

func (m *mockSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
if format == 0 {
return types.ErrUnknownFormat
return snapshottypes.SnapshotItem{}, types.ErrUnknownFormat
}
if m.chunks != nil {
return errors.New("already has contents")
if m.items != nil {
return snapshottypes.SnapshotItem{}, errors.New("already has contents")
}
if ready != nil {
close(ready)

m.items = [][]byte{}
for {
item := &snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(item)
if err == io.EOF {
break
} else if err != nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
payload := item.GetExtensionPayload()
if payload == nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
m.items = append(m.items, payload.Payload)
}

m.chunks = [][]byte{}
for reader := range chunks {
chunk, err := io.ReadAll(reader)
if err != nil {
return snapshottypes.SnapshotItem{}, nil
}

func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
for _, item := range m.items {
if err := types.WriteExtensionItem(protoWriter, item); err != nil {
return err
}
m.chunks = append(m.chunks, chunk)
}

return nil
}

func (m *mockSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
if format == 0 {
return nil, types.ErrUnknownFormat
}
ch := make(chan io.ReadCloser, len(m.chunks))
for _, chunk := range m.chunks {
ch <- io.NopCloser(bytes.NewReader(chunk))
}
close(ch)
return ch, nil
func (m *mockSnapshotter) SnapshotFormat() uint32 {
return 1
}
func (m *mockSnapshotter) SupportedFormats() []uint32 {
return []uint32{1}
}

// setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1.
Expand All @@ -110,7 +155,7 @@ func setupBusyManager(t *testing.T) *snapshots.Manager {
store, err := snapshots.NewStore(db.NewMemDB(), tempdir)
require.NoError(t, err)
hung := newHungSnapshotter()
mgr := snapshots.NewManager(store, hung)
mgr := snapshots.NewManager(store, hung, nil)

go func() {
_, err := mgr.Create(1)
Expand All @@ -137,15 +182,13 @@ func (m *hungSnapshotter) Close() {
close(m.ch)
}

func (m *hungSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
func (m *hungSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
<-m.ch
ch := make(chan io.ReadCloser, 1)
ch <- io.NopCloser(bytes.NewReader([]byte{}))
return ch, nil
return nil
}

func (m *hungSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}
Loading

0 comments on commit 7e18e9f

Please sign in to comment.