Skip to content

Commit

Permalink
feat: make ChangeSet and KVPair protobuf serializable (backport #726) (
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Apr 22, 2023
1 parent 61b2862 commit e74c486
Show file tree
Hide file tree
Showing 12 changed files with 678 additions and 0 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

### Improvements

- [#726](https://github.com/cosmos/iavl/pull/726) Make `KVPair` and `ChangeSet` serializable with protobuf.

## 0.20.0 (March 14, 2023)

- [#622](https://github.com/cosmos/iavl/pull/622) `export/newExporter()` and `ImmutableTree.Export()` return an error for nil arguments
Expand Down
9 changes: 9 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: v1
managed:
enabled: true
go_package_prefix:
default: github.com/cosmos/iavl/proto
plugins:
- plugin: buf.build/protocolbuffers/go
out: proto
opt: paths=source_relative
152 changes: 152 additions & 0 deletions diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package iavl

import (
"bytes"

"github.com/cosmos/iavl/proto"
)

// KVPair is an alias of the protobuf-generated proto.KVPair.
type KVPair = proto.KVPair

// ChangeSet is an alias of the protobuf-generated proto.ChangeSet.
type ChangeSet = proto.ChangeSet

// KVPairReceiver is the callback type passed to `extractStateChanges`. It is invoked once
// for every `KVPair` produced; returning a non-nil error aborts the extraction and that
// error is returned to the caller.
type KVPairReceiver func(pair *KVPair) error

// extractStateChanges extracts the state changes between two versions of the tree and
// streams them to `receiver` as `KVPair`s.
//
// It first traverses the `root` tree until the first `sharedNode` (a node whose version is
// not newer than `prevVersion`) and records the new leaf nodes found on the way, then
// traverses the `prevRoot` tree until that same `sharedNode` to find orphaned leaf nodes.
// Orphaned leaf nodes and new leaf nodes are compared by key to produce the stream of
// `KVPair`s: a pair with `Key` and `Value` for an insertion or update, and a pair with
// `Delete` set for a removal.
//
// The algorithm doesn't strictly run in constant memory, but it tries its best to keep
// only minimal intermediate state in memory.
//
// An error returned by `receiver` stops the extraction and is returned immediately.
// Iterator errors are only reported once the traversal loop has finished.
func (ndb *nodeDB) extractStateChanges(prevVersion int64, prevRoot, root []byte, receiver KVPairReceiver) error {
curIter, err := NewNodeIterator(root, ndb)
if err != nil {
return err
}

prevIter, err := NewNodeIterator(prevRoot, ndb)
if err != nil {
return err
}

var (
// current shared node between two versions
sharedNode *Node
// record the newly added leaf nodes during the traversal to the `sharedNode`,
// will be compared with found orphaned nodes to produce change set stream.
newLeaves []*Node
)

// consumeNewLeaves consumes the remaining `newLeaves` nodes and produces an insertion
// `KVPair` for each of them, then empties `newLeaves`.
consumeNewLeaves := func() error {
for _, node := range newLeaves {
if err := receiver(&KVPair{
Key: node.key,
Value: node.value,
}); err != nil {
return err
}
}

// truncate in place so the slice can be refilled by the next advance
newLeaves = newLeaves[:0]
return nil
}

// advanceSharedNode forwards `curIter` until the next `sharedNode`,
// `sharedNode` will be `nil` if the new version is exhausted.
// It also records the new leaf nodes during the traversal.
advanceSharedNode := func() error {
// whatever is still pending belongs before the previous shared node: flush it first
if err := consumeNewLeaves(); err != nil {
return err
}

sharedNode = nil
for curIter.Valid() {
node := curIter.GetNode()
// a node not newer than `prevVersion` is treated as shared with the previous tree
shared := node.version <= prevVersion
// when shared, the sub-tree below the node is skipped
curIter.Next(shared)
if shared {
sharedNode = node
break
} else if node.isLeaf() {
newLeaves = append(newLeaves, node)
}
}

return nil
}
if err := advanceSharedNode(); err != nil {
return err
}

// addOrphanedLeave receives a new orphaned leaf node found in the previous version and
// compares it with the current `newLeaves` to produce the `iavl.KVPair` stream.
addOrphanedLeave := func(orphaned *Node) error {
for len(newLeaves) > 0 {
newLeave := newLeaves[0]
switch bytes.Compare(orphaned.key, newLeave.key) {
case 1:
// the orphaned key sorts after the new key:
// consume a new node as insertion and continue
newLeaves = newLeaves[1:]
if err := receiver(&KVPair{
Key: newLeave.key,
Value: newLeave.value,
}); err != nil {
return err
}
continue

case -1:
// the orphaned key sorts before the new key:
// removal, don't consume new nodes
return receiver(&KVPair{
Delete: true,
Key: orphaned.key,
})

case 0:
// same key on both sides:
// update, consume the new node and stop
newLeaves = newLeaves[1:]
return receiver(&KVPair{
Key: newLeave.key,
Value: newLeave.value,
})
}
}

// no new leaves left to compare against: removal
return receiver(&KVPair{
Delete: true,
Key: orphaned.key,
})
}

// Traverse `prevIter` to find orphaned nodes in the previous version,
// and compare them with newLeaves to generate `KVPair` stream.
for prevIter.Valid() {
node := prevIter.GetNode()
// the node matches the current shared node either by pointer identity or by equal hash
shared := sharedNode != nil && (node == sharedNode || bytes.Equal(node.hash, sharedNode.hash))
// skip sub-tree of shared nodes
prevIter.Next(shared)
if shared {
if err := advanceSharedNode(); err != nil {
return err
}
} else if node.isLeaf() {
if err := addOrphanedLeave(node); err != nil {
return err
}
}
}

// new leaves that were not matched by any orphaned leaf are insertions
if err := consumeNewLeaves(); err != nil {
return err
}

// surface errors recorded by either iterator during the traversal
if err := curIter.Error(); err != nil {
return err
}
return prevIter.Error()
}
102 changes: 102 additions & 0 deletions diff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package iavl

import (
"encoding/binary"
"fmt"
"math"
"math/rand"
"sort"
"testing"

db "github.com/cometbft/cometbft-db"
"github.com/stretchr/testify/require"
)

// TestDiffRoundTrip generates random change sets, replays each of them as a new
// version of an iavl tree, then extracts the state changes back out of the stored
// versions and checks that they equal the original change sets.
func TestDiffRoundTrip(t *testing.T) {
	rng := rand.New(rand.NewSource(0))
	expected := genChangeSets(rng, 300)

	// replay every change set as one version of the tree
	memDB := db.NewMemDB()
	tree, err := NewMutableTree(memDB, 0, true)
	require.NoError(t, err)
	for idx, cs := range expected {
		version, err := tree.SaveChangeSet(cs)
		require.NoError(t, err)
		require.Equal(t, int64(idx+1), version)
	}

	// recover the change sets from the database
	var actual []*ChangeSet
	reader := NewImmutableTree(memDB, 0, true)
	collect := func(_ int64, changeSet *ChangeSet) error {
		actual = append(actual, changeSet)
		return nil
	}
	err = reader.ndb.traverseStateChanges(0, math.MaxInt64, collect)
	require.NoError(t, err)
	require.Equal(t, expected, actual)
}

// genChangeSets builds n change sets from the random source r.
//
// Each change set sets a random arithmetic sequence of "test-<num>" keys to the
// little-endian encoding of <num>. From the second change set on, it also deletes a
// few keys that the previous change set did not delete, and re-sets one randomly picked
// pair of the previous change set to its identical value. The pairs of every
// change set are ordered by key.
func genChangeSets(r *rand.Rand, n int) []*ChangeSet {
	var changeSets []*ChangeSet

	for round := 0; round < n; round++ {
		pending := make(map[string]*KVPair)

		start, count, step := r.Int63n(1000), r.Int63n(1000), r.Int63n(10)
		limit := start + count*step
		for num := start; num < limit; num += step {
			encoded := make([]byte, 8)
			binary.LittleEndian.PutUint64(encoded, uint64(num))

			name := fmt.Sprintf("test-%d", num)
			pending[name] = &KVPair{
				Key:   []byte(name),
				Value: encoded,
			}
		}

		if len(changeSets) > 0 {
			prev := changeSets[len(changeSets)-1]

			// pick some keys of the last version to delete
			remaining := r.Int63n(10)
			for _, pair := range prev.Pairs {
				if remaining <= 0 {
					break
				}
				if !pair.Delete {
					pending[string(pair.Key)] = &KVPair{
						Key:    pair.Key,
						Delete: true,
					}
					remaining--
				}
			}

			// special case: set one key to the identical value it already has
			if total := len(prev.Pairs); total > 0 {
				picked := prev.Pairs[r.Int63n(int64(total))]
				if !picked.Delete {
					pending[string(picked.Key)] = &KVPair{
						Key:   picked.Key,
						Value: picked.Value,
					}
				}
			}
		}

		// map iteration order is random, so order the pairs by key
		names := make([]string, 0, len(pending))
		for name := range pending {
			names = append(names, name)
		}
		sort.Strings(names)

		cs := new(ChangeSet)
		for _, name := range names {
			cs.Pairs = append(cs.Pairs, pending[name])
		}

		changeSets = append(changeSets, cs)
	}
	return changeSets
}
74 changes: 74 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,77 @@ func (iter *Iterator) Error() error {
func (iter *Iterator) IsFast() bool {
// Unconditionally reports false for this iterator type.
return false
}

// NodeIterator is an iterator for nodeDB to traverse a tree in depth-first, preorder manner.
type NodeIterator struct {
// nodesToVisit is the stack of pending nodes; its last element is the current node.
nodesToVisit []*Node
// ndb is the node store used to load child nodes while advancing.
ndb *nodeDB
// err is the error recorded when loading a node fails; once set, the iterator is no longer valid.
err error
}

// NewNodeIterator creates a NodeIterator that traverses the tree rooted at rootKey,
// loading nodes from ndb.
//
// A nil rootKey yields an iterator with nothing to visit. Otherwise the root node is
// loaded up front, and a failure to load it is returned as the error.
func NewNodeIterator(rootKey []byte, ndb *nodeDB) (*NodeIterator, error) {
	iter := &NodeIterator{
		nodesToVisit: []*Node{},
		ndb:          ndb,
	}
	if rootKey == nil {
		return iter, nil
	}

	root, err := ndb.GetNode(rootKey)
	if err != nil {
		return nil, err
	}

	iter.nodesToVisit = append(iter.nodesToVisit, root)
	return iter, nil
}

// GetNode returns the node currently being visited, i.e. the top of the pending stack.
// It panics when there is nothing left to visit, so check Valid first.
func (iter *NodeIterator) GetNode() *Node {
	top := len(iter.nodesToVisit) - 1
	return iter.nodesToVisit[top]
}

// Valid reports whether the iterator is still usable: no error has been recorded
// and at least one node is left to visit.
func (iter *NodeIterator) Valid() bool {
	if iter.err != nil {
		return false
	}
	return len(iter.nodesToVisit) > 0
}

// Error returns the error recorded during the traversal, or nil if there is none.
func (iter *NodeIterator) Error() error {
return iter.err
}

// Next advances the traversal by one node; it is a no-op on an invalid iterator.
//
// The current node is popped. Unless isSkipped is true or the node is a leaf, its
// children are loaded and pushed, right child first, so that the left child is visited
// next. A failure to load a child is recorded and can be retrieved with Error.
func (iter *NodeIterator) Next(isSkipped bool) {
	if !iter.Valid() {
		return
	}

	// pop the current node off the stack
	top := len(iter.nodesToVisit) - 1
	current := iter.nodesToVisit[top]
	iter.nodesToVisit = iter.nodesToVisit[:top]

	if isSkipped || current.isLeaf() {
		return
	}

	// the stack is LIFO: pushing right before left keeps the traversal preorder
	for _, childHash := range [][]byte{current.rightHash, current.leftHash} {
		child, err := iter.ndb.GetNode(childHash)
		if err != nil {
			iter.err = err
			return
		}
		iter.nodesToVisit = append(iter.nodesToVisit, child)
	}
}
26 changes: 26 additions & 0 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -1287,3 +1287,29 @@ func (tree *MutableTree) addOrphans(orphans []*Node) error {
}
return nil
}

// SaveChangeSet saves a ChangeSet to the tree.
// It is used to replay a ChangeSet as a new version.
//
// The pairs of cs are applied in order: a pair with Delete set removes its key, any
// other pair sets its key to its value. The tree is then saved and the new version
// number is returned.
//
// An error is returned if the tree has uncommitted changes, if a deletion targets a key
// that is not in the tree, or if any Set, Remove or SaveVersion call fails. Pairs applied
// before a failure are not rolled back by this method.
func (tree *MutableTree) SaveChangeSet(cs *ChangeSet) (int64, error) {
	// if the tree has uncommitted changes, return error
	if tree.root != nil && tree.root.key == nil {
		return 0, fmt.Errorf("cannot save changeset with uncommitted changes")
	}
	for _, pair := range cs.Pairs {
		if !pair.Delete {
			if _, err := tree.Set(pair.Key, pair.Value); err != nil {
				return 0, err
			}
			continue
		}

		_, removed, err := tree.Remove(pair.Key)
		// Check the error before `removed`: a failed Remove must surface its own error
		// instead of being misreported as a missing key.
		if err != nil {
			return 0, err
		}
		if !removed {
			return 0, fmt.Errorf("attempted to remove non-existent key %s", pair.Key)
		}
	}
	_, version, err := tree.SaveVersion()
	return version, err
}
Loading

0 comments on commit e74c486

Please sign in to comment.