Skip to content

Commit

Permalink
fix(store/v2/pebble): handle version 0 in keys
Browse files Browse the repository at this point in the history
  • Loading branch information
kocubinski committed Nov 14, 2024
1 parent e74799e commit b17c4b3
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 10 deletions.
23 changes: 23 additions & 0 deletions store/v2/storage/pebbledb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ type Batch struct {
batch *pebble.Batch
version uint64
sync bool
size int
}

const (
oneIf64Bit = ^uint(0) >> 63
maxUint32OrInt = (1<<31)<<oneIf64Bit - 1
maxVarintLen32 = 5
)

func keyValueSize(key, value []byte) int {
return len(key) + len(value) + 1 + 2*maxVarintLen32
}

func NewBatch(storage *pebble.DB, version uint64, sync bool) (*Batch, error) {
Expand All @@ -34,6 +45,7 @@ func NewBatch(storage *pebble.DB, version uint64, sync bool) (*Batch, error) {
batch: batch,
version: version,
sync: sync,
size: keyValueSize([]byte(latestVersionKey), versionBz[:]),
}, nil
}

Expand All @@ -50,9 +62,20 @@ func (b *Batch) set(storeKey []byte, tombstone uint64, key, value []byte) error
prefixedKey := MVCCEncode(prependStoreKey(storeKey, key), b.version)
prefixedVal := MVCCEncode(value, tombstone)

size := keyValueSize(prefixedKey, prefixedVal)
if b.size+size > maxUint32OrInt {
// 4 GB is huge, probably genesis; flush and reset
if err := b.batch.Commit(&pebble.WriteOptions{Sync: b.sync}); err != nil {
return fmt.Errorf("max batch size exceed: failed to write PebbleDB batch: %w", err)
}
b.batch.Reset()
b.size = 0
}

if err := b.batch.Set(prefixedKey, prefixedVal, nil); err != nil {
return fmt.Errorf("failed to write PebbleDB batch: %w", err)
}
b.size += size

return nil
}
Expand Down
21 changes: 14 additions & 7 deletions store/v2/storage/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,14 @@ func (db *Database) Prune(version uint64) (err error) {
return fmt.Errorf("invalid PebbleDB MVCC key: %s", prefixedKey)
}

keyVersion, err := decodeUint64Ascending(verBz)
if err != nil {
return fmt.Errorf("failed to decode key version: %w", err)
var keyVersion uint64
// handle version 0 (no version prefix)
if len(verBz) > 0 {
keyVersion, err = decodeUint64Ascending(verBz)
if err != nil {
return fmt.Errorf("failed to decode key version: %w", err)
}
}

// seek to next key if we are at a version which is higher than prune height
if keyVersion > version {
itr.NextPrefix()
Expand Down Expand Up @@ -432,9 +435,13 @@ func getMVCCSlice(db *pebble.DB, storeKey, key []byte, version uint64) ([]byte,
return nil, fmt.Errorf("invalid PebbleDB MVCC key: %s", itr.Key())
}

keyVersion, err := decodeUint64Ascending(vBz)
if err != nil {
return nil, fmt.Errorf("failed to decode key version: %w", err)
var keyVersion uint64
// handle version 0 (no version prefix)
if len(vBz) > 0 {
keyVersion, err = decodeUint64Ascending(vBz)
if err != nil {
return nil, fmt.Errorf("failed to decode key version: %w", err)
}
}
if keyVersion > version {
return nil, fmt.Errorf("key version too large: %d", keyVersion)
Expand Down
11 changes: 8 additions & 3 deletions store/v2/storage/pebbledb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,20 @@ func (itr *iterator) DebugRawIterate() {
valid = itr.source.SeekLT(MVCCEncode(firstKey, itr.version+1))
}

var err error
for valid {
key, vBz, ok := SplitMVCCKey(itr.source.Key())
if !ok {
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
}

version, err := decodeUint64Ascending(vBz)
if err != nil {
panic(fmt.Errorf("failed to decode key version: %w", err))
var version uint64
// handle version 0 (no version prefix)
if len(vBz) > 0 {
version, err = decodeUint64Ascending(vBz)
if err != nil {
panic(fmt.Errorf("failed to decode key version: %w", err))
}
}

val, tombBz, ok := SplitMVCCKey(itr.source.Value())
Expand Down

0 comments on commit b17c4b3

Please sign in to comment.