Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Datastore based pinner #4

Merged
merged 29 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9fc54f6
start restructuring for move to datastore pinner
aschmahmann Sep 30, 2020
be03825
Store pins in datastore instead of in mdag
gammazero Oct 26, 2020
77c9a1d
Import and Export functions. Cid stored as bytes. Revise indexer inte…
gammazero Oct 27, 2020
5398bb2
add name index
gammazero Oct 27, 2020
922fab7
add benchmarks
gammazero Oct 27, 2020
fdf37b1
Use dirty flag to determine when to rebuild indexes
gammazero Oct 28, 2020
a2720f1
Fix benchmarks
gammazero Oct 28, 2020
37293b6
Do not keep pinned CID sets in memory (no-cache implementation)
gammazero Oct 29, 2020
0244724
Add comments and unit test
gammazero Oct 29, 2020
22a61da
Speed up pinning by avoining 2nd recursive check if no changes
gammazero Oct 29, 2020
1662bb8
correct log level
gammazero Oct 29, 2020
cad8378
improve import/export unit test
gammazero Oct 29, 2020
d1a44d7
Update returns error if from CID is not pinned, even when from and to…
gammazero Nov 16, 2020
9bb7a0c
test update of same pin
gammazero Nov 16, 2020
eb32271
Cleanup and better test coverage
gammazero Nov 17, 2020
22254b2
Change requested in review
gammazero Nov 19, 2020
d787682
Additional changes from review
gammazero Nov 19, 2020
0435ac4
Removed New in favor of only having LoadPinner
gammazero Nov 19, 2020
dde41e9
Indexer encodes index and key to allow arbitrary strings
gammazero Nov 19, 2020
0a5737f
Use int64 for dirty count and remove unused const
gammazero Nov 19, 2020
fb419e7
use base64 encoding
gammazero Nov 20, 2020
00764f0
Encode using multibase
gammazero Nov 20, 2020
a6d812c
Changes from review
gammazero Nov 24, 2020
86f36c2
Rename LoadPinner to New for both pinners
gammazero Nov 24, 2020
7a128c6
Check context when loading pinner and during iterative operations
gammazero Nov 24, 2020
cd2065d
indexer.New takes ds.Key
gammazero Nov 24, 2020
9c335cd
Change pin encoding. Add unit test
gammazero Nov 24, 2020
9fafc51
switch to atlas pin encoding
aschmahmann Nov 30, 2020
2586c60
removed type annotations from pin struct
aschmahmann Nov 30, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
*~
*.log

# Test binary, build with `go test -c`
*.test

# Output of the go coverage tool
*.out
207 changes: 207 additions & 0 deletions dsindex/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Package dsindex provides secondary indexing functionality for a datastore.
package dsindex

import (
"path"

ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
)

// Indexer maintains a secondary index. Each value of the secondary index maps
// to one more primary keys.
type Indexer interface {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
// Add adds the a to the an index
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
Add(index, id string) error

// Delete deletes the specified key from the index. If the key is not in
// the datastore, this method returns no error.
Delete(index, id string) error

// DeleteAll deletes all keys in the given index. If a key is not in the
// datastore, this method returns no error.
DeleteAll(index string) (count int, err error)

// ForEach calls the function for each key in the specified index, until
// there are no more keys, or until the function returns false. If index
// is empty string, then all index names are iterated.
ForEach(index string, fn func(index, id string) bool) error

// HasKey determines the specified index contains the specified primary key
HasKey(index, id string) (bool, error)

// HasAny determines if any key is in the specified index. If index is
// empty string, then all indexes are searched.
HasAny(index string) (bool, error)

// Search returns all keys for the given index
Search(index string) (ids []string, err error)

// Synchronize the indexes in this Indexer to match those of the given
// Indexer. The indexPath prefix is not synchronized, only the index/key
// portion of the indexes.
SyncTo(reference Indexer) (changed bool, err error)
}

// indexer is a simple implementation of Indexer. This implementation relies
// on the underlying data store supporting efficent querying by prefix.
//
// TODO: Consider adding caching
type indexer struct {
dstore ds.Datastore
indexPath string
}

// New creates a new datastore index. All indexes are stored prefixed with the
// specified index path.
func New(dstore ds.Datastore, indexPath string) Indexer {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
return &indexer{
dstore: dstore,
indexPath: indexPath,
}
}

func (x *indexer) Add(index, id string) error {
key := ds.NewKey(path.Join(x.indexPath, index, id))
return x.dstore.Put(key, []byte{})
}

func (x *indexer) Delete(index, id string) error {
return x.dstore.Delete(ds.NewKey(path.Join(x.indexPath, index, id)))
}

func (x *indexer) DeleteAll(index string) (int, error) {
ents, err := x.queryPrefix(path.Join(x.indexPath, index))
if err != nil {
return 0, err
}

for i := range ents {
err = x.dstore.Delete(ds.NewKey(ents[i].Key))
if err != nil {
return 0, err
}
}

return len(ents), nil
}

func (x *indexer) ForEach(index string, fn func(idx, id string) bool) error {
q := query.Query{
Prefix: path.Join(x.indexPath, index),
KeysOnly: true,
}
results, err := x.dstore.Query(q)
if err != nil {
return err
}

for {
r, ok := results.NextSync()
if !ok {
break
}
if r.Error != nil {
err = r.Error
break
}

ent := r.Entry
if !fn(path.Base(path.Dir(ent.Key)), path.Base(ent.Key)) {
break
}
}
results.Close()

return err
}

func (x *indexer) HasKey(index, id string) (bool, error) {
return x.dstore.Has(ds.NewKey(path.Join(x.indexPath, index, id)))
}

func (x *indexer) HasAny(index string) (bool, error) {
var any bool
err := x.ForEach(index, func(idx, id string) bool {
any = true
return false
})
return any, err
}

func (x *indexer) Search(index string) ([]string, error) {
ents, err := x.queryPrefix(path.Join(x.indexPath, index))
if err != nil {
return nil, err
}
if len(ents) == 0 {
return nil, nil
}

ids := make([]string, len(ents))
for i := range ents {
ids[i] = path.Base(ents[i].Key)
}
return ids, nil
}

func (x *indexer) SyncTo(ref Indexer) (bool, error) {
// Build reference index map
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
refs := map[string]string{}
err := ref.ForEach("", func(idx, id string) bool {
refs[id] = idx
return true
})
if err != nil {
return false, err
}
if len(refs) == 0 {
return false, nil
}

// Compare current indexes
var delKeys []string
err = x.ForEach("", func(idx, id string) bool {
refIdx, ok := refs[id]
if ok && refIdx == idx {
// same in both; delete from refs, do not add to delKeys
delete(refs, id)
} else {
delKeys = append(delKeys, path.Join(x.indexPath, idx, id))
}
return true
})
if err != nil {
return false, err
}

// Items in delKeys are indexes that no longer exist
for i := range delKeys {
err = x.dstore.Delete(ds.NewKey(delKeys[i]))
if err != nil {
return false, err
}
}

// What remains in refs are indexes that need to be added
for k, v := range refs {
err = x.dstore.Put(ds.NewKey(path.Join(x.indexPath, v, k)), nil)
if err != nil {
return false, err
}
}

return len(refs) != 0 || len(delKeys) != 0, nil
}

func (x *indexer) queryPrefix(prefix string) ([]query.Entry, error) {
q := query.Query{
Prefix: prefix,
KeysOnly: true,
}
results, err := x.dstore.Query(q)
if err != nil {
return nil, err
}
return results.Rest()
}
Loading