Skip to content

Commit

Permalink
Made execution.Cache an interface so we can mock it in unit tests (ca…
Browse files Browse the repository at this point in the history
…dence-workflow#6058)

What changed?
Made execution cache an interface

Why?
Now we can mock it in unit tests

How did you test it?
Unit and integration tests

Potential risks
Touches a lot of files but should be a safe change

Release notes

Documentation Changes
  • Loading branch information
jakobht authored and timl3136 committed Jun 6, 2024
1 parent 58fc1d0 commit 6fab563
Show file tree
Hide file tree
Showing 31 changed files with 345 additions and 65 deletions.
4 changes: 2 additions & 2 deletions service/history/decision/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type (
shard shard.Context
timeSource clock.TimeSource
domainCache cache.DomainCache
executionCache *execution.Cache
executionCache execution.Cache
tokenSerializer common.TaskTokenSerializer
metricsClient metrics.Client
logger log.Logger
Expand All @@ -74,7 +74,7 @@ type (
// NewHandler creates a new Handler for handling decision business logic
func NewHandler(
shard shard.Context,
executionCache *execution.Cache,
executionCache execution.Cache,
tokenSerializer common.TaskTokenSerializer,
) Handler {
config := shard.GetConfig()
Expand Down
2 changes: 1 addition & 1 deletion service/history/decision/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *DecisionHandlerSuite) TestNewHandler() {
shardContext.EXPECT().GetDomainCache().Times(2)
shardContext.EXPECT().GetMetricsClient().Times(2)
shardContext.EXPECT().GetThrottledLogger().Times(1).Return(testlogger.New(s.T()))
h := NewHandler(shardContext, &execution.Cache{}, tokenSerializer)
h := NewHandler(shardContext, execution.NewMockCache(s.controller), tokenSerializer)
s.NotNil(h)
s.Equal("handlerImpl", reflect.ValueOf(h).Elem().Type().Name())
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/engine/engineimpl/history_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type historyEngineImpl struct {
nDCActivityReplicator ndc.ActivityReplicator
historyEventNotifier events.Notifier
tokenSerializer common.TaskTokenSerializer
executionCache *execution.Cache
executionCache execution.Cache
metricsClient metrics.Client
logger log.Logger
throttledLogger log.Logger
Expand All @@ -120,7 +120,7 @@ type historyEngineImpl struct {
wfIDCache workflowcache.WFCache
ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter

updateWithActionFn func(context.Context, *execution.Cache, string, types.WorkflowExecution, bool, time.Time, func(wfContext execution.Context, mutableState execution.MutableState) error) error
updateWithActionFn func(context.Context, execution.Cache, string, types.WorkflowExecution, bool, time.Time, func(wfContext execution.Context, mutableState execution.MutableState) error) error
}

var (
Expand Down
2 changes: 1 addition & 1 deletion service/history/engine/engineimpl/history_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5450,7 +5450,7 @@ func TestRecordChildExecutionCompleted(t *testing.T) {
timeSource: mockShard.GetTimeSource(),
metricsClient: metrics.NewClient(tally.NoopScope, metrics.History),
logger: mockShard.GetLogger(),
updateWithActionFn: func(_ context.Context, _ *execution.Cache, _ string, _ types.WorkflowExecution, _ bool, _ time.Time, actionFn func(wfContext execution.Context, mutableState execution.MutableState) error) error {
updateWithActionFn: func(_ context.Context, _ execution.Cache, _ string, _ types.WorkflowExecution, _ bool, _ time.Time, actionFn func(wfContext execution.Context, mutableState execution.MutableState) error) error {
return actionFn(nil, ms)
},
}
Expand Down
66 changes: 54 additions & 12 deletions service/history/execution/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination cache_mock.go

package execution

import (
Expand All @@ -40,12 +42,52 @@ import (
"github.com/uber/cadence/service/history/shard"
)

// Cache is a cache that holds workflow execution context
type Cache interface {
cache.Cache

// GetOrCreateCurrentWorkflowExecution gets or creates workflow execution context for the current run
GetOrCreateCurrentWorkflowExecution(
ctx context.Context,
domainID string,
workflowID string,
) (Context, ReleaseFunc, error)

// GetAndCreateWorkflowExecution is for analyzing mutableState, it will try getting Context from cache
// and also load from database
GetAndCreateWorkflowExecution(
ctx context.Context,
domainID string,
execution types.WorkflowExecution,
) (Context, Context, ReleaseFunc, bool, error)

// GetOrCreateWorkflowExecutionForBackground gets or creates workflow execution context with background context
GetOrCreateWorkflowExecutionForBackground(
domainID string,
execution types.WorkflowExecution,
) (Context, ReleaseFunc, error)

// GetOrCreateWorkflowExecutionWithTimeout gets or creates workflow execution context with timeout
GetOrCreateWorkflowExecutionWithTimeout(
domainID string,
execution types.WorkflowExecution,
timeout time.Duration,
) (Context, ReleaseFunc, error)

// GetOrCreateWorkflowExecution gets or creates workflow execution context
GetOrCreateWorkflowExecution(
ctx context.Context,
domainID string,
execution types.WorkflowExecution,
) (Context, ReleaseFunc, error)
}

type (
// ReleaseFunc releases workflow execution context
ReleaseFunc func(err error)

// Cache caches workflow execution context
Cache struct {
cacheImpl struct {
cache.Cache
shard shard.Context
executionManager persistence.ExecutionManager
Expand All @@ -67,15 +109,15 @@ const (
)

// NewCache creates a new workflow execution context cache
func NewCache(shard shard.Context) *Cache {
func NewCache(shard shard.Context) Cache {
opts := &cache.Options{}
config := shard.GetConfig()
opts.InitialCapacity = config.HistoryCacheInitialSize()
opts.TTL = config.HistoryCacheTTL()
opts.Pin = true
opts.MaxCount = config.HistoryCacheMaxSize()

return &Cache{
return &cacheImpl{
Cache: cache.New(opts),
shard: shard,
executionManager: shard.GetExecutionManager(),
Expand All @@ -86,7 +128,7 @@ func NewCache(shard shard.Context) *Cache {
}

// GetOrCreateCurrentWorkflowExecution gets or creates workflow execution context for the current run
func (c *Cache) GetOrCreateCurrentWorkflowExecution(
func (c *cacheImpl) GetOrCreateCurrentWorkflowExecution(
ctx context.Context,
domainID string,
workflowID string,
Expand Down Expand Up @@ -115,7 +157,7 @@ func (c *Cache) GetOrCreateCurrentWorkflowExecution(

// GetAndCreateWorkflowExecution is for analyzing mutableState, it will try getting Context from cache
// and also load from database
func (c *Cache) GetAndCreateWorkflowExecution(
func (c *cacheImpl) GetAndCreateWorkflowExecution(
ctx context.Context,
domainID string,
execution types.WorkflowExecution,
Expand Down Expand Up @@ -157,7 +199,7 @@ func (c *Cache) GetAndCreateWorkflowExecution(

// GetOrCreateWorkflowExecutionForBackground gets or creates workflow execution context with background context
// currently only used in tests
func (c *Cache) GetOrCreateWorkflowExecutionForBackground(
func (c *cacheImpl) GetOrCreateWorkflowExecutionForBackground(
domainID string,
execution types.WorkflowExecution,
) (Context, ReleaseFunc, error) {
Expand All @@ -166,7 +208,7 @@ func (c *Cache) GetOrCreateWorkflowExecutionForBackground(
}

// GetOrCreateWorkflowExecutionWithTimeout gets or creates workflow execution context with timeout
func (c *Cache) GetOrCreateWorkflowExecutionWithTimeout(
func (c *cacheImpl) GetOrCreateWorkflowExecutionWithTimeout(
domainID string,
execution types.WorkflowExecution,
timeout time.Duration,
Expand All @@ -179,7 +221,7 @@ func (c *Cache) GetOrCreateWorkflowExecutionWithTimeout(
}

// GetOrCreateWorkflowExecution gets or creates workflow execution context
func (c *Cache) GetOrCreateWorkflowExecution(
func (c *cacheImpl) GetOrCreateWorkflowExecution(
ctx context.Context,
domainID string,
execution types.WorkflowExecution,
Expand All @@ -204,7 +246,7 @@ func (c *Cache) GetOrCreateWorkflowExecution(
)
}

func (c *Cache) getOrCreateWorkflowExecutionInternal(
func (c *cacheImpl) getOrCreateWorkflowExecutionInternal(
ctx context.Context,
domainID string,
execution types.WorkflowExecution,
Expand Down Expand Up @@ -245,7 +287,7 @@ func (c *Cache) getOrCreateWorkflowExecutionInternal(
return workflowCtx, releaseFunc, nil
}

func (c *Cache) validateWorkflowExecutionInfo(
func (c *cacheImpl) validateWorkflowExecutionInfo(
ctx context.Context,
domainID string,
execution *types.WorkflowExecution,
Expand Down Expand Up @@ -277,7 +319,7 @@ func (c *Cache) validateWorkflowExecutionInfo(
return nil
}

func (c *Cache) makeReleaseFunc(
func (c *cacheImpl) makeReleaseFunc(
key definition.WorkflowIdentifier,
context Context,
forceClearContext bool,
Expand Down Expand Up @@ -305,7 +347,7 @@ func (c *Cache) makeReleaseFunc(
}
}

func (c *Cache) getCurrentExecutionWithRetry(
func (c *cacheImpl) getCurrentExecutionWithRetry(
ctx context.Context,
request *persistence.GetCurrentExecutionRequest,
) (*persistence.GetCurrentExecutionResponse, error) {
Expand Down
Loading

0 comments on commit 6fab563

Please sign in to comment.