Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: save restored snapshot locally (backport #16060) #16262

Merged
merged 2 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [Unreleased]

## Features

* [#16060](https://github.com/cosmos/cosmos-sdk/pull/16060) Support saving restoring snapshot locally.

### Improvements

* (deps) [#15973](https://github.com/cosmos/cosmos-sdk/pull/15973) Bump CometBFT to [v0.34.28](https://github.com/cometbft/cometbft/blob/v0.34.28/CHANGELOG.md#v03428).
Expand Down
82 changes: 63 additions & 19 deletions snapshots/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"math"
"os"
"sort"
"sync"

Expand Down Expand Up @@ -38,12 +39,12 @@ type Manager struct {
multistore types.Snapshotter
logger log.Logger

mtx sync.Mutex
operation operation
chRestore chan<- io.ReadCloser
chRestoreDone <-chan restoreDone
restoreChunkHashes [][]byte
restoreChunkIndex uint32
mtx sync.Mutex
operation operation
chRestore chan<- uint32
chRestoreDone <-chan restoreDone
restoreSnapshot *types.Snapshot
restoreChunkIndex uint32
}

// operation represents a Manager operation. Only one operation can be in progress at a time.
Expand All @@ -61,7 +62,8 @@ const (
opPrune operation = "prune"
opRestore operation = "restore"

chunkBufferSize = 4
chunkBufferSize = 4
chunkIDBufferSize = 1024

snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
)
Expand Down Expand Up @@ -131,7 +133,7 @@ func (m *Manager) endLocked() {
m.chRestore = nil
}
m.chRestoreDone = nil
m.restoreChunkHashes = nil
m.restoreSnapshot = nil
m.restoreChunkIndex = 0
}

Expand Down Expand Up @@ -284,27 +286,57 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
}

// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
chChunks := make(chan io.ReadCloser, chunkBufferSize)
chChunkIDs := make(chan uint32, chunkIDBufferSize)
chDone := make(chan restoreDone, 1)

dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format)
if err := os.MkdirAll(dir, 0o750); err != nil {
return sdkerrors.Wrapf(err, "failed to create snapshot directory %q", dir)
}

chChunks := m.loadChunkStream(snapshot.Height, snapshot.Format, chChunkIDs)

go func() {
err := m.restoreSnapshot(snapshot, chChunks)
err := m.doRestoreSnapshot(snapshot, chChunks)
chDone <- restoreDone{
complete: err == nil,
err: err,
}
close(chDone)
}()

m.chRestore = chChunks
m.chRestore = chChunkIDs
m.chRestoreDone = chDone
m.restoreChunkHashes = snapshot.Metadata.ChunkHashes
m.restoreSnapshot = &snapshot
m.restoreChunkIndex = 0
return nil
}

// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
func (m *Manager) loadChunkStream(height uint64, format uint32, chunkIDs <-chan uint32) <-chan io.ReadCloser {
chunks := make(chan io.ReadCloser, chunkBufferSize)
go func() {
defer close(chunks)

for chunkID := range chunkIDs {
chunk, err := m.store.loadChunkFile(height, format, chunkID)
if err != nil {
m.logger.Error("load chunk file failed", "height", height, "format", format, "chunk", chunkID, "err", err)
break
}
chunks <- chunk
}
}()

return chunks
}

// doRestoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
func (m *Manager) doRestoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
dir := m.store.pathSnapshot(snapshot.Height, snapshot.Format)
if err := os.MkdirAll(dir, 0o750); err != nil {
return sdkerrors.Wrapf(err, "failed to create snapshot directory %q", dir)
}

streamReader, err := NewStreamReader(chChunks)
if err != nil {
return err
Expand Down Expand Up @@ -348,7 +380,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "no restore operation in progress")
}

if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "received unexpected chunk")
}

Expand All @@ -365,19 +397,30 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {

// Verify the chunk hash.
hash := sha256.Sum256(chunk)
expected := m.restoreChunkHashes[m.restoreChunkIndex]
expected := m.restoreSnapshot.Metadata.ChunkHashes[m.restoreChunkIndex]
if !bytes.Equal(hash[:], expected) {
return false, sdkerrors.Wrapf(types.ErrChunkHashMismatch,
"expected %x, got %x", hash, expected)
}

if err := m.store.saveChunkContent(chunk, m.restoreChunkIndex, m.restoreSnapshot); err != nil {
return false, sdkerrors.Wrapf(err, "save chunk content %d", m.restoreChunkIndex)
}

// Pass the chunk to the restore, and wait for completion if it was the final one.
m.chRestore <- io.NopCloser(bytes.NewReader(chunk))
m.chRestore <- m.restoreChunkIndex
m.restoreChunkIndex++

if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
if int(m.restoreChunkIndex) >= len(m.restoreSnapshot.Metadata.ChunkHashes) {
close(m.chRestore)
m.chRestore = nil

// the chunks are all written into files, we can save the snapshot to the db,
// even if the restoration may not completed yet.
if err := m.store.saveSnapshot(m.restoreSnapshot); err != nil {
return false, sdkerrors.Wrap(err, "save restoring snapshot")
}

done := <-m.chRestoreDone
m.endLocked()
if done.err != nil {
Expand All @@ -386,6 +429,7 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
if !done.complete {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended prematurely")
}

return true, nil
}
return false, nil
Expand All @@ -411,7 +455,7 @@ func (m *Manager) RestoreLocalSnapshot(height uint64, format uint32) error {
}
defer m.endLocked()

return m.restoreSnapshot(*snapshot, ch)
return m.doRestoreSnapshot(*snapshot, ch)
}

// sortedExtensionNames sort extension names for deterministic iteration.
Expand Down
7 changes: 7 additions & 0 deletions snapshots/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ func TestManager_Restore(t *testing.T) {

assert.Equal(t, expectItems, target.items)

// The snapshot is saved in local snapshot store
snapshots, err := store.List()
require.NoError(t, err)
snapshot := snapshots[0]
require.Equal(t, uint64(3), snapshot.Height)
require.Equal(t, types.CurrentFormat, snapshot.Format)

// Starting a new restore should fail now, because the target already has contents.
err = manager.Restore(types.Snapshot{
Height: 3,
Expand Down
6 changes: 6 additions & 0 deletions snapshots/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ func (s *Store) saveChunk(chunkBody io.ReadCloser, index uint32, snapshot *types
return nil
}

// saveChunkContent save the chunk to disk
func (s *Store) saveChunkContent(chunk []byte, index uint32, snapshot *types.Snapshot) error {
path := s.PathChunk(snapshot.Height, snapshot.Format, index)
return os.WriteFile(path, chunk, 0o600)
}

// saveSnapshot saves snapshot metadata to the database.
func (s *Store) saveSnapshot(snapshot *types.Snapshot) error {
value, err := proto.Marshal(snapshot)
Expand Down