diff --git a/service/history/decision/handler.go b/service/history/decision/handler.go index 07782763ea6..6fc8f799579 100644 --- a/service/history/decision/handler.go +++ b/service/history/decision/handler.go @@ -61,7 +61,7 @@ type ( shard shard.Context timeSource clock.TimeSource domainCache cache.DomainCache - executionCache *execution.Cache + executionCache execution.Cache tokenSerializer common.TaskTokenSerializer metricsClient metrics.Client logger log.Logger @@ -74,7 +74,7 @@ type ( // NewHandler creates a new Handler for handling decision business logic func NewHandler( shard shard.Context, - executionCache *execution.Cache, + executionCache execution.Cache, tokenSerializer common.TaskTokenSerializer, ) Handler { config := shard.GetConfig() diff --git a/service/history/decision/handler_test.go b/service/history/decision/handler_test.go index d2a675525c6..b9019d3f407 100644 --- a/service/history/decision/handler_test.go +++ b/service/history/decision/handler_test.go @@ -111,7 +111,7 @@ func (s *DecisionHandlerSuite) TestNewHandler() { shardContext.EXPECT().GetDomainCache().Times(2) shardContext.EXPECT().GetMetricsClient().Times(2) shardContext.EXPECT().GetThrottledLogger().Times(1).Return(testlogger.New(s.T())) - h := NewHandler(shardContext, &execution.Cache{}, tokenSerializer) + h := NewHandler(shardContext, execution.NewMockCache(s.controller), tokenSerializer) s.NotNil(h) s.Equal("handlerImpl", reflect.ValueOf(h).Elem().Type().Name()) } diff --git a/service/history/engine/engineimpl/history_engine.go b/service/history/engine/engineimpl/history_engine.go index 8d52c1a5c82..ec1ef428d8d 100644 --- a/service/history/engine/engineimpl/history_engine.go +++ b/service/history/engine/engineimpl/history_engine.go @@ -96,7 +96,7 @@ type historyEngineImpl struct { nDCActivityReplicator ndc.ActivityReplicator historyEventNotifier events.Notifier tokenSerializer common.TaskTokenSerializer - executionCache *execution.Cache + executionCache execution.Cache metricsClient metrics.Client logger log.Logger throttledLogger log.Logger @@ -120,7 +120,7 @@ type historyEngineImpl struct { wfIDCache workflowcache.WFCache ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter - updateWithActionFn func(context.Context, *execution.Cache, string, types.WorkflowExecution, bool, time.Time, func(wfContext execution.Context, mutableState execution.MutableState) error) error + updateWithActionFn func(context.Context, execution.Cache, string, types.WorkflowExecution, bool, time.Time, func(wfContext execution.Context, mutableState execution.MutableState) error) error } var ( diff --git a/service/history/engine/engineimpl/history_engine_test.go b/service/history/engine/engineimpl/history_engine_test.go index aed2550367f..74af6069057 100644 --- a/service/history/engine/engineimpl/history_engine_test.go +++ b/service/history/engine/engineimpl/history_engine_test.go @@ -5450,7 +5450,7 @@ func TestRecordChildExecutionCompleted(t *testing.T) { timeSource: mockShard.GetTimeSource(), metricsClient: metrics.NewClient(tally.NoopScope, metrics.History), logger: mockShard.GetLogger(), - updateWithActionFn: func(_ context.Context, _ *execution.Cache, _ string, _ types.WorkflowExecution, _ bool, _ time.Time, actionFn func(wfContext execution.Context, mutableState execution.MutableState) error) error { + updateWithActionFn: func(_ context.Context, _ execution.Cache, _ string, _ types.WorkflowExecution, _ bool, _ time.Time, actionFn func(wfContext execution.Context, mutableState execution.MutableState) error) error { return actionFn(nil, ms) }, } diff --git a/service/history/execution/cache.go b/service/history/execution/cache.go index dd56339be30..303e82a379b 100644 --- a/service/history/execution/cache.go +++ b/service/history/execution/cache.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination cache_mock.go + package execution import ( @@ -40,12 +42,52 @@ import ( "github.com/uber/cadence/service/history/shard" ) +// Cache is a cache that holds workflow execution context +type Cache interface { + cache.Cache + + // GetOrCreateCurrentWorkflowExecution gets or creates workflow execution context for the current run + GetOrCreateCurrentWorkflowExecution( + ctx context.Context, + domainID string, + workflowID string, + ) (Context, ReleaseFunc, error) + + // GetAndCreateWorkflowExecution is for analyzing mutableState, it will try getting Context from cache + // and also load from database + GetAndCreateWorkflowExecution( + ctx context.Context, + domainID string, + execution types.WorkflowExecution, + ) (Context, Context, ReleaseFunc, bool, error) + + // GetOrCreateWorkflowExecutionForBackground gets or creates workflow execution context with background context + GetOrCreateWorkflowExecutionForBackground( + domainID string, + execution types.WorkflowExecution, + ) (Context, ReleaseFunc, error) + + // GetOrCreateWorkflowExecutionWithTimeout gets or creates workflow execution context with timeout + GetOrCreateWorkflowExecutionWithTimeout( + domainID string, + execution types.WorkflowExecution, + timeout time.Duration, + ) (Context, ReleaseFunc, error) + + // GetOrCreateWorkflowExecution gets or creates workflow execution context + GetOrCreateWorkflowExecution( + ctx context.Context, + domainID string, + execution types.WorkflowExecution, + ) (Context, ReleaseFunc, error) +} + type ( // ReleaseFunc releases workflow execution context ReleaseFunc func(err error) // Cache caches workflow execution context - Cache struct { + cacheImpl struct { cache.Cache shard shard.Context executionManager persistence.ExecutionManager @@ -67,7 +109,7 @@ const ( ) // NewCache creates a new workflow execution context cache -func NewCache(shard shard.Context) *Cache { +func NewCache(shard shard.Context) Cache { opts := &cache.Options{} config := shard.GetConfig() opts.InitialCapacity = config.HistoryCacheInitialSize() @@ -75,7 +117,7 @@ func NewCache(shard shard.Context) *Cache { opts.Pin = true opts.MaxCount = config.HistoryCacheMaxSize() - return &Cache{ + return &cacheImpl{ Cache: cache.New(opts), shard: shard, executionManager: shard.GetExecutionManager(), @@ -86,7 +128,7 @@ func NewCache(shard shard.Context) *Cache { } // GetOrCreateCurrentWorkflowExecution gets or creates workflow execution context for the current run -func (c *Cache) GetOrCreateCurrentWorkflowExecution( +func (c *cacheImpl) GetOrCreateCurrentWorkflowExecution( ctx context.Context, domainID string, workflowID string, @@ -115,7 +157,7 @@ func (c *Cache) GetOrCreateCurrentWorkflowExecution( // GetAndCreateWorkflowExecution is for analyzing mutableState, it will try getting Context from cache // and also load from database -func (c *Cache) GetAndCreateWorkflowExecution( +func (c *cacheImpl) GetAndCreateWorkflowExecution( ctx context.Context, domainID string, execution types.WorkflowExecution, @@ -157,7 +199,7 @@ func (c *Cache) GetAndCreateWorkflowExecution( // GetOrCreateWorkflowExecutionForBackground gets or creates workflow execution context with background context // currently only used in tests -func (c *Cache) GetOrCreateWorkflowExecutionForBackground( +func (c *cacheImpl) GetOrCreateWorkflowExecutionForBackground( domainID string, execution types.WorkflowExecution, ) (Context, ReleaseFunc, error) { @@ -166,7 +208,7 @@ func (c *Cache) GetOrCreateWorkflowExecutionForBackground( } // GetOrCreateWorkflowExecutionWithTimeout gets or creates workflow execution context with timeout -func (c *Cache) GetOrCreateWorkflowExecutionWithTimeout( +func (c *cacheImpl) GetOrCreateWorkflowExecutionWithTimeout( domainID string, execution types.WorkflowExecution, timeout time.Duration, @@ -179,7 +221,7 @@ func (c *Cache) GetOrCreateWorkflowExecutionWithTimeout( } // GetOrCreateWorkflowExecution gets or creates workflow execution context -func (c *Cache) GetOrCreateWorkflowExecution( +func (c *cacheImpl) GetOrCreateWorkflowExecution( ctx context.Context, domainID string, execution types.WorkflowExecution, @@ -204,7 +246,7 @@ func (c *Cache) GetOrCreateWorkflowExecution( ) } -func (c *Cache) getOrCreateWorkflowExecutionInternal( +func (c *cacheImpl) getOrCreateWorkflowExecutionInternal( ctx context.Context, domainID string, execution types.WorkflowExecution, @@ -245,7 +287,7 @@ func (c *Cache) getOrCreateWorkflowExecutionInternal( return workflowCtx, releaseFunc, nil } -func (c *Cache) validateWorkflowExecutionInfo( +func (c *cacheImpl) validateWorkflowExecutionInfo( ctx context.Context, domainID string, execution *types.WorkflowExecution, @@ -277,7 +319,7 @@ func (c *Cache) validateWorkflowExecutionInfo( return nil } -func (c *Cache) makeReleaseFunc( +func (c *cacheImpl) makeReleaseFunc( key definition.WorkflowIdentifier, context Context, forceClearContext bool, @@ -305,7 +347,7 @@ func (c *Cache) makeReleaseFunc( } } -func (c *Cache) getCurrentExecutionWithRetry( +func (c *cacheImpl) getCurrentExecutionWithRetry( ctx context.Context, request *persistence.GetCurrentExecutionRequest, ) (*persistence.GetCurrentExecutionResponse, error) { diff --git a/service/history/execution/cache_mock.go b/service/history/execution/cache_mock.go new file mode 100644 index 00000000000..ebb8fd92e86 --- /dev/null +++ b/service/history/execution/cache_mock.go @@ -0,0 +1,238 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: cache.go + +// Package execution is a generated GoMock package. +package execution + +import ( + context "context" + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" + + cache "github.com/uber/cadence/common/cache" + types "github.com/uber/cadence/common/types" +) + +// MockCache is a mock of Cache interface. +type MockCache struct { + ctrl *gomock.Controller + recorder *MockCacheMockRecorder +} + +// MockCacheMockRecorder is the mock recorder for MockCache. +type MockCacheMockRecorder struct { + mock *MockCache +} + +// NewMockCache creates a new mock instance. +func NewMockCache(ctrl *gomock.Controller) *MockCache { + mock := &MockCache{ctrl: ctrl} + mock.recorder = &MockCacheMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCache) EXPECT() *MockCacheMockRecorder { + return m.recorder +} + +// Delete mocks base method. +func (m *MockCache) Delete(key interface{}) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Delete", key) +} + +// Delete indicates an expected call of Delete. +func (mr *MockCacheMockRecorder) Delete(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockCache)(nil).Delete), key) +} + +// Get mocks base method. +func (m *MockCache) Get(key interface{}) interface{} { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", key) + ret0, _ := ret[0].(interface{}) + return ret0 +} + +// Get indicates an expected call of Get. +func (mr *MockCacheMockRecorder) Get(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockCache)(nil).Get), key) +} + +// GetAndCreateWorkflowExecution mocks base method. +func (m *MockCache) GetAndCreateWorkflowExecution(ctx context.Context, domainID string, execution types.WorkflowExecution) (Context, Context, ReleaseFunc, bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAndCreateWorkflowExecution", ctx, domainID, execution) + ret0, _ := ret[0].(Context) + ret1, _ := ret[1].(Context) + ret2, _ := ret[2].(ReleaseFunc) + ret3, _ := ret[3].(bool) + ret4, _ := ret[4].(error) + return ret0, ret1, ret2, ret3, ret4 +} + +// GetAndCreateWorkflowExecution indicates an expected call of GetAndCreateWorkflowExecution. +func (mr *MockCacheMockRecorder) GetAndCreateWorkflowExecution(ctx, domainID, execution interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAndCreateWorkflowExecution", reflect.TypeOf((*MockCache)(nil).GetAndCreateWorkflowExecution), ctx, domainID, execution) +} + +// GetOrCreateCurrentWorkflowExecution mocks base method. +func (m *MockCache) GetOrCreateCurrentWorkflowExecution(ctx context.Context, domainID, workflowID string) (Context, ReleaseFunc, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOrCreateCurrentWorkflowExecution", ctx, domainID, workflowID) + ret0, _ := ret[0].(Context) + ret1, _ := ret[1].(ReleaseFunc) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetOrCreateCurrentWorkflowExecution indicates an expected call of GetOrCreateCurrentWorkflowExecution. +func (mr *MockCacheMockRecorder) GetOrCreateCurrentWorkflowExecution(ctx, domainID, workflowID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateCurrentWorkflowExecution", reflect.TypeOf((*MockCache)(nil).GetOrCreateCurrentWorkflowExecution), ctx, domainID, workflowID) +} + +// GetOrCreateWorkflowExecution mocks base method. +func (m *MockCache) GetOrCreateWorkflowExecution(ctx context.Context, domainID string, execution types.WorkflowExecution) (Context, ReleaseFunc, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOrCreateWorkflowExecution", ctx, domainID, execution) + ret0, _ := ret[0].(Context) + ret1, _ := ret[1].(ReleaseFunc) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetOrCreateWorkflowExecution indicates an expected call of GetOrCreateWorkflowExecution. +func (mr *MockCacheMockRecorder) GetOrCreateWorkflowExecution(ctx, domainID, execution interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateWorkflowExecution", reflect.TypeOf((*MockCache)(nil).GetOrCreateWorkflowExecution), ctx, domainID, execution) +} + +// GetOrCreateWorkflowExecutionForBackground mocks base method. +func (m *MockCache) GetOrCreateWorkflowExecutionForBackground(domainID string, execution types.WorkflowExecution) (Context, ReleaseFunc, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOrCreateWorkflowExecutionForBackground", domainID, execution) + ret0, _ := ret[0].(Context) + ret1, _ := ret[1].(ReleaseFunc) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetOrCreateWorkflowExecutionForBackground indicates an expected call of GetOrCreateWorkflowExecutionForBackground. +func (mr *MockCacheMockRecorder) GetOrCreateWorkflowExecutionForBackground(domainID, execution interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateWorkflowExecutionForBackground", reflect.TypeOf((*MockCache)(nil).GetOrCreateWorkflowExecutionForBackground), domainID, execution) +} + +// GetOrCreateWorkflowExecutionWithTimeout mocks base method. +func (m *MockCache) GetOrCreateWorkflowExecutionWithTimeout(domainID string, execution types.WorkflowExecution, timeout time.Duration) (Context, ReleaseFunc, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOrCreateWorkflowExecutionWithTimeout", domainID, execution, timeout) + ret0, _ := ret[0].(Context) + ret1, _ := ret[1].(ReleaseFunc) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetOrCreateWorkflowExecutionWithTimeout indicates an expected call of GetOrCreateWorkflowExecutionWithTimeout. +func (mr *MockCacheMockRecorder) GetOrCreateWorkflowExecutionWithTimeout(domainID, execution, timeout interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrCreateWorkflowExecutionWithTimeout", reflect.TypeOf((*MockCache)(nil).GetOrCreateWorkflowExecutionWithTimeout), domainID, execution, timeout) +} + +// Iterator mocks base method. +func (m *MockCache) Iterator() cache.Iterator { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Iterator") + ret0, _ := ret[0].(cache.Iterator) + return ret0 +} + +// Iterator indicates an expected call of Iterator. +func (mr *MockCacheMockRecorder) Iterator() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Iterator", reflect.TypeOf((*MockCache)(nil).Iterator)) +} + +// Put mocks base method. +func (m *MockCache) Put(key, value interface{}) interface{} { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Put", key, value) + ret0, _ := ret[0].(interface{}) + return ret0 +} + +// Put indicates an expected call of Put. +func (mr *MockCacheMockRecorder) Put(key, value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockCache)(nil).Put), key, value) +} + +// PutIfNotExist mocks base method. +func (m *MockCache) PutIfNotExist(key, value interface{}) (interface{}, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutIfNotExist", key, value) + ret0, _ := ret[0].(interface{}) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PutIfNotExist indicates an expected call of PutIfNotExist. +func (mr *MockCacheMockRecorder) PutIfNotExist(key, value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutIfNotExist", reflect.TypeOf((*MockCache)(nil).PutIfNotExist), key, value) +} + +// Release mocks base method. +func (m *MockCache) Release(key interface{}) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Release", key) +} + +// Release indicates an expected call of Release. +func (mr *MockCacheMockRecorder) Release(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Release", reflect.TypeOf((*MockCache)(nil).Release), key) +} + +// Size mocks base method. +func (m *MockCache) Size() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Size") + ret0, _ := ret[0].(int) + return ret0 +} + +// Size indicates an expected call of Size. +func (mr *MockCacheMockRecorder) Size() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockCache)(nil).Size)) +} diff --git a/service/history/execution/cache_test.go b/service/history/execution/cache_test.go index 2e714cbb167..4e9d1cf2b0a 100644 --- a/service/history/execution/cache_test.go +++ b/service/history/execution/cache_test.go @@ -45,7 +45,7 @@ type ( controller *gomock.Controller mockShard *shard.TestContext - cache *Cache + cache Cache } ) diff --git a/service/history/ndc/activity_replicator.go b/service/history/ndc/activity_replicator.go index 973d6e38e7d..b68903de79c 100644 --- a/service/history/ndc/activity_replicator.go +++ b/service/history/ndc/activity_replicator.go @@ -52,7 +52,7 @@ type ( } activityReplicatorImpl struct { - executionCache *execution.Cache + executionCache execution.Cache clusterMetadata cluster.Metadata logger log.Logger } @@ -63,7 +63,7 @@ var _ ActivityReplicator = (*activityReplicatorImpl)(nil) // NewActivityReplicator creates activity replicator func NewActivityReplicator( shard shard.Context, - executionCache *execution.Cache, + executionCache execution.Cache, logger log.Logger, ) ActivityReplicator { diff --git a/service/history/ndc/activity_replicator_test.go b/service/history/ndc/activity_replicator_test.go index 3900071fa79..9be82d20910 100644 --- a/service/history/ndc/activity_replicator_test.go +++ b/service/history/ndc/activity_replicator_test.go @@ -61,7 +61,7 @@ type ( mockExecutionMgr *mocks.ExecutionManager logger log.Logger - executionCache *execution.Cache + executionCache execution.Cache activityReplicator ActivityReplicator } diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index 1017b8c2a12..8220ad218ef 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -59,7 +59,7 @@ type ( historySerializer persistence.PayloadSerializer metricsClient metrics.Client domainCache cache.DomainCache - executionCache *execution.Cache + executionCache execution.Cache eventsReapplier EventsReapplier transactionManager transactionManager logger log.Logger @@ -118,7 +118,7 @@ var errPanic = errors.NewInternalFailureError("encounter panic") // NewHistoryReplicator creates history replicator func NewHistoryReplicator( shard shard.Context, - executionCache *execution.Cache, + executionCache execution.Cache, eventsReapplier EventsReapplier, logger log.Logger, ) HistoryReplicator { diff --git a/service/history/ndc/transaction_manager.go b/service/history/ndc/transaction_manager.go index 3e35a9e6a4a..145718d1c69 100644 --- a/service/history/ndc/transaction_manager.go +++ b/service/history/ndc/transaction_manager.go @@ -145,7 +145,7 @@ type ( transactionManagerImpl struct { shard shard.Context - executionCache *execution.Cache + executionCache execution.Cache clusterMetadata cluster.Metadata historyV2Manager persistence.HistoryManager serializer persistence.PayloadSerializer @@ -163,7 +163,7 @@ var _ transactionManager = (*transactionManagerImpl)(nil) func newTransactionManager( shard shard.Context, - executionCache *execution.Cache, + executionCache execution.Cache, eventsReapplier EventsReapplier, logger log.Logger, ) *transactionManagerImpl { diff --git a/service/history/queue/cross_cluster_queue_processor.go b/service/history/queue/cross_cluster_queue_processor.go index 2a4b17d337e..f299f4151ae 100644 --- a/service/history/queue/cross_cluster_queue_processor.go +++ b/service/history/queue/cross_cluster_queue_processor.go @@ -57,7 +57,7 @@ type ( func NewCrossClusterQueueProcessor( shard shard.Context, historyEngine engine.Engine, - executionCache *execution.Cache, + executionCache execution.Cache, taskProcessor task.Processor, ) Processor { logger := shard.GetLogger().WithTags(tag.ComponentCrossClusterQueueProcessor) diff --git a/service/history/queue/cross_cluster_queue_processor_base.go b/service/history/queue/cross_cluster_queue_processor_base.go index 45f4c8ccb53..2f431218712 100644 --- a/service/history/queue/cross_cluster_queue_processor_base.go +++ b/service/history/queue/cross_cluster_queue_processor_base.go @@ -75,7 +75,7 @@ type ( func newCrossClusterQueueProcessorBase( shard shard.Context, clusterName string, - executionCache *execution.Cache, + executionCache execution.Cache, taskProcessor task.Processor, taskExecutor task.Executor, logger log.Logger, @@ -116,7 +116,7 @@ func newCrossClusterQueueProcessorBase( func newCrossClusterQueueProcessorBaseHelper( shard shard.Context, targetCluster string, - executionCache *execution.Cache, + executionCache execution.Cache, processingQueueStates []ProcessingQueueState, taskProcessor task.Processor, options *queueProcessorOptions, diff --git a/service/history/queue/factory.go b/service/history/queue/factory.go index f911d4a45af..5379e043c00 100644 --- a/service/history/queue/factory.go +++ b/service/history/queue/factory.go @@ -39,7 +39,7 @@ type ProcessorFactory interface { shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, - executionCache *execution.Cache, + executionCache execution.Cache, workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, @@ -51,7 +51,7 @@ type ProcessorFactory interface { shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, - executionCache *execution.Cache, + executionCache execution.Cache, archivalClient archiver.Client, executionCheck invariant.Invariant, ) Processor @@ -59,7 +59,7 @@ type ProcessorFactory interface { NewCrossClusterQueueProcessor( shard shard.Context, historyEngine engine.Engine, - executionCache *execution.Cache, + executionCache execution.Cache, taskProcessor task.Processor, ) Processor } @@ -75,7 +75,7 @@ func (f *factoryImpl) NewTransferQueueProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, - executionCache *execution.Cache, + executionCache execution.Cache, workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, @@ -99,7 +99,7 @@ func (f *factoryImpl) NewTimerQueueProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, - executionCache *execution.Cache, + executionCache execution.Cache, archivalClient archiver.Client, executionCheck invariant.Invariant, ) Processor { @@ -116,7 +116,7 @@ func (f *factoryImpl) NewTimerQueueProcessor( func (f *factoryImpl) NewCrossClusterQueueProcessor( shard shard.Context, historyEngine engine.Engine, - executionCache *execution.Cache, + executionCache execution.Cache, taskProcessor task.Processor, ) Processor { return NewCrossClusterQueueProcessor( diff --git a/service/history/queue/factory_mock.go b/service/history/queue/factory_mock.go index ff321e2089a..e885cf30178 100644 --- a/service/history/queue/factory_mock.go +++ b/service/history/queue/factory_mock.go @@ -66,7 +66,7 @@ func (m *MockProcessorFactory) EXPECT() *MockProcessorFactoryMockRecorder { } // NewCrossClusterQueueProcessor mocks base method. -func (m *MockProcessorFactory) NewCrossClusterQueueProcessor(shard shard.Context, historyEngine engine.Engine, executionCache *execution.Cache, taskProcessor task.Processor) Processor { +func (m *MockProcessorFactory) NewCrossClusterQueueProcessor(shard shard.Context, historyEngine engine.Engine, executionCache execution.Cache, taskProcessor task.Processor) Processor { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "NewCrossClusterQueueProcessor", shard, historyEngine, executionCache, taskProcessor) ret0, _ := ret[0].(Processor) @@ -80,7 +80,7 @@ func (mr *MockProcessorFactoryMockRecorder) NewCrossClusterQueueProcessor(shard, } // NewTimerQueueProcessor mocks base method. -func (m *MockProcessorFactory) NewTimerQueueProcessor(shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache *execution.Cache, archivalClient archiver.Client, executionCheck invariant.Invariant) Processor { +func (m *MockProcessorFactory) NewTimerQueueProcessor(shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache execution.Cache, archivalClient archiver.Client, executionCheck invariant.Invariant) Processor { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "NewTimerQueueProcessor", shard, historyEngine, taskProcessor, executionCache, archivalClient, executionCheck) ret0, _ := ret[0].(Processor) @@ -94,7 +94,7 @@ func (mr *MockProcessorFactoryMockRecorder) NewTimerQueueProcessor(shard, histor } // NewTransferQueueProcessor mocks base method. -func (m *MockProcessorFactory) NewTransferQueueProcessor(shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache *execution.Cache, workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, wfIDCache workflowcache.WFCache, ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter) Processor { +func (m *MockProcessorFactory) NewTransferQueueProcessor(shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache execution.Cache, workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, wfIDCache workflowcache.WFCache, ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter) Processor { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "NewTransferQueueProcessor", shard, historyEngine, taskProcessor, executionCache, workflowResetter, archivalClient, executionCheck, wfIDCache, ratelimitInternalPerWorkflowID) ret0, _ := ret[0].(Processor) diff --git a/service/history/queue/timer_queue_processor.go b/service/history/queue/timer_queue_processor.go index 8890b7916c7..7ab0eaf3c61 100644 --- a/service/history/queue/timer_queue_processor.go +++ b/service/history/queue/timer_queue_processor.go @@ -75,7 +75,7 @@ func NewTimerQueueProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, - executionCache *execution.Cache, + executionCache execution.Cache, archivalClient archiver.Client, executionCheck invariant.Invariant, ) Processor { diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go index b548649cfef..02af6cf2e2e 100644 --- a/service/history/queue/transfer_queue_processor.go +++ b/service/history/queue/transfer_queue_processor.go @@ -86,7 +86,7 @@ func NewTransferQueueProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, - executionCache *execution.Cache, + executionCache execution.Cache, workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, diff --git a/service/history/replication/task_hydrator.go b/service/history/replication/task_hydrator.go index f96d1e360b6..a477a7d46ed 100644 --- a/service/history/replication/task_hydrator.go +++ b/service/history/replication/task_hydrator.go @@ -70,7 +70,7 @@ func NewImmediateTaskHydrator(isRunning bool, vh *persistence.VersionHistories, // NewDeferredTaskHydrator will enrich replication tasks with additional information that is not available on hand, // but is rather loaded in a deferred way later from a database and cache. -func NewDeferredTaskHydrator(shardID int, historyManager persistence.HistoryManager, executionCache *execution.Cache, domains domainCache) TaskHydrator { +func NewDeferredTaskHydrator(shardID int, historyManager persistence.HistoryManager, executionCache execution.Cache, domains domainCache) TaskHydrator { return TaskHydrator{ history: historyLoader{shardID, historyManager, domains}, msProvider: mutableStateLoader{executionCache}, @@ -262,7 +262,7 @@ func (h historyLoader) getEventsBlob(ctx context.Context, domainID string, branc // mutableStateLoader uses workflow execution cache to load mutable state type mutableStateLoader struct { - cache *execution.Cache + cache execution.Cache } func (l mutableStateLoader) GetMutableState(ctx context.Context, domainID, workflowID, runID string) (mutableState, execution.ReleaseFunc, error) { diff --git a/service/history/reset/resetter.go b/service/history/reset/resetter.go index d7db70980e3..9720f68f2e3 100644 --- a/service/history/reset/resetter.go +++ b/service/history/reset/resetter.go @@ -64,7 +64,7 @@ type ( domainCache cache.DomainCache clusterMetadata cluster.Metadata historyV2Mgr persistence.HistoryManager - executionCache *execution.Cache + executionCache execution.Cache newStateRebuilder nDCStateRebuilderProvider logger log.Logger } @@ -77,7 +77,7 @@ var _ WorkflowResetter = (*workflowResetterImpl)(nil) // NewWorkflowResetter creates a workflow resetter func NewWorkflowResetter( shard shard.Context, - executionCache *execution.Cache, + executionCache execution.Cache, logger log.Logger, ) WorkflowResetter { return &workflowResetterImpl{ diff --git a/service/history/task/cross_cluster_source_task_executor.go b/service/history/task/cross_cluster_source_task_executor.go index b1c2dba7505..0034c4ad8e4 100644 --- a/service/history/task/cross_cluster_source_task_executor.go +++ b/service/history/task/cross_cluster_source_task_executor.go @@ -43,7 +43,7 @@ var ( type ( crossClusterSourceTaskExecutor struct { shard shard.Context - executionCache *execution.Cache + executionCache execution.Cache logger log.Logger metricsClient metrics.Client } @@ -51,7 +51,7 @@ type ( func NewCrossClusterSourceTaskExecutor( shard shard.Context, - executionCache *execution.Cache, + executionCache execution.Cache, logger log.Logger, ) Executor { return &crossClusterSourceTaskExecutor{ diff --git a/service/history/task/cross_cluster_source_task_executor_test.go b/service/history/task/cross_cluster_source_task_executor_test.go index 916e962d9ee..eccc75ccd66 100644 --- a/service/history/task/cross_cluster_source_task_executor_test.go +++ b/service/history/task/cross_cluster_source_task_executor_test.go @@ -56,7 +56,7 @@ type ( mockDomainCache *cache.MockDomainCache mockExecutionMgr *mocks.ExecutionManager mockHistoryV2Mgr *mocks.HistoryV2Manager - executionCache *execution.Cache + executionCache execution.Cache executor Executor } diff --git a/service/history/task/cross_cluster_task.go b/service/history/task/cross_cluster_task.go index 57dda12108d..5886c8eb4a9 100644 --- a/service/history/task/cross_cluster_task.go +++ b/service/history/task/cross_cluster_task.go @@ -104,7 +104,7 @@ type ( // targetDomain if the targetDomain performed a failover after // the task is loaded. targetCluster string - executionCache *execution.Cache + executionCache execution.Cache response *types.CrossClusterTaskResponse readyForPollFn func(task CrossClusterTask) } @@ -141,7 +141,7 @@ type ( func NewCrossClusterSourceTask( shard shard.Context, targetCluster string, - executionCache *execution.Cache, + executionCache execution.Cache, taskInfo Info, taskExecutor Executor, taskProcessor Processor, @@ -953,7 +953,7 @@ func (t *crossClusterTaskBase) ProcessingState() processingState { func loadWorkflowForCrossClusterTask( ctx context.Context, - executionCache *execution.Cache, + executionCache execution.Cache, taskInfo *persistence.CrossClusterTaskInfo, metricsClient metrics.Client, logger log.Logger, diff --git a/service/history/task/cross_cluster_task_test.go b/service/history/task/cross_cluster_task_test.go index 49f6a8f7786..ffbd1b77212 100644 --- a/service/history/task/cross_cluster_task_test.go +++ b/service/history/task/cross_cluster_task_test.go @@ -59,7 +59,7 @@ type ( mockExecutor *MockExecutor mockProcessor *MockProcessor mockRedispatcher *MockRedispatcher - executionCache *execution.Cache + executionCache execution.Cache } ) diff --git a/service/history/task/timer_active_task_executor.go b/service/history/task/timer_active_task_executor.go index 0abcfc0096a..f749c61705c 100644 --- a/service/history/task/timer_active_task_executor.go +++ b/service/history/task/timer_active_task_executor.go @@ -57,7 +57,7 @@ type ( func NewTimerActiveTaskExecutor( shard shard.Context, archiverClient archiver.Client, - executionCache *execution.Cache, + executionCache execution.Cache, logger log.Logger, metricsClient metrics.Client, config *config.Config, diff --git a/service/history/task/timer_active_task_executor_test.go b/service/history/task/timer_active_task_executor_test.go index 68cc97d1882..7552dbdd270 100644 --- a/service/history/task/timer_active_task_executor_test.go +++ b/service/history/task/timer_active_task_executor_test.go @@ -65,7 +65,7 @@ type ( mockExecutionMgr *mocks.ExecutionManager mockHistoryV2Mgr *mocks.HistoryV2Manager - executionCache *execution.Cache + executionCache execution.Cache logger log.Logger domainID string domain string diff --git a/service/history/task/timer_standby_task_executor.go b/service/history/task/timer_standby_task_executor.go index 51ac4102fd0..c937eb60674 100644 --- a/service/history/task/timer_standby_task_executor.go +++ b/service/history/task/timer_standby_task_executor.go @@ -56,7 +56,7 @@ type ( func NewTimerStandbyTaskExecutor( shard shard.Context, archiverClient archiver.Client, - executionCache *execution.Cache, + executionCache execution.Cache, historyResender ndc.HistoryResender, logger log.Logger, metricsClient metrics.Client, diff --git a/service/history/task/timer_task_executor_base.go b/service/history/task/timer_task_executor_base.go index 6474a2d55bb..7fd54edc9c8 100644 --- a/service/history/task/timer_task_executor_base.go +++ b/service/history/task/timer_task_executor_base.go @@ -45,7 +45,7 @@ type ( timerTaskExecutorBase struct { shard shard.Context archiverClient archiver.Client - executionCache *execution.Cache + executionCache execution.Cache logger log.Logger metricsClient metrics.Client config *config.Config @@ -58,7 +58,7 @@ type ( func newTimerTaskExecutorBase( shard shard.Context, archiverClient archiver.Client, - executionCache *execution.Cache, + executionCache execution.Cache, logger log.Logger, metricsClient metrics.Client, config *config.Config, diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 802bbd646c6..77b2ff9a98b 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -85,7 +85,7 @@ type ( func NewTransferActiveTaskExecutor( shard shard.Context, archiverClient archiver.Client, - executionCache *execution.Cache, + executionCache execution.Cache, workflowResetter reset.WorkflowResetter, logger log.Logger, config *config.Config, diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index 1e4f10ec54f..8b1ff037d11 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -51,7 +51,7 @@ type ( func NewTransferStandbyTaskExecutor( shard shard.Context, archiverClient archiver.Client, - executionCache *execution.Cache, + executionCache execution.Cache, historyResender ndc.HistoryResender, logger log.Logger, clusterName string, diff --git a/service/history/task/transfer_task_executor_base.go b/service/history/task/transfer_task_executor_base.go index fea9305576a..94c8fda52a0 100644 --- a/service/history/task/transfer_task_executor_base.go +++ b/service/history/task/transfer_task_executor_base.go @@ -51,7 +51,7 @@ type ( transferTaskExecutorBase struct { shard shard.Context archiverClient archiver.Client - executionCache *execution.Cache + executionCache execution.Cache logger log.Logger metricsClient metrics.Client matchingClient matching.Client @@ -64,7 +64,7 @@ type ( func newTransferTaskExecutorBase( shard shard.Context, archiverClient archiver.Client, - executionCache *execution.Cache, + executionCache execution.Cache, logger log.Logger, config *config.Config, ) *transferTaskExecutorBase { diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index 7d9c3107eb9..f853b1bdd5b 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -52,7 +52,7 @@ var ( func LoadOnce( ctx context.Context, - cache *execution.Cache, + cache execution.Cache, domainID string, workflowID string, runID string, @@ -81,7 +81,7 @@ func LoadOnce( func Load( ctx context.Context, - cache *execution.Cache, + cache execution.Cache, executionManager persistence.ExecutionManager, domainID string, domainName string, @@ -134,7 +134,7 @@ func Load( // If the update should always be applied to the current run, use UpdateCurrentWithActionFunc instead. func UpdateWithActionFunc( ctx context.Context, - cache *execution.Cache, + cache execution.Cache, domainID string, execution types.WorkflowExecution, now time.Time, @@ -155,7 +155,7 @@ func UpdateWithActionFunc( // This function is suitable for the case when the change should always be applied to the current execution. func UpdateCurrentWithActionFunc( ctx context.Context, - cache *execution.Cache, + cache execution.Cache, executionManager persistence.ExecutionManager, domainID string, domainCache cache.DomainCache, @@ -180,7 +180,7 @@ func UpdateCurrentWithActionFunc( // TODO: deprecate and use UpdateWithActionFunc func UpdateWithAction( ctx context.Context, - cache *execution.Cache, + cache execution.Cache, domainID string, execution types.WorkflowExecution, createDecisionTask bool,