Skip to content

Commit

Permalink
clientv3: fix sync base
Browse files Browse the repository at this point in the history
It is not correct to use WithPrefix. Range end will change in every
batch.
  • Loading branch information
westhood committed Jul 6, 2016
1 parent 234c30c commit ebd6a66
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
19 changes: 18 additions & 1 deletion clientv3/mirror/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ type syncer struct {
prefix string
}

var noPrefixEnd = string([]byte{0})

func prefixEnd(key string) string {
end := make([]byte, len(key))
copy(end, key)
for i := len(end) - 1; i >= 0; i-- {
if end[i] < 0xff {
end[i] = end[i] + 1
end = end[:i+1]
return string(end)
}
}
// next prefix does not exist (e.g., 0xffff);
// default to WithFromKey policy
return noPrefixEnd
}

func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) {
respchan := make(chan clientv3.GetResponse, 1024)
errchan := make(chan error, 1)
Expand Down Expand Up @@ -78,7 +95,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
// If len(s.prefix) != 0, we will sync key-value space with given prefix.
// We then range from the prefix to the next prefix if exists. Or we will
// range from the prefix to the end if the next prefix does not exists.
opts = append(opts, clientv3.WithPrefix())
opts = append(opts, clientv3.WithRange(prefixEnd(s.prefix)))
key = s.prefix
}

Expand Down
40 changes: 40 additions & 0 deletions clientv3/mirror/syncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package mirror

import (
"fmt"
"log"
"testing"

"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
)

func TestSyncBase(t *testing.T) {
cluster := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 1, UseGRPC: true})
defer cluster.Terminate(nil)

cli := cluster.Client(0)
ctx := context.TODO()

for i := 0; i < 2000; i++ {
if _, err := cli.Put(ctx, fmt.Sprint("test%d", i), "test"); err != nil {
log.Panic(err)
}
}

syncer := NewSyncer(cli, "test", 0)
respCh, _ := syncer.SyncBase(ctx)

count := 0

for resp := range respCh {
count = count + len(resp.Kvs)
if !resp.More {
break
}
}

if count != 2000 {
t.Fatalf("unexpected kv count: %d", count)
}
}

0 comments on commit ebd6a66

Please sign in to comment.