Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/compact: defer retrieval of external values #4198

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/base/lazy_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ func (lv *LazyValue) fetchValue(
return f.value, f.callerOwned, f.err
}

// IsInPlaceValue returns true iff the value was stored in-place and does not
// need to be fetched externally.
func (lv *LazyValue) IsInPlaceValue() bool {
return lv.Fetcher == nil
}

// InPlaceValue returns the value under the assumption that it is in-place.
// This is for Pebble-internal code.
func (lv *LazyValue) InPlaceValue() []byte {
Expand Down
2 changes: 1 addition & 1 deletion internal/base/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package base
import "io"

// Merge creates a ValueMerger for the specified key initialized with the value
// of one merge operand.
// of one merge operand. The caller retains ownership of key and value.
type Merge func(key, value []byte) (ValueMerger, error)

// ValueMerger receives merge operands one by one. The operand received is either
Expand Down
144 changes: 89 additions & 55 deletions internal/compact/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ type Iter struct {
// original trailer (eg, before any sequence-number zeroing or changes to
// key kind).
keyTrailer base.InternalKeyTrailer
value []byte
value base.LazyValue
valueCloser io.Closer
// Temporary buffer used for storing the previous user key in order to
// determine when iteration has advanced to a new user key and thus a new
Expand All @@ -189,7 +189,6 @@ type Iter struct {
// advanced.
valueBuf []byte
iterKV *base.InternalKV
iterValue []byte
iterStripeChange stripeChangeType
// skip indicates whether the remaining entries in the current snapshot
// stripe should be skipped or processed. `skip` has no effect when `pos ==
Expand Down Expand Up @@ -362,16 +361,12 @@ func (i *Iter) Stats() IterStats {
}

// First has the same semantics as InternalIterator.First.
func (i *Iter) First() (*base.InternalKey, []byte) {
func (i *Iter) First() (*base.InternalKey, base.LazyValue) {
if i.err != nil {
return nil, nil
return nil, base.LazyValue{}
}
i.iterKV = i.iter.First()
if i.iterKV != nil {
i.iterValue, _, i.err = i.iterKV.Value(nil)
if i.err != nil {
return nil, nil
}
i.curSnapshotIdx, i.curSnapshotSeqNum = i.cfg.Snapshots.IndexAndSeqNum(i.iterKV.SeqNum())
}
i.pos = iterPosNext
Expand All @@ -382,14 +377,14 @@ func (i *Iter) First() (*base.InternalKey, []byte) {
// Next has the same semantics as InternalIterator.Next. Note that when Next
// returns a RANGEDEL or a range key, the caller can use Span() to get the
// corresponding span.
func (i *Iter) Next() (*base.InternalKey, []byte) {
func (i *Iter) Next() (*base.InternalKey, base.LazyValue) {
if i.err != nil {
return nil, nil
return nil, base.LazyValue{}
}

// Close the closer for the current value if one was open.
if i.closeValueCloser() != nil {
return nil, nil
return nil, base.LazyValue{}
}

// Prior to this call to `Next()` we are in one of three situations with
Expand Down Expand Up @@ -477,12 +472,15 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
// This goes against the comment on i.key in the struct, and
// therefore warrants some investigation.
i.saveKey()
i.value = i.iterKV.V
if invariants.Enabled && !i.value.IsInPlaceValue() {
panic(errors.AssertionFailedf("pebble: span key's value is not in-place"))
}
// TODO(jackson): Handle tracking pinned statistics for range keys
// and range deletions. This would require updating
// emitRangeDelChunk and rangeKeyCompactionTransform to update
// statistics when they apply their own snapshot striping logic.
i.snapshotPinned = false
i.value = i.iterValue
return &i.key, i.value
}

Expand Down Expand Up @@ -545,7 +543,7 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
switch i.iterKV.Kind() {
case base.InternalKeyKindDelete:
i.saveKey()
i.value = i.iterValue
i.value = base.LazyValue{} // DELs are value-less.
i.skip = true
return &i.key, i.value

Expand All @@ -559,7 +557,7 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
if i.singleDeleteNext() {
return &i.key, i.value
} else if i.err != nil {
return nil, nil
return nil, base.LazyValue{} // SINGLEDELs are value-less.
}
continue

Expand All @@ -576,7 +574,7 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
// kind.
i.setNext()
if i.err != nil {
return nil, nil
return nil, base.LazyValue{}
}
return &i.key, i.value

Expand All @@ -585,7 +583,8 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
// advances the iterator, adjusting curSnapshotIdx.
origSnapshotIdx := i.curSnapshotIdx
var valueMerger base.ValueMerger
valueMerger, i.err = i.cfg.Merge(i.iterKV.K.UserKey, i.iterValue)
// MERGE values are always stored in-place.
valueMerger, i.err = i.cfg.Merge(i.iterKV.K.UserKey, i.iterKV.V.InPlaceValue())
if i.err == nil {
i.mergeNext(valueMerger)
}
Expand All @@ -607,7 +606,7 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
if i.err == nil {
if needDelete {
if i.closeValueCloser() != nil {
return nil, nil
return nil, base.LazyValue{}
}
continue
}
Expand All @@ -620,15 +619,15 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
// MERGE?
i.err = base.MarkCorruptionError(i.err)
}
return nil, nil
return nil, base.LazyValue{}

default:
i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
return nil, nil
return nil, base.LazyValue{}
}
}

return nil, nil
return nil, base.LazyValue{}
}

// Span returns the range deletion or range key span corresponding to the
Expand Down Expand Up @@ -666,11 +665,8 @@ func (i *Iter) skipInStripe() {

func (i *Iter) iterNext() bool {
i.iterKV = i.iter.Next()
if i.iterKV != nil {
i.iterValue, _, i.err = i.iterKV.Value(nil)
if i.err != nil {
i.iterKV = nil
}
if i.iterKV == nil {
i.err = i.iter.Error()
}
return i.iterKV != nil
}
Expand Down Expand Up @@ -779,7 +775,7 @@ func (i *Iter) nextInStripeHelper() stripeChangeType {
func (i *Iter) setNext() {
// Save the current key.
i.saveKey()
i.value = i.iterValue
i.value = i.iterKV.V
i.maybeZeroSeqnum(i.curSnapshotIdx)

// If this key is already a SETWITHDEL we can early return and skip the remaining
Expand All @@ -789,9 +785,8 @@ func (i *Iter) setNext() {
return
}

// We are iterating forward. Save the current value.
i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
i.value = i.valueBuf
// We need to iterate forward. Save the current value so we don't lose it.
i.saveValue()

// Else, we continue to loop through entries in the stripe looking for a
// DEL. Note that we may stop *before* encountering a DEL, if one exists.
Expand Down Expand Up @@ -874,7 +869,16 @@ func (i *Iter) mergeNext(valueMerger base.ValueMerger) {
// value and return. We change the kind of the resulting key to a
// Set so that it shadows keys in lower levels. That is:
// MERGE + (SET*) -> SET.
i.err = valueMerger.MergeOlder(i.iterValue)
var v []byte
var callerOwned bool
v, callerOwned, i.err = i.iterKV.Value(i.valueBuf[:0])
if i.err != nil {
return
}
if callerOwned && cap(v) > cap(i.valueBuf) {
i.valueBuf = v
}
i.err = valueMerger.MergeOlder(v)
if i.err != nil {
return
}
Expand All @@ -885,7 +889,9 @@ func (i *Iter) mergeNext(valueMerger base.ValueMerger) {
case base.InternalKeyKindMerge:
// We've hit another Merge value. Merge with the existing value and
// continue looping.
i.err = valueMerger.MergeOlder(i.iterValue)
//
// MERGE values are always stored in-place.
i.err = valueMerger.MergeOlder(i.iterKV.InPlaceValue())
if i.err != nil {
return
}
Expand All @@ -910,7 +916,10 @@ func (i *Iter) mergeNext(valueMerger base.ValueMerger) {
func (i *Iter) singleDeleteNext() bool {
// Save the current key.
i.saveKey()
i.value = i.iterValue
if invariants.Enabled && (!i.iterKV.V.IsInPlaceValue() || i.iterKV.V.Len() != 0) {
panic(errors.AssertionFailedf("pebble: single delete value is not in-place or is non-empty"))
}
i.value = base.LazyValue{} // SINGLEDELs are value-less.

// Loop until finds a key to be passed to the next level.
for {
Expand Down Expand Up @@ -1097,20 +1106,17 @@ func (i *Iter) skipDueToSingleDeleteElision() {
//
// When a deleteSizedNext is encountered, we skip ahead to see which keys, if
// any, are elided as a result of the tombstone.
func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
func (i *Iter) deleteSizedNext() (*base.InternalKey, base.LazyValue) {
i.saveKey()
i.skip = true

// The DELSIZED tombstone may have no value at all. This happens when the
// tombstone has already deleted the key that the user originally predicted.
// In this case, we still peek forward in case there's another DELSIZED key
// with a lower sequence number, in which case we'll adopt its value.
if len(i.iterValue) == 0 {
i.value = i.valueBuf[:0]
} else {
i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
i.value = i.valueBuf
}
// If the DELSIZED does have a value, it must be in-place.
i.valueBuf = append(i.valueBuf[:0], i.iterKV.V.InPlaceValue()...)
i.value = base.MakeInPlaceValue(i.valueBuf)

// Loop through all the keys within this stripe that are skippable.
i.pos = iterPosNext
Expand Down Expand Up @@ -1147,13 +1153,15 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
// We treat both cases the same functionally, adopting the identity
// of the lower-sequence numbered tombstone. However in the second
// case, we also increment the stat counting missized tombstones.
if len(i.value) > 0 {
if i.value.Len() > 0 {
// The original DELSIZED key was missized. The key that the user
// thought they were deleting does not exist.
i.stats.CountMissizedDels++
}
i.valueBuf = append(i.valueBuf[:0], i.iterValue...)
i.value = i.valueBuf
// If the tombstone has a value, it must be in-place. To save it, we
// can just copy the in-place value directly.
i.valueBuf = append(i.valueBuf[:0], i.iterKV.InPlaceValue()...)
i.value = base.MakeInPlaceValue(i.valueBuf)
if i.iterKV.Kind() != base.InternalKeyKindDeleteSized {
// Convert the DELSIZED to a DEL—The DEL/SINGLEDEL we're eliding
// may not have deleted the key(s) it was intended to yet. The
Expand Down Expand Up @@ -1185,7 +1193,7 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
// case has already been elided. We don't count it as a missizing,
// instead converting the DELSIZED to a DEL. Skip the remainder of
// the snapshot stripe and return.
if len(i.value) == 0 {
if i.value.Len() == 0 {
i.key.SetKind(base.InternalKeyKindDelete)
// NB: We skipInStripe now, rather than returning leaving
// i.skip=true and returning early, because Next() requires
Expand All @@ -1198,13 +1206,15 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
return &i.key, i.value
}
// The deleted key is not a DEL, DELSIZED, and the DELSIZED in i.key
// has a positive size.
expectedSize, n := binary.Uvarint(i.value)
if n != len(i.value) {
i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(i.value))
return nil, nil
// has a positive size. Note that the tombstone's value must be
// in-place.
v := i.value.InPlaceValue()
expectedSize, n := binary.Uvarint(v)
if n != len(v) {
i.err = base.CorruptionErrorf("DELSIZED holds invalid value: %x", errors.Safe(v))
return nil, base.LazyValue{}
}
elidedSize := uint64(len(i.iterKV.K.UserKey)) + uint64(len(i.iterValue))
elidedSize := uint64(len(i.iterKV.K.UserKey)) + uint64(i.iterKV.V.Len())
if elidedSize != expectedSize {
// The original DELSIZED key was missized. It's unclear what to
// do. The user-provided size was wrong, so it's unlikely to be
Expand All @@ -1222,7 +1232,7 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
// are safer.
i.stats.CountMissizedDels++
i.key.SetKind(base.InternalKeyKindDelete)
i.value = i.valueBuf[:0]
i.value = base.LazyValue{}
// NB: We skipInStripe now, rather than returning leaving
// i.skip=true and returning early, because Next() requires
// that i.skip=true only if i.iterPos = iterPosCurForward.
Expand All @@ -1236,11 +1246,11 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
// NB: We remove the value regardless of whether the key was sized
// appropriately. The size encoded is 'consumed' the first time it
// meets a key that it deletes.
i.value = i.valueBuf[:0]
i.value = base.LazyValue{}

default:
i.err = base.CorruptionErrorf("invalid internal key kind: %d", errors.Safe(i.iterKV.Kind()))
return nil, nil
return nil, base.LazyValue{}
}
}

Expand All @@ -1250,11 +1260,12 @@ func (i *Iter) deleteSizedNext() (*base.InternalKey, []byte) {
// We landed outside the original stripe. Reset skip.
i.skip = false
if i.err != nil {
return nil, nil
return nil, base.LazyValue{}
}
return &i.key, i.value
}

// saveKey saves the key in iterKV to i.key using i.keyBuf's memory.
func (i *Iter) saveKey() {
i.keyBuf = append(i.keyBuf[:0], i.iterKV.K.UserKey...)
i.key = base.InternalKey{
Expand All @@ -1264,6 +1275,28 @@ func (i *Iter) saveKey() {
i.keyTrailer = i.key.Trailer
}

// saveValue saves the value in iterKV to i.value. It must be called before
// stepping the iterator if the value needs to be retained. Unlike keys, values
// do not need to be copied in all code paths. For example, a SETWITHDEL key may
// be written to output sstables without needing to read ahead, copying the
// value directly from the existing input sstable block into the output block
// builder.
//
// If the value is in-place, this copies it into i.valueBuf. If the value is in
// a value block, it retrieves the value from the block (possibly storing the
// result into i.valueBuf).
func (i *Iter) saveValue() {
v, callerOwned, err := i.iterKV.V.Value(i.valueBuf[:0])
if err != nil {
i.err = err
i.value = base.LazyValue{}
} else if !callerOwned {
i.value = base.MakeInPlaceValue(append(i.valueBuf[:0], v...))
} else {
i.value = base.MakeInPlaceValue(v)
}
}

// Error returns any error encountered.
//
// Note that Close will return the error as well.
Expand Down Expand Up @@ -1366,11 +1399,12 @@ func (i *Iter) maybeZeroSeqnum(snapshotIdx int) {

func finishValueMerger(
valueMerger base.ValueMerger, includesBase bool,
) (value []byte, needDelete bool, closer io.Closer, err error) {
) (_ base.LazyValue, needDelete bool, closer io.Closer, err error) {
var value []byte
if valueMerger2, ok := valueMerger.(base.DeletableValueMerger); ok {
value, needDelete, closer, err = valueMerger2.DeletableFinish(includesBase)
} else {
value, closer, err = valueMerger.Finish(includesBase)
}
return
return base.MakeInPlaceValue(value), needDelete, closer, err
}
Loading
Loading