-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(store/v2): snapshot manager #18458
Changes from 10 commits
8ddbff9
2569a3f
c0dcf8f
0f462be
9f876dc
beb0ef4
c792bde
5c25952
c558a40
2c30672
07185be
d03866b
19d0cd2
0598995
6d728b1
5026749
fdf6419
91dd007
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,14 +3,19 @@ package commitment | |
import ( | ||
"errors" | ||
"fmt" | ||
"io" | ||
"math" | ||
|
||
protoio "github.com/cosmos/gogoproto/io" | ||
ics23 "github.com/cosmos/ics23/go" | ||
|
||
"cosmossdk.io/log" | ||
"cosmossdk.io/store/v2" | ||
snapshotstypes "cosmossdk.io/store/v2/snapshots/types" | ||
) | ||
|
||
var _ store.Committer = (*CommitStore)(nil) | ||
var _ snapshotstypes.CommitSnapshotter = (*CommitStore)(nil) | ||
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// CommitStore is a wrapper around multiple Tree objects mapped by a unique store | ||
// key. Each store key reflects dedicated and unique usage within a module. A caller | ||
|
@@ -127,6 +132,140 @@ func (c *CommitStore) Prune(version uint64) (ferr error) { | |
return ferr | ||
} | ||
|
||
// Snapshot implements snapshotstypes.CommitSnapshotter. | ||
func (c *CommitStore) Snapshot(version uint64, protoWriter protoio.Writer) error { | ||
if version == 0 { | ||
return fmt.Errorf("the snapshot version must be greater than 0") | ||
} | ||
|
||
latestVersion, err := c.GetLatestVersion() | ||
if err != nil { | ||
return err | ||
} | ||
if version > latestVersion { | ||
return fmt.Errorf("the snapshot version %d is greater than the latest version %d", version, latestVersion) | ||
} | ||
|
||
for storeKey, tree := range c.multiTrees { | ||
exporter, err := tree.Export(version) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like an expensive operation being ran in a loop multiple times. Couldn't we extract the exporter right above once? We could invert the operations that iterate over storeKeys and exporter.Next() to iterate N times over exporter.Next() but storeKeys (M) times There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we have several trees |
||
if err != nil { | ||
return fmt.Errorf("failed to export tree for version %d: %w", version, err) | ||
} | ||
|
||
defer exporter.Close() | ||
cool-develope marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ | ||
Item: &snapshotstypes.SnapshotItem_Store{ | ||
Store: &snapshotstypes.SnapshotStoreItem{ | ||
Name: storeKey, | ||
}, | ||
}, | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("failed to write store name: %w", err) | ||
} | ||
|
||
for { | ||
item, err := exporter.Next() | ||
if errors.Is(err, ErrorExportDone) { | ||
break | ||
} else if err != nil { | ||
return fmt.Errorf("failed to get the next export node: %w", err) | ||
} | ||
|
||
if err = protoWriter.WriteMsg(&snapshotstypes.SnapshotItem{ | ||
Item: &snapshotstypes.SnapshotItem_IAVL{ | ||
IAVL: item, | ||
}, | ||
}); err != nil { | ||
return fmt.Errorf("failed to write iavl node: %w", err) | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Restore implements snapshotstypes.CommitSnapshotter. | ||
func (c *CommitStore) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshotstypes.SnapshotItem, error) { | ||
var ( | ||
importer Importer | ||
snapshotItem snapshotstypes.SnapshotItem | ||
storeKey string | ||
) | ||
|
||
loop: | ||
for { | ||
snapshotItem = snapshotstypes.SnapshotItem{} | ||
err := protoReader.ReadMsg(&snapshotItem) | ||
if errors.Is(err, io.EOF) { | ||
break | ||
} else if err != nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err) | ||
} | ||
|
||
switch item := snapshotItem.Item.(type) { | ||
case *snapshotstypes.SnapshotItem_Store: | ||
if importer != nil { | ||
if err := importer.Commit(); err != nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) | ||
} | ||
importer.Close() | ||
} | ||
storeKey = item.Store.Name | ||
tree := c.multiTrees[storeKey] | ||
if tree == nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("store %s not found", storeKey) | ||
} | ||
importer, err = tree.Import(version) | ||
if err != nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err) | ||
} | ||
defer importer.Close() | ||
|
||
case *snapshotstypes.SnapshotItem_IAVL: | ||
if importer == nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item") | ||
} | ||
node := item.IAVL | ||
if node.Height > int32(math.MaxInt8) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we have this restriction, is it something related to iavl? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, iavl requires There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, we should change this in iavlv2, its weird versions can go up to maxint64 but iavl will crash before then. Seems like there is a bit of a misstep in expectations between the two dependencies There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that in v2 of iavl we should remove this restriction. It makes code usage in iavl also awkward: one weekend many months ago before I found out about Matt's rewrite, I tried to tackle concurrency issues and data races in iavl using atomic.Int64 and it was awkward trying to use that in iavl. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is a just tree height, it can be up to maxUint8 (255) |
||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v", | ||
item.IAVL.Height, math.MaxInt8) | ||
} | ||
// Protobuf does not differentiate between []byte{} and nil, but fortunately IAVL does | ||
// not allow nil keys nor nil values for leaf nodes, so we can always set them to empty. | ||
if node.Key == nil { | ||
node.Key = []byte{} | ||
} | ||
if node.Height == 0 { | ||
if node.Value == nil { | ||
node.Value = []byte{} | ||
} | ||
// If the node is a leaf node, it will be written to the storage. | ||
chStorage <- &store.KVPair{ | ||
Key: node.Key, | ||
Value: node.Value, | ||
StoreKey: storeKey, | ||
} | ||
} | ||
err := importer.Add(node) | ||
if err != nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err) | ||
} | ||
default: | ||
break loop | ||
} | ||
} | ||
|
||
if importer != nil { | ||
if err := importer.Commit(); err != nil { | ||
return snapshotstypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err) | ||
} | ||
} | ||
|
||
return snapshotItem, c.LoadVersion(version) | ||
} | ||
|
||
func (c *CommitStore) Close() (ferr error) { | ||
for _, tree := range c.multiTrees { | ||
if err := tree.Close(); err != nil { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
package commitment | ||
|
||
import ( | ||
"testing" | ||
) | ||
|
||
func TestSnapshotter(t *testing.T) { | ||
// generate a new tree | ||
// storeKey := "store" | ||
// tree := generateTree(storeKey) | ||
// require.NotNil(t, tree) | ||
|
||
// latestVersion := uint64(10) | ||
// kvCount := 10 | ||
// for i := uint64(1); i <= latestVersion; i++ { | ||
// cs := store.NewChangeset() | ||
// for j := 0; j < kvCount; j++ { | ||
// key := []byte(fmt.Sprintf("key-%d-%d", i, j)) | ||
// value := []byte(fmt.Sprintf("value-%d-%d", i, j)) | ||
// cs.Add(key, value) | ||
// } | ||
// err := tree.WriteBatch(cs) | ||
// require.NoError(t, err) | ||
|
||
// _, err = tree.Commit() | ||
// require.NoError(t, err) | ||
// } | ||
|
||
// latestHash := tree.WorkingHash() | ||
|
||
// // create a snapshot | ||
// dummyExtensionItem := snapshotstypes.SnapshotItem{ | ||
// Item: &snapshotstypes.SnapshotItem_Extension{ | ||
// Extension: &snapshotstypes.SnapshotExtensionMeta{ | ||
// Name: "test", | ||
// Format: 1, | ||
// }, | ||
// }, | ||
// } | ||
// target := generateTree("") | ||
// chunks := make(chan io.ReadCloser, kvCount*int(latestVersion)) | ||
// go func() { | ||
// streamWriter := snapshots.NewStreamWriter(chunks) | ||
// require.NotNil(t, streamWriter) | ||
// defer streamWriter.Close() | ||
// err := tree.Snapshot(latestVersion, streamWriter) | ||
// require.NoError(t, err) | ||
// // write an extension metadata | ||
// err = streamWriter.WriteMsg(&dummyExtensionItem) | ||
// require.NoError(t, err) | ||
// }() | ||
|
||
// streamReader, err := snapshots.NewStreamReader(chunks) | ||
// chStorage := make(chan *store.KVPair, 100) | ||
// require.NoError(t, err) | ||
// nextItem, err := target.Restore(latestVersion, snapshotstypes.CurrentFormat, streamReader, chStorage) | ||
// require.NoError(t, err) | ||
// require.Equal(t, *dummyExtensionItem.GetExtension(), *nextItem.GetExtension()) | ||
|
||
// // check the store key | ||
// require.Equal(t, storeKey, target.storeKey) | ||
|
||
// // check the restored tree hash | ||
// require.Equal(t, latestHash, target.WorkingHash()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we please call this
itr
oriter
instead ofexporter
?exporter
makes it seem like its purpose is to write out content, but really the purpose is to iterate over the exported content.