Skip to content

Commit

Permalink
Merge pull request #9 from embano1/issue-7
Browse files Browse the repository at this point in the history
Remove `ErrSlowReader` implementation in stream
  • Loading branch information
embano1 authored Jan 13, 2022
2 parents d7e0c6e + e2cd043 commit b2818a9
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 245 deletions.
56 changes: 30 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ An easy to use, lightweight, thread-safe and append-only in-memory data
structure modelled as a *Log*.

❌ Note: this package is not about providing an in-memory `logging` library. To
read more about the ideas behind `memlog` please read ["The Log: What every
read more about the ideas behind `memlog` please see ["The Log: What every
software engineer should know about real-time data's unifying
abstraction"](https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying).
## Motivation
Expand Down Expand Up @@ -80,7 +80,7 @@ value.
[`kvstore`](https://github.com/knative/pkg/tree/main/kvstore) provides a nice
abstraction on top of a `ConfigMap` for such requirements.

If the `memlog` process crashes, it can then resume the last checkpointed
If the `memlog` process crashes, it can then resume from the last checkpointed
`Offset`, load the changes since then from the source and resume streaming.

💡 This approach is quiet similar to the Kubernetes `ListerWatcher()`
Expand Down Expand Up @@ -124,34 +124,38 @@ creating multiple `Logs` might be useful. For example:
package main

import (
"context"
"log"
"context"
"fmt"
"os"

"github.com/embano1/memlog"
"github.com/embano1/memlog"
)

func main() {
ctx := context.Background()
l, err := memlog.New(ctx)
if err != nil {
log.Fatalf("create log: %v", err)
}

offset, err := l.Write(ctx, []byte("Hello World"))
if err != nil {
log.Fatalf("write: %v", err)
}

log.Printf("reading record from offset %d", offset)
record, err := l.Read(ctx, offset)
if err != nil {
log.Fatalf("write: %v", err)
}

log.Printf("record data: %s", record.Data)

// 2022/01/05 21:03:31 reading record from offset 0
// 2022/01/05 21:03:31 record data: Hello World
ctx := context.Background()
l, err := memlog.New(ctx)
if err != nil {
fmt.Printf("create log: %v", err)
os.Exit(1)
}

offset, err := l.Write(ctx, []byte("Hello World"))
if err != nil {
fmt.Printf("write: %v", err)
os.Exit(1)
}

fmt.Printf("reading record at offset %d\n", offset)
record, err := l.Read(ctx, offset)
if err != nil {
fmt.Printf("read: %v", err)
os.Exit(1)
}

fmt.Printf("data says: %s", record.Data)

// reading record at offset 0
// data says: Hello World
}
```

Expand Down
66 changes: 27 additions & 39 deletions example_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import (
"errors"
"fmt"
"os"
"sync"
"time"

"golang.org/x/sync/errgroup"

"github.com/embano1/memlog"
)

func Example_stream() {
// showing some custom options in action
const (
logStart = 10
logSize = 100
logStart = 10
logSize = 100
writeRecords = 10
)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -31,7 +33,6 @@ func Example_stream() {
os.Exit(1)
}

const writeRecords = 10
// write some records (offsets 10-14)
for i := 0; i < writeRecords/2; i++ {
d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i+logStart)
Expand All @@ -42,59 +43,46 @@ func Example_stream() {
}
}

// start stream from latest (offset 14)
_, latest := l.Range(ctx)
recChan, errChan := l.Stream(ctx, latest)

var wg sync.WaitGroup
eg, egCtx := errgroup.WithContext(ctx)

_, latest := l.Range(egCtx)
// stream records
wg.Add(1)
go func() {
defer wg.Done()
eg.Go(func() error {
// start stream from latest (offset 14)
stream := l.Stream(egCtx, latest)

for {
select {
case r := <-recChan:
fmt.Printf("Record at offset %d says %q\n", r.Record.Metadata.Offset, r.Record.Data)
case streamErr := <-errChan:
if errors.Is(streamErr, context.Canceled) {
return
}
fmt.Printf("stream: %v", streamErr)
os.Exit(1)
if r, ok := stream.Next(); ok {
fmt.Printf("Record at offset %d says %q\n", r.Metadata.Offset, r.Data)
continue
}
break
}
}()
return stream.Err()
})

// continue writing while streaming
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
eg.Go(func() error {
for i := writeRecords / 2; i < writeRecords; i++ {
d := fmt.Sprintf(`{"id":%d,"message","hello world"}`, i+logStart)
_, err := l.Write(ctx, []byte(d))
if err != nil && !errors.Is(err, context.Canceled) {
fmt.Printf("write: %v", err)
os.Exit(1)
return err
}
}
}()
return nil
})

// simulate SIGTERM after 2s
wg.Add(1)
go func() {
defer wg.Done()
eg.Go(func() error {
time.Sleep(time.Second * 2)
cancel()
}()
return nil
})

wg.Wait()

// drain any remaining records to release the closed (buffered) channel
for r := range recChan {
fmt.Printf("Record at offset %d says %q\n", r.Record.Metadata.Offset, r.Record.Data)
if err = eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
fmt.Printf("run example: %v", err)
os.Exit(1)
}

// Output: Record at offset 14 says "{\"id\":14,\"message\",\"hello world\"}"
Expand Down
4 changes: 2 additions & 2 deletions memlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/embano1/memlog"
)

func Test_Log_Checkpoint_Resume(t *testing.T) {
func TestLog_Checkpoint_Resume(t *testing.T) {
const (
sourceDataCount = 50
start = memlog.Offset(0)
Expand Down Expand Up @@ -132,7 +132,7 @@ func Test_Log_Checkpoint_Resume(t *testing.T) {
})
}

func Test_Log_Concurrent(t *testing.T) {
func TestLog_Concurrent(t *testing.T) {
type wantOffsets struct {
earliest memlog.Offset
latest memlog.Offset
Expand Down
142 changes: 53 additions & 89 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,105 +7,69 @@ import (
)

const (
streamBuffer = 100 // limit before ErrSlowReader
streamBackoffInterval = time.Millisecond * 10
)

// ErrSlowReader is returned by a stream when the stream buffer is full
var ErrSlowReader = errors.New("slow reader blocking stream channel send")

// StreamHeader is metadata associated with a stream record
type StreamHeader struct {
Earliest Offset
Latest Offset
}

// StreamRecord is a record received from a stream
type StreamRecord struct {
Metadata StreamHeader
Record Record
// Stream is an iterator to stream records in order from a log
type Stream struct {
ctx context.Context
log *Log
position Offset
done bool
err error
}

// Stream streams records over the returned channel, starting at the given start
// offset. If the start offset is in the future, stream will wait until this
// offset is written.
//
// Both channels are closed when the provided context is cancelled or an
// unrecoverable error, e.g. ErrOutOfRange, occurs.
//
// If the caller is not keeping up with the record stream (buffered channel),
// ErrSlowReader will be returned and the stream terminates.
//
// The caller must drain both (closed) channels to release all associated
// resources.
// Next returns the next Record, but only if ok is true. If ok is false, the
// iterator was stopped and any subsequent calls will return an invalid record
// and false.
//
// Safe for concurrent use.
func (l *Log) Stream(ctx context.Context, start Offset) (<-chan StreamRecord, <-chan error) {
var (
// when buffer is full, returns with ErrSlowReader
streamCh = make(chan StreamRecord, streamBuffer)

// unbuffered to guarantee delivery before returning. avoids coding
// complexity/bugs on caller side when streamCh is closed (returning invalid
// empty Records to receiver)
errCh = make(chan error)
)

go func() {
defer func() {
close(streamCh)
close(errCh)
}()

offset := start
for {
select {
case <-ctx.Done():
errCh <- ctx.Err()
return

default:
sendOne := func() error {
if len(streamCh) == streamBuffer {
return ErrSlowReader
}

l.mu.RLock()
defer l.mu.RUnlock()

earliest, latest := l.offsetRange()
r, err := l.read(ctx, offset)
if err != nil {
if errors.Is(err, ErrFutureOffset) {
// back off and continue polling
time.Sleep(streamBackoffInterval)
return nil
}
// The caller must consult Err() which error, if any, caused stopping the error.
func (s *Stream) Next() (r Record, ok bool) {
for {
if s.done {
return Record{}, false
}

return err
}
if s.ctx.Err() != nil {
s.err = s.ctx.Err()
s.done = true
return Record{}, false
}

rec := StreamRecord{
Metadata: StreamHeader{
Earliest: earliest,
Latest: latest,
},
Record: r,
}
r, err := s.log.Read(s.ctx, s.position)
if err != nil {
if errors.Is(err, ErrFutureOffset) {
// back off and continue polling
time.Sleep(streamBackoffInterval)
continue
}

streamCh <- rec
offset = r.Metadata.Offset + 1
s.err = err
s.done = true
return Record{}, false
}

return nil
}
s.position = r.Metadata.Offset + 1
return r, true
}
}

if err := sendOne(); err != nil {
errCh <- err
return
}
}
}
}()
// Err returns the first error, if any, that has ocurred during streaming. When
// the stream iterator is stopped this method should be called to inspect
// whether the iterator was stopped due to an error.
func (s *Stream) Err() error {
return s.err
}

return streamCh, errCh
// Stream returns a stream iterator to stream records, starting at the given
// start offset. If the start offset is in the future, stream will continuously
// poll until this offset is written.
//
// The returned stream iterator must only be used within the same goroutine.
func (l *Log) Stream(ctx context.Context, start Offset) Stream {
return Stream{
ctx: ctx,
log: l,
position: start,
}
}
Loading

0 comments on commit b2818a9

Please sign in to comment.