Skip to content

Commit

Permalink
iterator: use heap to accelerate mergedIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
qianbin authored and syndtr committed Jun 13, 2022
1 parent 9ab5d34 commit 678c1e8
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 26 deletions.
98 changes: 72 additions & 26 deletions leveldb/iterator/merged_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package iterator

import (
"container/heap"

"github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/util"
Expand All @@ -33,6 +35,9 @@ type mergedIterator struct {
err error
errf func(err error)
releaser util.Releaser

indexes []int // the heap of iterator indexes
reverse bool // if true, indexes is a max-heap
}

func assertKey(key []byte) []byte {
Expand Down Expand Up @@ -67,16 +72,20 @@ func (i *mergedIterator) First() bool {
return false
}

h := i.indexHeap()
h.Reset(false)
for x, iter := range i.iters {
switch {
case iter.First():
i.keys[x] = assertKey(iter.Key())
h.Push(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
heap.Init(h)
i.dir = dirSOI
return i.next()
}
Expand All @@ -89,16 +98,20 @@ func (i *mergedIterator) Last() bool {
return false
}

h := i.indexHeap()
h.Reset(true)
for x, iter := range i.iters {
switch {
case iter.Last():
i.keys[x] = assertKey(iter.Key())
h.Push(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
heap.Init(h)
i.dir = dirEOI
return i.prev()
}
Expand All @@ -111,35 +124,31 @@ func (i *mergedIterator) Seek(key []byte) bool {
return false
}

h := i.indexHeap()
h.Reset(false)
for x, iter := range i.iters {
switch {
case iter.Seek(key):
i.keys[x] = assertKey(iter.Key())
h.Push(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
heap.Init(h)
i.dir = dirSOI
return i.next()
}

func (i *mergedIterator) next() bool {
var key []byte
if i.dir == dirForward {
key = i.keys[i.index]
}
for x, tkey := range i.keys {
if tkey != nil && (key == nil || i.cmp.Compare(tkey, key) < 0) {
key = tkey
i.index = x
}
}
if key == nil {
h := i.indexHeap()
if h.Len() == 0 {
i.dir = dirEOI
return false
}
i.index = heap.Pop(h).(int)
i.dir = dirForward
return true
}
Expand Down Expand Up @@ -168,6 +177,7 @@ func (i *mergedIterator) Next() bool {
switch {
case iter.Next():
i.keys[x] = assertKey(iter.Key())
heap.Push(i.indexHeap(), x)
case i.iterErr(iter):
return false
default:
Expand All @@ -177,20 +187,12 @@ func (i *mergedIterator) Next() bool {
}

func (i *mergedIterator) prev() bool {
var key []byte
if i.dir == dirBackward {
key = i.keys[i.index]
}
for x, tkey := range i.keys {
if tkey != nil && (key == nil || i.cmp.Compare(tkey, key) > 0) {
key = tkey
i.index = x
}
}
if key == nil {
h := i.indexHeap()
if h.Len() == 0 {
i.dir = dirSOI
return false
}
i.index = heap.Pop(h).(int)
i.dir = dirBackward
return true
}
Expand All @@ -208,6 +210,8 @@ func (i *mergedIterator) Prev() bool {
return i.Last()
case dirForward:
key := append([]byte(nil), i.keys[i.index]...)
h := i.indexHeap()
h.Reset(true)
for x, iter := range i.iters {
if x == i.index {
continue
Expand All @@ -216,19 +220,22 @@ func (i *mergedIterator) Prev() bool {
switch {
case seek && iter.Prev(), !seek && iter.Last():
i.keys[x] = assertKey(iter.Key())
h.Push(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
heap.Init(h)
}

x := i.index
iter := i.iters[x]
switch {
case iter.Prev():
i.keys[x] = assertKey(iter.Key())
heap.Push(i.indexHeap(), x)
case i.iterErr(iter):
return false
default:
Expand Down Expand Up @@ -259,6 +266,7 @@ func (i *mergedIterator) Release() {
}
i.iters = nil
i.keys = nil
i.indexes = nil
if i.releaser != nil {
i.releaser.Release()
i.releaser = nil
Expand All @@ -284,6 +292,10 @@ func (i *mergedIterator) SetErrorCallback(f func(err error)) {
i.errf = f
}

func (i *mergedIterator) indexHeap() *indexHeap {
return (*indexHeap)(i)
}

// NewMergedIterator returns an iterator that merges its input. Walking the
// resultant iterator will return all key/value pairs of all input iterators
// in strictly increasing key order, as defined by cmp.
Expand All @@ -296,9 +308,43 @@ func (i *mergedIterator) SetErrorCallback(f func(err error)) {
// continue to the next 'input iterator'.
func NewMergedIterator(iters []Iterator, cmp comparer.Comparer, strict bool) Iterator {
return &mergedIterator{
iters: iters,
cmp: cmp,
strict: strict,
keys: make([][]byte, len(iters)),
iters: iters,
cmp: cmp,
strict: strict,
keys: make([][]byte, len(iters)),
indexes: make([]int, 0, len(iters)),
}
}

// indexHeap implements heap.Interface.
type indexHeap mergedIterator

func (h *indexHeap) Len() int { return len(h.indexes) }
func (h *indexHeap) Less(i, j int) bool {
i, j = h.indexes[i], h.indexes[j]
r := h.cmp.Compare(h.keys[i], h.keys[j])
if h.reverse {
return r > 0
}
return r < 0
}

func (h *indexHeap) Swap(i, j int) {
h.indexes[i], h.indexes[j] = h.indexes[j], h.indexes[i]
}

func (h *indexHeap) Push(value interface{}) {
h.indexes = append(h.indexes, value.(int))
}

func (h *indexHeap) Pop() interface{} {
e := len(h.indexes) - 1
popped := h.indexes[e]
h.indexes = h.indexes[:e]
return popped
}

func (h *indexHeap) Reset(reverse bool) {
h.reverse = reverse
h.indexes = h.indexes[:0]
}
21 changes: 21 additions & 0 deletions leveldb/iterator/merged_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package iterator_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

Expand Down Expand Up @@ -58,3 +60,22 @@ var _ = testutil.Defer(func() {
Describe("with one filled, two empty iterators", Test(1, 2))
})
})

func BenchmarkMergedIterator(b *testing.B) {
n := 11
iters := make([]Iterator, n)
for i := range iters {
kv := testutil.KeyValue_Generate(nil, 100, 1, 1, 10, 4, 4)
iters[i] = NewArrayIterator(kv)
}

mi := NewMergedIterator(iters, comparer.DefaultComparer, true)
b.ResetTimer()

for i := 0; i < b.N; i++ {
mi.First()
for mi.Next() {
mi.Key()
}
}
}

0 comments on commit 678c1e8

Please sign in to comment.