Skip to content

Commit

Permalink
internal/compact: defer retrieval of external values
Browse files Browse the repository at this point in the history
During compaction iteration, defer the retrieval of values stored in value
blocks until either:

a) we need to copy the value before stepping the iterator, or
b) the compaction iterator yields the value to the main compaction loop.

This refactor ensures that when a KV is elided by a tombstone, we avoid
unnecessarily loading its value from the external value block. Additionally,
the refactored compaction iterator interfaces, which now propagate LazyValues,
will be used by value separation (cockroachdb#112), in which we will sometimes
propagate external value references to output sstables without ever retrieving
the value.

Close cockroachdb#4197.
Informs cockroachdb#112.
  • Loading branch information
jbowens committed Dec 12, 2024
1 parent ab9741a commit 2c1fd7e
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 64 deletions.
6 changes: 6 additions & 0 deletions internal/base/lazy_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ func (lv *LazyValue) fetchValue(
return f.value, f.callerOwned, f.err
}

// IsInPlaceValue reports whether lv holds its value in-place, i.e. no Fetcher
// is attached and the value does not need to be fetched externally.
func (lv *LazyValue) IsInPlaceValue() bool {
	needsFetch := lv.Fetcher != nil
	return !needsFetch
}

// InPlaceValue returns the value under the assumption that it is in-place.
// This is for Pebble-internal code.
func (lv *LazyValue) InPlaceValue() []byte {
Expand Down
2 changes: 1 addition & 1 deletion internal/base/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package base
import "io"

// Merge creates a ValueMerger for the specified key initialized with the value
// of one merge operand.
// of one merge operand. The caller retains ownership of key and value.
type Merge func(key, value []byte) (ValueMerger, error)

// ValueMerger receives merge operands one by one. The operand received is either
Expand Down
141 changes: 86 additions & 55 deletions internal/compact/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ type Iter struct {
// original trailer (eg, before any sequence-number zeroing or changes to
// key kind).
keyTrailer base.InternalKeyTrailer
value []byte
value base.LazyValue
valueCloser io.Closer
// Temporary buffer used for storing the previous user key in order to
// determine when iteration has advanced to a new user key and thus a new
Expand All @@ -189,7 +189,6 @@ type Iter struct {
// advanced.
valueBuf []byte
iterKV *base.InternalKV
iterValue []byte
iterStripeChange stripeChangeType
// skip indicates whether the remaining entries in the current snapshot
// stripe should be skipped or processed. `skip` has no effect when `pos ==
Expand Down Expand Up @@ -362,16 +361,12 @@ func (i *Iter) Stats() IterStats {
}

// First has the same semantics as InternalIterator.First.
func (i *Iter) First() (*base.InternalKey, []byte) {
func (i *Iter) First() (*base.InternalKey, base.LazyValue) {
if i.err != nil {
return nil, nil
return nil, base.LazyValue{}
}
i.iterKV = i.iter.First()
if i.iterKV != nil {
i.iterValue, _, i.err = i.iterKV.Value(nil)
if i.err != nil {
return nil, nil
}
i.curSnapshotIdx, i.curSnapshotSeqNum = i.cfg.Snapshots.IndexAndSeqNum(i.iterKV.SeqNum())
}
i.pos = iterPosNext
Expand All @@ -382,14 +377,14 @@ func (i *Iter) First() (*base.InternalKey, []byte) {
// Next has the same semantics as InternalIterator.Next. Note that when Next
// returns a RANGEDEL or a range key, the caller can use Span() to get the
// corresponding span.
func (i *Iter) Next() (*base.InternalKey, []byte) {
func (i *Iter) Next() (*base.InternalKey, base.LazyValue) {
if i.err != nil {
return nil, nil
return nil, base.LazyValue{}
}

// Close the closer for the current value if one was open.
if i.closeValueCloser() != nil {
return nil, nil
return nil, base.LazyValue{}
}

// Prior to this call to `Next()` we are in one of three situations with
Expand Down Expand Up @@ -477,12 +472,15 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
// This goes against the comment on i.key in the struct, and
// therefore warrants some investigation.
i.saveKey()
i.value = i.iterKV.V
if invariants.Enabled && !i.value.IsInPlaceValue() {
panic(errors.AssertionFailedf("pebble: span key's value is not in-place"))
}
// TODO(jackson): Handle tracking pinned statistics for range keys
// and range deletions. This would require updating
// emitRangeDelChunk and rangeKeyCompactionTransform to update
// statistics when they apply their own snapshot striping logic.
i.snapshotPinned = false
i.value = i.iterValue
return &i.key, i.value
}

Expand Down Expand Up @@ -545,7 +543,7 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
switch i.iterKV.Kind() {
case base.InternalKeyKindDelete:
i.saveKey()
i.value = i.iterValue
i.value = base.LazyValue{} // DELs are value-less.
i.skip = true
return &i.key, i.value

Expand All @@ -559,7 +557,7 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
if i.singleDeleteNext() {
return &i.key, i.value
} else if i.err != nil {
return nil, nil
return nil, base.LazyValue{}
}
continue

Expand All @@ -576,7 +574,7 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
// kind.
i.setNext()
if i.err != nil {
return nil, nil
return nil, base.LazyValue{}
}
return &i.key, i.value

Expand All @@ -585,7 +583,8 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
// advances the iterator, adjusting curSnapshotIdx.
origSnapshotIdx := i.curSnapshotIdx
var valueMerger base.ValueMerger
valueMerger, i.err = i.cfg.Merge(i.iterKV.K.UserKey, i.iterValue)
// MERGE values are always stored in-place.
valueMerger, i.err = i.cfg.Merge(i.iterKV.K.UserKey, i.iterKV.V.InPlaceValue())
if i.err == nil {
i.mergeNext(valueMerger)
}
Expand All @@ -607,7 +606,7 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
if i.err == nil {
if needDelete {
if i.closeValueCloser() != nil {
return nil, nil
return nil, base.LazyValue{}
}
continue
}
Expand All @@ -620,15 +619,15 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
// MERGE?
i.err = base.MarkCorruptionError(i.err)
}
return nil, nil
return nil, base.LazyValue{}

default:
i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
return nil, nil
return nil, base.LazyValue{}
}
}

return nil, nil
return nil, base.LazyValue{}
}

// Span returns the range deletion or range key span corresponding to the
Expand Down Expand Up @@ -666,11 +665,8 @@ func (i *Iter) skipInStripe() {

func (i *Iter) iterNext() bool {
i.iterKV = i.iter.Next()
if i.iterKV != nil {
i.iterValue, _, i.err = i.iterKV.Value(nil)
if i.err != nil {
i.iterKV = nil
}
if i.err != nil {
i.iterKV = nil
}
return i.iterKV != nil
}
Expand Down Expand Up @@ -779,7 +775,7 @@ func (i *Iter) nextInStripeHelper() stripeChangeType {
func (i *Iter) setNext() {
// Save the current key.
i.saveKey()
i.value = i.iterValue
i.value = i.iterKV.V
i.maybeZeroSeqnum(i.curSnapshotIdx)

// If this key is already a SETWITHDEL we can early return and skip the remaining
Expand All @@ -789,9 +785,8 @@ func (i *Iter) setNext() {
return
}

// We are iterating forward. Save the current value.
i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
i.value = i.valueBuf
// We need to iterate forward. Save the current value so we don't lose it.
i.saveValue()

// Else, we continue to loop through entries in the stripe looking for a
// DEL. Note that we may stop *before* encountering a DEL, if one exists.
Expand Down Expand Up @@ -874,7 +869,16 @@ func (i *Iter) mergeNext(valueMerger base.ValueMerger) {
// value and return. We change the kind of the resulting key to a
// Set so that it shadows keys in lower levels. That is:
// MERGE + (SET*) -> SET.
i.err = valueMerger.MergeOlder(i.iterValue)
var v []byte
var callerOwned bool
v, callerOwned, i.err = i.iterKV.Value(i.valueBuf[:0])
if i.err != nil {
return
}
if callerOwned && cap(v) > cap(i.valueBuf) {
i.valueBuf = v
}
i.err = valueMerger.MergeOlder(v)
if i.err != nil {
return
}
Expand All @@ -885,7 +889,9 @@ func (i *Iter) mergeNext(valueMerger base.ValueMerger) {
case base.InternalKeyKindMerge:
// We've hit another Merge value. Merge with the existing value and
// continue looping.
i.err = valueMerger.MergeOlder(i.iterValue)
//
// MERGE values are always stored in-place.
i.err = valueMerger.MergeOlder(i.iterKV.InPlaceValue())
if i.err != nil {
return
}
Expand All @@ -910,7 +916,7 @@ func (i *Iter) mergeNext(valueMerger base.ValueMerger) {
func (i *Iter) singleDeleteNext() bool {
// Save the current key.
i.saveKey()
i.value = i.iterValue
i.value = i.iterKV.V

// Loop until we find a key to be passed to the next level.
for {
Expand Down Expand Up @@ -1097,20 +1103,17 @@ func (i *Iter) skipDueToSingleDeleteElision() {
//
// When a deleteSizedNext is encountered, we skip ahead to see which keys, if
// any, are elided as a result of the tombstone.
func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
func (i *Iter) deleteSizedNext() (*base.InternalKey, base.LazyValue) {
i.saveKey()
i.skip = true

// The DELSIZED tombstone may have no value at all. This happens when the
// tombstone has already deleted the key that the user originally predicted.
// In this case, we still peek forward in case there's another DELSIZED key
// with a lower sequence number, in which case we'll adopt its value.
if len(i.iterValue) == 0 {
i.value = i.valueBuf[:0]
} else {
i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
i.value = i.valueBuf
}
// If the DELSIZED does have a value, it must be in-place.
i.valueBuf = append(i.valueBuf[:0], i.iterKV.V.InPlaceValue()...)
i.value = base.MakeInPlaceValue(i.valueBuf)

// Loop through all the keys within this stripe that are skippable.
i.pos = iterPosNext
Expand Down Expand Up @@ -1147,13 +1150,15 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
// We treat both cases the same functionally, adopting the identity
// of the lower-sequence numbered tombstone. However in the second
// case, we also increment the stat counting missized tombstones.
if len(i.value) > 0 {
if i.value.Len() > 0 {
// The original DELSIZED key was missized. The key that the user
// thought they were deleting does not exist.
i.stats.CountMissizedDels++
}
i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
i.value = i.valueBuf
// If the tombstone has a value, it must be in-place. To save it, we
// can just copy the in-place value directly.
i.valueBuf = append(i.valueBuf[:0], i.iterKV.InPlaceValue()...)
i.value = base.MakeInPlaceValue(i.valueBuf)
if i.iterKV.Kind() != base.InternalKeyKindDeleteSized {
// Convert the DELSIZED to a DEL—The DEL/SINGLEDEL we're eliding
// may not have deleted the key(s) it was intended to yet. The
Expand Down Expand Up @@ -1185,7 +1190,7 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
// case has already been elided. We don't count it as a missizing,
// instead converting the DELSIZED to a DEL. Skip the remainder of
// the snapshot stripe and return.
if len(i.value) == 0 {
if i.value.Len() == 0 {
i.key.SetKind(base.InternalKeyKindDelete)
// NB: We skipInStripe now, rather than leaving
// i.skip=true and returning early, because Next() requires
Expand All @@ -1198,13 +1203,15 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
return &i.key, i.value
}
// The deleted key is not a DEL, DELSIZED, and the DELSIZED in i.key
// has a positive size.
expectedSize, n := binary.Uvarint(i.value)
if n != len(i.value) {
i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(i.value))
return nil, nil
// has a positive size. Note that the tombstone's value must be
// in-place.
v := i.value.InPlaceValue()
expectedSize, n := binary.Uvarint(v)
if n != len(v) {
i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(v))
return nil, base.LazyValue{}
}
elidedSize := uint64(len(i.iterKV.K.UserKey)) + uint64(len(i.iterValue))
elidedSize := uint64(len(i.iterKV.K.UserKey)) + uint64(i.iterKV.V.Len())
if elidedSize != expectedSize {
// The original DELSIZED key was missized. It's unclear what to
// do. The user-provided size was wrong, so it's unlikely to be
Expand All @@ -1222,7 +1229,7 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
// are safer.
i.stats.CountMissizedDels++
i.key.SetKind(base.InternalKeyKindDelete)
i.value = i.valueBuf[:0]
i.value = base.LazyValue{}
// NB: We skipInStripe now, rather than leaving
// i.skip=true and returning early, because Next() requires
// that i.skip=true only if i.iterPos = iterPosCurForward.
Expand All @@ -1236,11 +1243,11 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
// NB: We remove the value regardless of whether the key was sized
// appropriately. The size encoded is 'consumed' the first time it
// meets a key that it deletes.
i.value = i.valueBuf[:0]
i.value = base.LazyValue{}

default:
i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
return nil, nil
return nil, base.LazyValue{}
}
}

Expand All @@ -1250,11 +1257,12 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
// We landed outside the original stripe. Reset skip.
i.skip = false
if i.err != nil {
return nil, nil
return nil, base.LazyValue{}
}
return &i.key, i.value
}

// saveKey saves the key in iterKV to i.key using i.keyBuf's memory.
func (i *Iter) saveKey() {
i.keyBuf = append(i.keyBuf[:0], i.iterKV.K.UserKey...)
i.key = base.InternalKey{
Expand All @@ -1264,6 +1272,28 @@ func (i *Iter) saveKey() {
i.keyTrailer = i.key.Trailer
}

// saveValue saves the value in iterKV to i.value. It must be called before
// stepping the iterator if the value needs to be retained. Unlike keys, values
// do not need to be copied in all code paths. For example, a SETWITHDEL key may
// be written to output sstables without needing to read ahead, copying the
// value directly from the existing input sstable block into the output block
// builder.
//
// If the value is in-place, this copies it into i.valueBuf. If the value is in
// a value block, it retrieves the value from the block (possibly storing the
// result into i.valueBuf).
//
// On success i.value is an in-place LazyValue backed by memory this iterator
// owns; on failure i.err is set and i.value is reset to the zero LazyValue.
// Whenever the saved bytes end up in a larger buffer than i.valueBuf, that
// buffer is retained as i.valueBuf so later saves can reuse its capacity
// rather than reallocating (the same buffer-retention done in mergeNext).
func (i *Iter) saveValue() {
	v, callerOwned, err := i.iterKV.V.Value(i.valueBuf[:0])
	if err != nil {
		i.err = err
		i.value = base.LazyValue{}
		return
	}
	if !callerOwned {
		// v is not ours to keep across an iterator step, so copy it. Store
		// the (possibly grown) result back into i.valueBuf so the growth is
		// not thrown away.
		i.valueBuf = append(i.valueBuf[:0], v...)
		i.value = base.MakeInPlaceValue(i.valueBuf)
		return
	}
	// v is caller-owned. If the fetch had to allocate a bigger buffer than the
	// one we offered, adopt it as the scratch buffer for future calls.
	if cap(v) > cap(i.valueBuf) {
		i.valueBuf = v
	}
	i.value = base.MakeInPlaceValue(v)
}

// Error returns any error encountered.
//
// Note that Close will return the error as well.
Expand Down Expand Up @@ -1366,11 +1396,12 @@ func (i *Iter) maybeZeroSeqnum(snapshotIdx int) {

func finishValueMerger(
valueMerger base.ValueMerger, includesBase bool,
) (value []byte, needDelete bool, closer io.Closer, err error) {
) (_ base.LazyValue, needDelete bool, closer io.Closer, err error) {
var value []byte
if valueMerger2, ok := valueMerger.(base.DeletableValueMerger); ok {
value, needDelete, closer, err = valueMerger2.DeletableFinish(includesBase)
} else {
value, closer, err = valueMerger.Finish(includesBase)
}
return
return base.MakeInPlaceValue(value), needDelete, closer, err
}
Loading

0 comments on commit 2c1fd7e

Please sign in to comment.