Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DEMO: don't merge] Trial kvstore interface cleanup #836

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 21 additions & 19 deletions store.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package weave

import (
"context"
)

//////////////////////////////////////////////////////////
// Defines all public interfaces for interacting with stores
//
Expand All @@ -8,27 +12,25 @@ package weave
// ReadOnlyKVStore is a simple interface to query data.
type ReadOnlyKVStore interface {
// Get returns nil iff key doesn't exist. Panics on nil key.
Get(key []byte) ([]byte, error)
Get(ctx context.Context, key []byte) ([]byte, error)

// Has checks if a key exists. Panics on nil key.
Has(key []byte) (bool, error)
Has(ctx context.Context, key []byte) (bool, error)

// Iterator over a domain of keys in ascending order. End is exclusive.
// Iterator over a domain of keys. End is exclusive.
// Start must be less than end, or the Iterator is invalid.
//
// If reverse is false, read in ascending order. Else read descending.
// You must cancel the context to close the Iterator without consuming it all.
// CONTRACT: No writes may happen within a domain while an iterator exists over it.
Iterator(start, end []byte) (Iterator, error)

// ReverseIterator over a domain of keys in descending order. End is exclusive.
// Start must be greater than end, or the Iterator is invalid.
// CONTRACT: No writes may happen within a domain while an iterator exists over it.
ReverseIterator(start, end []byte) (Iterator, error)
Iterator(ctx context.Context, start, end []byte, reverse bool) (Iterator, error)
}

// SetDeleter is a minimal interface for writing,
// Unifying KVStore and Batch
type SetDeleter interface {
Set(key, value []byte) error // CONTRACT: key, value readonly []byte
Delete(key []byte) error // CONTRACT: key readonly []byte
Set(ctx context.Context, key, value []byte) error // CONTRACT: key, value readonly []byte
Delete(ctx context.Context, key []byte) error // CONTRACT: key readonly []byte
}

// KVStore is a simple interface to get/set data
Expand All @@ -46,17 +48,20 @@ type KVStore interface {
// Batch can write multiple ops atomically to an underlying KVStore
type Batch interface {
SetDeleter
Write() error
Write(ctx context.Context) error
}

/*
Iterator allows us to access a set of items within a range of
keys. These may all be preloaded, or loaded on demand.
Cancel the context passed in the constructor to terminate
the iterator.

Usage:

var itr Iterator = ...
defer itr.Release()
ctx, cancel := context.WithCancel(context.Background())
var itr Iterator = kv.Iterate(start, end, false)
defer cancel()

k, v, err := itr.Next()
for err == nil {
Expand All @@ -74,9 +79,6 @@ type Iterator interface {
//
// Returns (nil, nil, errors.ErrIteratorDone) if there is no more data
Next() (key, value []byte, err error)

// Release releases the Iterator, allowing it to do any needed cleanup.
Release()
}

///////////////////////////////////////////////////////////
Expand Down Expand Up @@ -109,7 +111,7 @@ type KVCacheWrap interface {
CacheableKVStore

// Write syncs with the underlying store.
Write() error
Write(ctx context.Context) error

// Discard invalidates this CacheWrap and releases all data
Discard()
Expand All @@ -130,7 +132,7 @@ type KVCacheWrap interface {
type CommitKVStore interface {
// Get returns the value at last committed state
// returns nil iff key doesn't exist. Panics on nil key.
Get(key []byte) ([]byte, error)
Get(ctx context.Context, key []byte) ([]byte, error)

// TODO: Get with proof, also historical queries
// GetVersionedWithProof(key []byte, version int64) (value []byte)
Expand Down
40 changes: 16 additions & 24 deletions store/btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package store

import (
"bytes"
"context"

"github.com/google/btree"
"github.com/iov-one/weave/errors"
Expand Down Expand Up @@ -99,8 +100,8 @@ func (b BTreeCacheWrap) NewBatch() Batch {

// Write syncs with the underlying store.
// And then cleans up
func (b BTreeCacheWrap) Write() error {
err := b.batch.Write()
func (b BTreeCacheWrap) Write(ctx context.Context) error {
err := b.batch.Write(ctx)
b.Discard()
return err
}
Expand All @@ -117,21 +118,21 @@ func (b BTreeCacheWrap) Discard() {
}

// Set writes to the BTree and to the batch
func (b BTreeCacheWrap) Set(key, value []byte) error {
func (b BTreeCacheWrap) Set(ctx context.Context, key, value []byte) error {
b.bt.ReplaceOrInsert(newSetItem(key, value))

return b.batch.Set(key, value)
return b.batch.Set(ctx, key, value)
}

// Delete deletes from the BTree and to the batch
func (b BTreeCacheWrap) Delete(key []byte) error {
func (b BTreeCacheWrap) Delete(ctx context.Context, key []byte) error {
b.bt.ReplaceOrInsert(newDeletedItem(key))

return b.batch.Delete(key)
return b.batch.Delete(ctx, key)
}

// Get reads from btree if there, else backing store
func (b BTreeCacheWrap) Get(key []byte) ([]byte, error) {
func (b BTreeCacheWrap) Get(ctx context.Context, key []byte) ([]byte, error) {
res := b.bt.Get(bkey{key})
if res != nil {
switch t := res.(type) {
Expand All @@ -143,11 +144,11 @@ func (b BTreeCacheWrap) Get(key []byte) ([]byte, error) {
return nil, errors.Wrapf(errors.ErrDatabase, "Unknown item in btree: %#v", res)
}
}
return b.back.Get(key)
return b.back.Get(ctx, key)
}

// Has reads from btree if there, else backing store
func (b BTreeCacheWrap) Has(key []byte) (bool, error) {
func (b BTreeCacheWrap) Has(ctx context.Context, key []byte) (bool, error) {
res := b.bt.Get(bkey{key})
if res != nil {
switch res.(type) {
Expand All @@ -159,31 +160,22 @@ func (b BTreeCacheWrap) Has(key []byte) (bool, error) {
return false, errors.Wrapf(errors.ErrDatabase, "Unknown item in btree: %#v", res)
}
}
return b.back.Has(key)
return b.back.Has(ctx, key)
}

// Iterator over a domain of keys in ascending order.
// Combines results from btree and backing store
func (b BTreeCacheWrap) Iterator(start, end []byte) (Iterator, error) {
func (b BTreeCacheWrap) Iterator(ctx context.Context, start, end []byte, reverse bool) (Iterator, error) {
// take the backing iterator for start
parentIter, err := b.back.Iterator(start, end)
parentIter, err := b.back.Iterator(ctx, start, end, reverse)
if err != nil {
return nil, err
}
iter := ascendBtree(b.bt, start, end).wrap(parentIter)
return iter, nil
}
if reverse {
return descendBtree(ctx, b.bt, start, end).wrap(parentIter), nil

// ReverseIterator over a domain of keys in descending order.
// Combines results from btree and backing store
func (b BTreeCacheWrap) ReverseIterator(start, end []byte) (Iterator, error) {
// take the backing iterator for start
parentIter, err := b.back.ReverseIterator(start, end)
if err != nil {
return nil, err
}
iter := descendBtree(b.bt, start, end).wrap(parentIter)
return iter, nil
return ascendBtree(ctx, b.bt, start, end).wrap(parentIter), nil
}

/////////////////////////////////////////////////////////
Expand Down
52 changes: 27 additions & 25 deletions store/helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package store

import "github.com/iov-one/weave/errors"
import (
"context"

"github.com/iov-one/weave/errors"
)

////////////////////////////////////////////////
// Slice -> Iterator
Expand All @@ -9,20 +13,28 @@ import "github.com/iov-one/weave/errors"
//
// TODO: make this private and only expose Iterator interface????
type SliceIterator struct {
ctx context.Context
data []Model
idx int
}

var _ Iterator = (*SliceIterator)(nil)

// NewSliceIterator creates a new Iterator over this slice
func NewSliceIterator(data []Model) *SliceIterator {
func NewSliceIterator(ctx context.Context, data []Model) *SliceIterator {
return &SliceIterator{
data: data,
ctx: ctx,
}
}

func (s *SliceIterator) Next() (key, value []byte, err error) {
select {
case <-s.ctx.Done():
s.data = nil
default:
}

if s.idx >= len(s.data) {
return nil, nil, errors.Wrap(errors.ErrIteratorDone, "slice iterator")
}
Expand All @@ -31,11 +43,6 @@ func (s *SliceIterator) Next() (key, value []byte, err error) {
return val.Key, val.Value, nil
}

// Release releases the Iterator.
func (s *SliceIterator) Release() {
s.data = nil
}

/////////////////////////////////////////////////////
// Empty KVStore

Expand All @@ -45,25 +52,20 @@ type EmptyKVStore struct{}
var _ KVStore = EmptyKVStore{}

// Get always returns nil
func (e EmptyKVStore) Get(key []byte) ([]byte, error) { return nil, nil }
func (e EmptyKVStore) Get(ctx context.Context, key []byte) ([]byte, error) { return nil, nil }

// Has always returns false
func (e EmptyKVStore) Has(key []byte) (bool, error) { return false, nil }
func (e EmptyKVStore) Has(ctx context.Context, key []byte) (bool, error) { return false, nil }

// Set is a noop
func (e EmptyKVStore) Set(key, value []byte) error { return nil }
func (e EmptyKVStore) Set(ctx context.Context, key, value []byte) error { return nil }

// Delete is a noop
func (e EmptyKVStore) Delete(key []byte) error { return nil }
func (e EmptyKVStore) Delete(ctx context.Context, key []byte) error { return nil }

// Iterator is always empty
func (e EmptyKVStore) Iterator(start, end []byte) (Iterator, error) {
return NewSliceIterator(nil), nil
}

// ReverseIterator is always empty
func (e EmptyKVStore) ReverseIterator(start, end []byte) (Iterator, error) {
return NewSliceIterator(nil), nil
func (e EmptyKVStore) Iterator(ctx context.Context, start, end []byte, reverse bool) (Iterator, error) {
return NewSliceIterator(context.Background(), nil), nil
}

// NewBatch returns a batch that can write to this tree later
Expand All @@ -89,12 +91,12 @@ type Op struct {
}

// Apply performs the stored operation on a writable store
func (o Op) Apply(out SetDeleter) error {
func (o Op) Apply(ctx context.Context, out SetDeleter) error {
switch o.kind {
case setKind:
return out.Set(o.key, o.value)
return out.Set(ctx, o.key, o.value)
case delKind:
return out.Delete(o.key)
return out.Delete(ctx, o.key)
default:
return errors.Wrapf(errors.ErrDatabase, "Unknown kind: %d", o.kind)
}
Expand Down Expand Up @@ -146,7 +148,7 @@ func NewNonAtomicBatch(out SetDeleter) *NonAtomicBatch {
}

// Set adds a set operation to the batch
func (b *NonAtomicBatch) Set(key, value []byte) error {
func (b *NonAtomicBatch) Set(ctx context.Context, key, value []byte) error {
set := Op{
kind: setKind,
key: key,
Expand All @@ -157,7 +159,7 @@ func (b *NonAtomicBatch) Set(key, value []byte) error {
}

// Delete adds a delete operation to the batch
func (b *NonAtomicBatch) Delete(key []byte) error {
func (b *NonAtomicBatch) Delete(ctx context.Context, key []byte) error {
del := Op{
kind: delKind,
key: key,
Expand All @@ -167,9 +169,9 @@ func (b *NonAtomicBatch) Delete(key []byte) error {
}

// Write writes all the ops to the underlying store and resets
func (b *NonAtomicBatch) Write() error {
func (b *NonAtomicBatch) Write(ctx context.Context) error {
for _, Op := range b.ops {
err := Op.Apply(b.out)
err := Op.Apply(ctx, b.out)
if err != nil {
return err
}
Expand Down
8 changes: 5 additions & 3 deletions store/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"context"
"testing"

"github.com/iov-one/weave/errors"
Expand All @@ -20,7 +21,7 @@ func TestSliceIterator(t *testing.T) {
models[i].Value = vs[i]
}
// make sure proper iteration works
iter, i := NewSliceIterator(models), 0
iter, i := NewSliceIterator(context.Background(), models), 0
key, value, err := iter.Next()
for err == nil {
assert.Equal(t, ks[i], key)
Expand All @@ -33,10 +34,11 @@ func TestSliceIterator(t *testing.T) {
t.Fatalf("Expected ErrIteratorDone, got %+v", err)
}

it := NewSliceIterator(models)
ctx, cancel := context.WithCancel(context.Background())
it := NewSliceIterator(ctx, models)
_, _, err = it.Next()
assert.Nil(t, err)
it.Release()
cancel()
_, _, err = it.Next()
if !errors.ErrIteratorDone.Is(err) {
t.Fatal("closed iterator must be invalid")
Expand Down
Loading