Improved the documentation
creativecreature committed Jun 14, 2024
1 parent 2dd08c3 commit d431fc8
Showing 8 changed files with 52 additions and 16 deletions.
3 changes: 2 additions & 1 deletion buffer.go
@@ -4,11 +4,13 @@ import (
 	"time"
 )
 
+// buffer represents a buffer for a batch refresh.
 type buffer struct {
 	channel chan []string
 	ids     []string
 }
 
+// createBuffer should be called WITH a lock when a refresh buffer is created.
 func (c *Client[T]) createBuffer(permutation string, ids []string) {
 	bufferIDs := make([]string, 0, c.bufferSize)
 	bufferIDs = append(bufferIDs, ids...)
@@ -74,7 +76,6 @@ func bufferBatchRefresh[T any](c *Client[T], ids []string, keyFn KeyFn, fetchFn
 		c.safeGo(func() {
 			bufferBatchRefresh(c, ids, keyFn, fetchFn)
 		})
-		return
 	}
 	return
 }
17 changes: 10 additions & 7 deletions cache.go
@@ -124,19 +124,22 @@ func (c *Client[T]) getShard(key string) *shard[T] {
 	return c.shards[shardIndex]
 }
 
-func (c *Client[T]) get(key string) (value T, exists, ignore, refresh bool) {
+// getWithState retrieves a single value from the cache and returns additional
+// information about the state of the record. The state includes whether the record
+// exists, if it has been marked as missing, and if it is due for a refresh.
+func (c *Client[T]) getWithState(key string) (value T, exists, markedAsMissing, refresh bool) {
 	shard := c.getShard(key)
-	val, exists, ignore, refresh := shard.get(key)
-	c.reportCacheHits(exists, ignore, refresh)
-	return val, exists, ignore, refresh
+	val, exists, markedAsMissing, refresh := shard.get(key)
+	c.reportCacheHits(exists, markedAsMissing, refresh)
+	return val, exists, markedAsMissing, refresh
 }
 
 // Get retrieves a single value from the cache.
 func (c *Client[T]) Get(key string) (T, bool) {
 	shard := c.getShard(key)
-	val, ok, ignore, refresh := shard.get(key)
-	c.reportCacheHits(ok, ignore, refresh)
-	return val, ok && !ignore
+	val, ok, markedAsMissing, refresh := shard.get(key)
+	c.reportCacheHits(ok, markedAsMissing, refresh)
+	return val, ok && !markedAsMissing
 }
 
 // GetMany retrieves multiple values from the cache.
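To make the renamed return values concrete, here is a small standalone sketch (not the library's code) of how Get collapses the richer result of getWithState into a plain (value, ok) pair: a record that exists internally but has been marked as missing is reported to callers as a miss.

package main

import "fmt"

// collapse mirrors the `ok && !markedAsMissing` expression in Get above.
func collapse[T any](value T, exists, markedAsMissing bool) (T, bool) {
	return value, exists && !markedAsMissing
}

func main() {
	value, ok := collapse("cached value", true, false)
	fmt.Println(value, ok) // cached value true

	// A negative cache entry exists internally but is reported as a miss.
	value, ok = collapse("missing record", true, true)
	fmt.Println(value, ok) // missing record false
}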
18 changes: 18 additions & 0 deletions clock.go
@@ -6,33 +6,40 @@ import (
 	"time"
 )
 
+// Clock is an abstraction for the time package that allows for testing.
 type Clock interface {
 	Now() time.Time
 	NewTicker(d time.Duration) (<-chan time.Time, func())
 	NewTimer(d time.Duration) (<-chan time.Time, func() bool)
 	Since(t time.Time) time.Duration
 }
 
+// RealClock provides functions that wrap the real time package.
 type RealClock struct{}
 
+// NewClock returns a new RealClock.
 func NewClock() *RealClock {
 	return &RealClock{}
 }
 
+// Now wraps time.Now() from the standard library.
 func (c *RealClock) Now() time.Time {
 	return time.Now()
 }
 
+// NewTicker returns the channel and stop function from the ticker from the standard library.
 func (c *RealClock) NewTicker(d time.Duration) (<-chan time.Time, func()) {
 	t := time.NewTicker(d)
 	return t.C, t.Stop
 }
 
+// NewTimer returns the channel and stop function from the timer from the standard library.
 func (c *RealClock) NewTimer(d time.Duration) (<-chan time.Time, func() bool) {
 	t := time.NewTimer(d)
 	return t.C, t.Stop
 }
 
+// Since wraps time.Since() from the standard library.
 func (c *RealClock) Since(t time.Time) time.Duration {
 	return time.Since(t)
 }
@@ -50,13 +57,15 @@ type testTicker struct {
 	stopped *atomic.Bool
 }
 
+// TestClock is a clock that satisfies the Clock interface. It should only be used for testing.
 type TestClock struct {
 	mu      sync.Mutex
 	time    time.Time
 	timers  []*testTimer
 	tickers []*testTicker
 }
 
+// NewTestClock returns a new TestClock with the specified time.
 func NewTestClock(time time.Time) *TestClock {
 	var c TestClock
 	c.time = time
@@ -65,6 +74,7 @@ func NewTestClock(time time.Time) *TestClock {
 	return &c
 }
 
+// Set sets the internal time of the test clock and triggers any timers or tickers that should fire.
 func (c *TestClock) Set(t time.Time) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -98,16 +108,21 @@ func (c *TestClock) Set(t time.Time) {
 	c.timers = unfiredTimers
 }
 
+// Add adds the duration to the internal time of the test clock
+// and triggers any timers or tickers that should fire.
 func (c *TestClock) Add(d time.Duration) {
 	c.Set(c.time.Add(d))
 }
 
+// Now returns the internal time of the test clock.
 func (c *TestClock) Now() time.Time {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	return c.time
 }
 
+// NewTicker creates a new ticker that will fire every time
+// the internal clock advances by the specified duration.
 func (c *TestClock) NewTicker(d time.Duration) (<-chan time.Time, func()) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -123,6 +138,8 @@ func (c *TestClock) NewTicker(d time.Duration) (<-chan time.Time, func()) {
 	return ch, stop
 }
 
+// NewTimer creates a new timer that will fire once the internal time
+// of the clock has been advanced past the specified duration.
 func (c *TestClock) NewTimer(d time.Duration) (<-chan time.Time, func() bool) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -145,6 +162,7 @@ func (c *TestClock) NewTimer(d time.Duration) (<-chan time.Time, func() bool) {
 	return ch, stop
 }
 
+// Since returns the duration between the internal time of the clock and the specified time.
 func (c *TestClock) Since(t time.Time) time.Duration {
 	return c.Now().Sub(t)
 }
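A hedged usage sketch of the TestClock documented above: time only moves when the test advances it with Set or Add, which is also what makes timers and tickers fire. The import path is an assumption based on the repository owner and may differ.

package sturdyc_test

import (
	"testing"
	"time"

	"github.com/creativecreature/sturdyc" // assumed import path
)

func TestClockAdvancesManually(t *testing.T) {
	start := time.Date(2024, 6, 14, 12, 0, 0, 0, time.UTC)
	clock := sturdyc.NewTestClock(start)

	// No time passes until the test says so.
	if clock.Since(start) != 0 {
		t.Fatal("expected no elapsed time")
	}

	// Advancing the internal time is what triggers pending timers and tickers.
	clock.Add(5 * time.Second)
	if got := clock.Since(start); got != 5*time.Second {
		t.Fatalf("expected 5s to have elapsed, got %s", got)
	}
}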
14 changes: 14 additions & 0 deletions distribution.go
@@ -9,32 +9,46 @@ import (
 	"time"
 )
 
+// distributedRecord represents the records that we're writing to the distributed storage.
 type distributedRecord[V any] struct {
 	CreatedAt       time.Time `json:"created_at"`
 	Value           V         `json:"value"`
 	IsMissingRecord bool      `json:"is_missing_record"`
 }
 
+// DistributedStorage is an abstraction that the cache interacts with in order
+// to keep the distributed storage and in-memory cache in sync. Please note that
+// you are responsible for setting the TTL and eviction policy of this storage.
 type DistributedStorage interface {
 	Get(ctx context.Context, key string) ([]byte, bool)
 	Set(ctx context.Context, key string, value []byte)
 	GetBatch(ctx context.Context, keys []string) map[string][]byte
 	SetBatch(ctx context.Context, records map[string][]byte)
 }
 
+// DistributedStorageWithDeletions is an abstraction that the cache interacts
+// with when you want to use a distributed storage with early refreshes. Please
+// note that you are responsible for setting the TTL and eviction policy of
+// this storage. The cache will only call the delete functions when it performs
+// a refresh and notices that the record has been deleted at the underlying
+// data source.
 type DistributedStorageWithDeletions interface {
 	DistributedStorage
 	Delete(ctx context.Context, key string)
 	DeleteBatch(ctx context.Context, keys []string)
 }
 
+// distributedStorage adds noop implementations for the delete functions so
+// that the cache doesn't have to deal with multiple storage types.
 type distributedStorage struct {
 	DistributedStorage
 }
 
+// Delete is a noop implementation of the delete function.
 func (d *distributedStorage) Delete(_ context.Context, _ string) {
 }
 
+// DeleteBatch is a noop implementation of the delete batch function.
 func (d *distributedStorage) DeleteBatch(_ context.Context, _ []string) {
 }

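As a hedged sketch of what an implementation of the DistributedStorage interface above could look like, here is a minimal in-memory version. It only demonstrates the shape of the contract; a real implementation would typically wrap something like Redis and, as the comment notes, remains responsible for its own TTL and eviction policy.

package example

import (
	"context"
	"sync"
)

// inMemoryStorage is an illustrative, mutex-guarded map that matches the
// method set of DistributedStorage shown in the diff above.
type inMemoryStorage struct {
	mu      sync.RWMutex
	records map[string][]byte
}

func newInMemoryStorage() *inMemoryStorage {
	return &inMemoryStorage{records: make(map[string][]byte)}
}

func (s *inMemoryStorage) Get(_ context.Context, key string) ([]byte, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	value, ok := s.records[key]
	return value, ok
}

func (s *inMemoryStorage) Set(_ context.Context, key string, value []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.records[key] = value
}

func (s *inMemoryStorage) GetBatch(_ context.Context, keys []string) map[string][]byte {
	s.mu.RLock()
	defer s.mu.RUnlock()
	hits := make(map[string][]byte, len(keys))
	for _, key := range keys {
		if value, ok := s.records[key]; ok {
			hits[key] = value
		}
	}
	return hits
}

func (s *inMemoryStorage) SetBatch(_ context.Context, records map[string][]byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for key, value := range records {
		s.records[key] = value
	}
}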
8 changes: 4 additions & 4 deletions fetch.go
@@ -12,14 +12,14 @@ func (c *Client[T]) groupIDs(ids []string, keyFn KeyFn) (hits map[string]T, miss
 
 	for _, id := range ids {
 		key := keyFn(id)
-		value, exists, shouldIgnore, shouldRefresh := c.get(key)
+		value, exists, markedAsMissing, shouldRefresh := c.getWithState(key)
 
 		// Check if the record should be refreshed in the background.
 		if shouldRefresh {
 			refreshes = append(refreshes, id)
 		}
 
-		if shouldIgnore {
+		if markedAsMissing {
 			continue
 		}
 
@@ -37,15 +37,15 @@ func getFetch[V, T any](ctx context.Context, c *Client[T], key string, fetchFn F
 	wrappedFetch := wrap[T](distributedFetch(c, key, fetchFn))
 
 	// Begin by checking if we have the item in our cache.
-	value, ok, shouldIgnore, shouldRefresh := c.get(key)
+	value, ok, markedAsMissing, shouldRefresh := c.getWithState(key)
 
 	if shouldRefresh {
 		c.safeGo(func() {
 			c.refresh(key, wrappedFetch)
 		})
 	}
 
-	if shouldIgnore {
+	if markedAsMissing {
 		return value, ErrMissingRecord
 	}

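The triage that groupIDs performs is easy to lose in diff form, so here is a standalone sketch of the same flow (the tail of the function is inferred from the truncated hunk): every ID that is due gets scheduled for a background refresh, records marked as missing are skipped entirely, and the remaining IDs are split into hits and misses.

package example

// recordState mirrors the four values returned by getWithState in the diff above.
type recordState[T any] struct {
	value           T
	exists          bool
	markedAsMissing bool
	shouldRefresh   bool
}

// triage sorts IDs the way groupIDs does: refreshes are collected even for
// servable records, records marked as missing are skipped, and the rest
// become either hits or misses.
func triage[T any](ids []string, lookup func(id string) recordState[T]) (hits map[string]T, misses, refreshes []string) {
	hits = make(map[string]T, len(ids))
	for _, id := range ids {
		state := lookup(id)

		// A record can be served and still be due for a background refresh.
		if state.shouldRefresh {
			refreshes = append(refreshes, id)
		}

		// Records marked as missing are neither hits nor misses to fetch again.
		if state.markedAsMissing {
			continue
		}

		if !state.exists {
			misses = append(misses, id)
			continue
		}
		hits[id] = state.value
	}
	return hits, misses, refreshes
}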
4 changes: 2 additions & 2 deletions inflight.go
@@ -171,8 +171,8 @@ func callAndCacheBatch[V, T any](ctx context.Context, c *Client[T], opts callBat
 		return response, call.err
 	}
 
-	// Remember: we need to iterate through the values we have for this call.
-	// We might just need one ID and the batch could contain a hundred.
+	// We need to iterate through the values that we want from this call. The
+	// batch could contain a hundred IDs, but we might only want a few of them.
 	for _, id := range callIDs {
 		v, ok := call.val[id]
 		if !ok {
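A tiny standalone sketch of the rewritten comment above: the shared in-flight batch call may resolve far more IDs than this particular caller asked for, so only the requested ones are copied out of the result.

package example

// pickRequested copies only the IDs this caller asked for out of a shared
// batch result; IDs the batch call could not resolve are simply absent.
func pickRequested[V any](callIDs []string, batchResult map[string]V) map[string]V {
	requested := make(map[string]V, len(callIDs))
	for _, id := range callIDs {
		if value, ok := batchResult[id]; ok {
			requested[id] = value
		}
	}
	return requested
}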
2 changes: 1 addition & 1 deletion refresh.go
@@ -28,7 +28,7 @@ func (c *Client[T]) refreshBatch(ids []string, keyFn KeyFn, fetchFn BatchFetchFn
 
 	// Check if any of the records have been deleted at the data source.
 	for _, id := range ids {
-		_, okCache, _, _ := c.get(keyFn(id))
+		_, okCache, _, _ := c.getWithState(keyFn(id))
 		_, okResponse := response[id]
 
 		if okResponse {
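A hedged sketch of the deletion check that the loop above sets up; how the library reacts once a deletion is detected is not visible in this truncated hunk, so the sketch only collects the affected IDs.

package example

// detectDeletedIDs returns the IDs that are still present in the local cache
// but were not returned by the data source, i.e. records that appear to have
// been deleted upstream.
func detectDeletedIDs[V any](ids []string, inCache func(id string) bool, response map[string]V) []string {
	var deleted []string
	for _, id := range ids {
		if _, okResponse := response[id]; okResponse {
			continue
		}
		if inCache(id) {
			deleted = append(deleted, id)
		}
	}
	return deleted
}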
2 changes: 1 addition & 1 deletion shard.go
@@ -66,7 +66,7 @@ func (s *shard[T]) forceEvict() {
 	s.reportEntriesEvicted(entriesEvicted)
 }
 
-func (s *shard[T]) get(key string) (val T, exists, ignore, refresh bool) {
+func (s *shard[T]) get(key string) (val T, exists, markedAsMissing, refresh bool) {
 	s.RLock()
 	item, ok := s.entries[key]
 	if !ok {
