From d431fc893cb1d933894746609df1ada600ee14ea Mon Sep 17 00:00:00 2001 From: Victor Conner Date: Fri, 14 Jun 2024 07:21:13 +0200 Subject: [PATCH] Improved the documentation --- buffer.go | 3 ++- cache.go | 17 ++++++++++------- clock.go | 18 ++++++++++++++++++ distribution.go | 14 ++++++++++++++ fetch.go | 8 ++++---- inflight.go | 4 ++-- refresh.go | 2 +- shard.go | 2 +- 8 files changed, 52 insertions(+), 16 deletions(-) diff --git a/buffer.go b/buffer.go index 046f4b8..1080d71 100644 --- a/buffer.go +++ b/buffer.go @@ -4,11 +4,13 @@ import ( "time" ) +// buffer represents a buffer for a batch refresh. type buffer struct { channel chan []string ids []string } +// createBuffer should be called WITH a lock when a refresh buffer is created. func (c *Client[T]) createBuffer(permutation string, ids []string) { bufferIDs := make([]string, 0, c.bufferSize) bufferIDs = append(bufferIDs, ids...) @@ -74,7 +76,6 @@ func bufferBatchRefresh[T any](c *Client[T], ids []string, keyFn KeyFn, fetchFn c.safeGo(func() { bufferBatchRefresh(c, ids, keyFn, fetchFn) }) - return } return } diff --git a/cache.go b/cache.go index e388be8..65034a7 100644 --- a/cache.go +++ b/cache.go @@ -124,19 +124,22 @@ func (c *Client[T]) getShard(key string) *shard[T] { return c.shards[shardIndex] } -func (c *Client[T]) get(key string) (value T, exists, ignore, refresh bool) { +// getWithState retrieves a single value from the cache and returns additional +// information about the state of the record. The state includes whether the record +// exists, if it has been marked as missing, and if it is due for a refresh. 
+func (c *Client[T]) getWithState(key string) (value T, exists, markedAsMissing, refresh bool) { shard := c.getShard(key) - val, exists, ignore, refresh := shard.get(key) - c.reportCacheHits(exists, ignore, refresh) - return val, exists, ignore, refresh + val, exists, markedAsMissing, refresh := shard.get(key) + c.reportCacheHits(exists, markedAsMissing, refresh) + return val, exists, markedAsMissing, refresh } // Get retrieves a single value from the cache. func (c *Client[T]) Get(key string) (T, bool) { shard := c.getShard(key) - val, ok, ignore, refresh := shard.get(key) - c.reportCacheHits(ok, ignore, refresh) - return val, ok && !ignore + val, ok, markedAsMissing, refresh := shard.get(key) + c.reportCacheHits(ok, markedAsMissing, refresh) + return val, ok && !markedAsMissing } // GetMany retrieves multiple values from the cache. diff --git a/clock.go b/clock.go index 787a256..c525a76 100644 --- a/clock.go +++ b/clock.go @@ -6,6 +6,7 @@ import ( "time" ) +// Clock is an abstraction for the time.Time package that allows for testing. type Clock interface { Now() time.Time NewTicker(d time.Duration) (<-chan time.Time, func()) @@ -13,26 +14,32 @@ type Clock interface { Since(t time.Time) time.Duration } +// RealClock provides functions that wrap the real time.Time package. type RealClock struct{} +// NewClock returns a new RealClock. func NewClock() *RealClock { return &RealClock{} } +// Now wraps time.Now() from the standard library. func (c *RealClock) Now() time.Time { return time.Now() } +// NewTicker returns the channel and stop function from the ticker from the standard library. func (c *RealClock) NewTicker(d time.Duration) (<-chan time.Time, func()) { t := time.NewTicker(d) return t.C, t.Stop } +// NewTimer returns the channel and stop function from the timer from the standard library. 
func (c *RealClock) NewTimer(d time.Duration) (<-chan time.Time, func() bool) { t := time.NewTimer(d) return t.C, t.Stop } +// Since wraps time.Since() from the standard library. func (c *RealClock) Since(t time.Time) time.Duration { return time.Since(t) } @@ -50,6 +57,7 @@ type testTicker struct { stopped *atomic.Bool } +// TestClock is a clock that satisfies the Clock interface. It should only be used for testing. type TestClock struct { mu sync.Mutex time time.Time @@ -57,6 +65,7 @@ type TestClock struct { tickers []*testTicker } +// NewTestClock returns a new TestClock with the specified time. func NewTestClock(time time.Time) *TestClock { var c TestClock c.time = time @@ -65,6 +74,7 @@ func NewTestClock(time time.Time) *TestClock { return &c } +// Set sets the internal time of the test clock and triggers any timers or tickers that should fire. func (c *TestClock) Set(t time.Time) { c.mu.Lock() defer c.mu.Unlock() @@ -98,16 +108,21 @@ func (c *TestClock) Set(t time.Time) { c.timers = unfiredTimers } +// Add adds the duration to the internal time of the test clock +// and triggers any timers or tickers that should fire. func (c *TestClock) Add(d time.Duration) { c.Set(c.time.Add(d)) } +// Now returns the internal time of the test clock. func (c *TestClock) Now() time.Time { c.mu.Lock() defer c.mu.Unlock() return c.time } +// NewTicker creates a new ticker that will fire every time +// the internal clock advances by the specified duration. func (c *TestClock) NewTicker(d time.Duration) (<-chan time.Time, func()) { c.mu.Lock() defer c.mu.Unlock() @@ -123,6 +138,8 @@ func (c *TestClock) NewTicker(d time.Duration) (<-chan time.Time, func()) { return ch, stop } +// NewTimer creates a new timer that will fire once the internal time +// of the clock has been advanced past the specified duration. 
func (c *TestClock) NewTimer(d time.Duration) (<-chan time.Time, func() bool) { c.mu.Lock() defer c.mu.Unlock() @@ -145,6 +162,7 @@ func (c *TestClock) NewTimer(d time.Duration) (<-chan time.Time, func() bool) { return ch, stop } +// Since returns the duration between the internal time of the clock and the specified time. func (c *TestClock) Since(t time.Time) time.Duration { return c.Now().Sub(t) } diff --git a/distribution.go b/distribution.go index aa5004f..6e1d6f8 100644 --- a/distribution.go +++ b/distribution.go @@ -9,12 +9,16 @@ import ( "time" ) +// distributedRecord represents the records that we're writing to the distributed storage. type distributedRecord[V any] struct { CreatedAt time.Time `json:"created_at"` Value V `json:"value"` IsMissingRecord bool `json:"is_missing_record"` } +// DistributedStorage is an abstraction that the cache interacts with in order +// to keep the distributed storage and in-memory cache in sync. Please note that +// you are responsible for setting the TTL and eviction policy of this storage. type DistributedStorage interface { Get(ctx context.Context, key string) ([]byte, bool) Set(ctx context.Context, key string, value []byte) @@ -22,19 +26,29 @@ type DistributedStorage interface { SetBatch(ctx context.Context, records map[string][]byte) } +// DistributedStorageWithDeletions is an abstraction that the cache interacts +// with when you want to use a distributed storage with early refreshes. Please +// note that you are responsible for setting the TTL and eviction policy of +// this storage. The cache will only call the delete functions when it performs +// a refresh and notices that the record has been deleted at the underlying +// data source. 
type DistributedStorageWithDeletions interface { DistributedStorage Delete(ctx context.Context, key string) DeleteBatch(ctx context.Context, keys []string) } +// distributedStorage adds noop implementations for the delete functions so +// that the cache doesn't have to deal with multiple storage types. type distributedStorage struct { DistributedStorage } +// Delete is a noop implementation of the delete function. func (d *distributedStorage) Delete(_ context.Context, _ string) { } +// DeleteBatch is a noop implementation of the delete batch function. func (d *distributedStorage) DeleteBatch(_ context.Context, _ []string) { } diff --git a/fetch.go b/fetch.go index 0785bf2..07c94b8 100644 --- a/fetch.go +++ b/fetch.go @@ -12,14 +12,14 @@ func (c *Client[T]) groupIDs(ids []string, keyFn KeyFn) (hits map[string]T, miss for _, id := range ids { key := keyFn(id) - value, exists, shouldIgnore, shouldRefresh := c.get(key) + value, exists, markedAsMissing, shouldRefresh := c.getWithState(key) // Check if the record should be refreshed in the background. if shouldRefresh { refreshes = append(refreshes, id) } - if shouldIgnore { + if markedAsMissing { continue } @@ -37,7 +37,7 @@ func getFetch[V, T any](ctx context.Context, c *Client[T], key string, fetchFn F wrappedFetch := wrap[T](distributedFetch(c, key, fetchFn)) // Begin by checking if we have the item in our cache. 
- value, ok, shouldIgnore, shouldRefresh := c.get(key) + value, ok, markedAsMissing, shouldRefresh := c.getWithState(key) if shouldRefresh { c.safeGo(func() { @@ -45,7 +45,7 @@ func getFetch[V, T any](ctx context.Context, c *Client[T], key string, fetchFn F }) } - if shouldIgnore { + if markedAsMissing { return value, ErrMissingRecord } diff --git a/inflight.go b/inflight.go index 5ee4180..098813d 100644 --- a/inflight.go +++ b/inflight.go @@ -171,8 +171,8 @@ func callAndCacheBatch[V, T any](ctx context.Context, c *Client[T], opts callBat return response, call.err } - // Remember: we need to iterate through the values we have for this call. - // We might just need one ID and the batch could contain a hundred. + // We need to iterate through the values that we want from this call. The + // batch could contain a hundred IDs, but we might only want a few of them. for _, id := range callIDs { v, ok := call.val[id] if !ok { diff --git a/refresh.go b/refresh.go index b1ea5d0..00abb6d 100644 --- a/refresh.go +++ b/refresh.go @@ -28,7 +28,7 @@ func (c *Client[T]) refreshBatch(ids []string, keyFn KeyFn, fetchFn BatchFetchFn // Check if any of the records have been deleted at the data source. for _, id := range ids { - _, okCache, _, _ := c.get(keyFn(id)) + _, okCache, _, _ := c.getWithState(keyFn(id)) _, okResponse := response[id] if okResponse { diff --git a/shard.go b/shard.go index a9d9faf..09e05c2 100644 --- a/shard.go +++ b/shard.go @@ -66,7 +66,7 @@ func (s *shard[T]) forceEvict() { s.reportEntriesEvicted(entriesEvicted) } -func (s *shard[T]) get(key string) (val T, exists, ignore, refresh bool) { +func (s *shard[T]) get(key string) (val T, exists, markedAsMissing, refresh bool) { s.RLock() item, ok := s.entries[key] if !ok {