Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Splitting wfCacheEnabled config for internal and external requests #5647

Merged
merged 6 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1715,7 +1715,13 @@ const (
// Value type: Bool
sankari165 marked this conversation as resolved.
Show resolved Hide resolved
// Default value: false
// Allowed filters: DomainName
WorkflowIDCacheEnabled
WorkflowIDCacheExternalEnabled
// WorkflowIDCacheInternalEnabled is the key to enable/disable caching of workflowID specific information for internal requests
// KeyName: history.workflowIDCacheInternalEnabled
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
WorkflowIDCacheInternalEnabled
// AllowArchivingIncompleteHistory will continue on when seeing some error like history mutated(usually caused by database consistency issues)
// KeyName: worker.AllowArchivingIncompleteHistory
// Value type: Bool
Expand Down Expand Up @@ -4177,10 +4183,16 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "Enable TaskValidation",
DefaultValue: false,
},
WorkflowIDCacheEnabled: DynamicBool{
KeyName: "history.workflowIDCacheEnabled",
WorkflowIDCacheExternalEnabled: DynamicBool{
KeyName: "history.workflowIDCacheExternalEnabled",
Filters: []Filter{DomainName},
Description: "WorkflowIDCacheExternalEnabled is the key to enable/disable caching of workflowID specific information for external requests",
DefaultValue: false,
},
WorkflowIDCacheInternalEnabled: DynamicBool{
KeyName: "history.workflowIDCacheInternalEnabled",
Filters: []Filter{DomainName},
Description: "WorkflowIDCacheEnabled is the key to enable/disable caching of workflowID specific information",
Description: "WorkflowIDCacheInternalEnabled is the key to enable/disable caching of workflowID specific information for internal requests",
DefaultValue: false,
},
}
Expand Down
14 changes: 8 additions & 6 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,10 @@ type Config struct {
EnableRecordWorkflowExecutionUninitialized dynamicconfig.BoolPropertyFnWithDomainFilter

// The following are used by the history workflowID cache
WorkflowIDCacheEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDExternalRPS dynamicconfig.IntPropertyFnWithDomainFilter
WorkflowIDInternalRPS dynamicconfig.IntPropertyFnWithDomainFilter
WorkflowIDCacheExternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDCacheInternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDExternalRPS dynamicconfig.IntPropertyFnWithDomainFilter
WorkflowIDInternalRPS dynamicconfig.IntPropertyFnWithDomainFilter

// The following are used by consistent query
EnableConsistentQuery dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -552,9 +553,10 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
EnableReplicationTaskGeneration: dc.GetBoolPropertyFilteredByDomainIDAndWorkflowID(dynamicconfig.EnableReplicationTaskGeneration),
EnableRecordWorkflowExecutionUninitialized: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableRecordWorkflowExecutionUninitialized),

WorkflowIDCacheEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.WorkflowIDCacheEnabled),
WorkflowIDExternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDExternalRPS),
WorkflowIDInternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDInternalRPS),
WorkflowIDCacheExternalEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.WorkflowIDCacheExternalEnabled),
WorkflowIDCacheInternalEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.WorkflowIDCacheInternalEnabled),
WorkflowIDExternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDExternalRPS),
WorkflowIDInternalRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.WorkflowIDInternalRPS),

EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery),
EnableConsistentQueryByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain),
Expand Down
17 changes: 9 additions & 8 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,15 @@ func NewHandler(
tokenSerializer: common.NewJSONTaskTokenSerializer(),
rateLimiter: quotas.NewDynamicRateLimiter(config.RPS.AsFloat64()),
workflowIDCache: workflowcache.New(workflowcache.Params{
TTL: workflowIDCacheTTL,
ExternalLimiterFactory: quotas.NewSimpleDynamicRateLimiterFactory(config.WorkflowIDExternalRPS),
InternalLimiterFactory: quotas.NewSimpleDynamicRateLimiterFactory(config.WorkflowIDInternalRPS),
WorkflowIDCacheEnabled: config.WorkflowIDCacheEnabled,
MaxCount: workflowIDCacheMaxCount,
DomainCache: resource.GetDomainCache(),
Logger: resource.GetLogger(),
MetricsClient: resource.GetMetricsClient(),
TTL: workflowIDCacheTTL,
ExternalLimiterFactory: quotas.NewSimpleDynamicRateLimiterFactory(config.WorkflowIDExternalRPS),
InternalLimiterFactory: quotas.NewSimpleDynamicRateLimiterFactory(config.WorkflowIDInternalRPS),
WorkflowIDCacheExternalEnabled: config.WorkflowIDCacheExternalEnabled,
WorkflowIDCacheInternalEnabled: config.WorkflowIDCacheInternalEnabled,
MaxCount: workflowIDCacheMaxCount,
DomainCache: resource.GetDomainCache(),
Logger: resource.GetLogger(),
MetricsClient: resource.GetMetricsClient(),
}),
}

Expand Down
54 changes: 31 additions & 23 deletions service/history/workflowcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ type WFCache interface {
}

type wfCache struct {
lru cache.Cache
externalLimiterFactory quotas.LimiterFactory
internalLimiterFactory quotas.LimiterFactory
workflowIDCacheEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
domainCache cache.DomainCache
metricsClient metrics.Client
logger log.Logger
getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error)
lru cache.Cache
externalLimiterFactory quotas.LimiterFactory
internalLimiterFactory quotas.LimiterFactory
workflowIDCacheExternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
workflowIDCacheInternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
domainCache cache.DomainCache
metricsClient metrics.Client
logger log.Logger
getCacheItemFn func(domainName string, workflowID string) (*cacheValue, error)
}

type cacheKey struct {
Expand All @@ -69,14 +70,15 @@ type cacheValue struct {

// Params is the parameters for a new WFCache
type Params struct {
TTL time.Duration
MaxCount int
ExternalLimiterFactory quotas.LimiterFactory
InternalLimiterFactory quotas.LimiterFactory
WorkflowIDCacheEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
DomainCache cache.DomainCache
MetricsClient metrics.Client
Logger log.Logger
TTL time.Duration
MaxCount int
ExternalLimiterFactory quotas.LimiterFactory
InternalLimiterFactory quotas.LimiterFactory
WorkflowIDCacheExternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
WorkflowIDCacheInternalEnabled dynamicconfig.BoolPropertyFnWithDomainFilter
DomainCache cache.DomainCache
MetricsClient metrics.Client
Logger log.Logger
}

// New creates a new WFCache
Expand All @@ -88,12 +90,13 @@ func New(params Params) WFCache {
MaxCount: params.MaxCount,
ActivelyEvict: true,
}),
externalLimiterFactory: params.ExternalLimiterFactory,
internalLimiterFactory: params.InternalLimiterFactory,
workflowIDCacheEnabled: params.WorkflowIDCacheEnabled,
domainCache: params.DomainCache,
metricsClient: params.MetricsClient,
logger: params.Logger,
externalLimiterFactory: params.ExternalLimiterFactory,
internalLimiterFactory: params.InternalLimiterFactory,
workflowIDCacheExternalEnabled: params.WorkflowIDCacheExternalEnabled,
workflowIDCacheInternalEnabled: params.WorkflowIDCacheInternalEnabled,
domainCache: params.DomainCache,
metricsClient: params.MetricsClient,
logger: params.Logger,
}
// We set getCacheItemFn to cache.getCacheItem so that we can mock it in unit tests
cache.getCacheItemFn = cache.getCacheItem
Expand All @@ -116,7 +119,7 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi
return true
}

if !c.workflowIDCacheEnabled(domainName) {
if !c.isWfCacheEnabled(rateLimitType, domainName) {
// The cache is not enabled, so we allow the call through
return true
}
Expand Down Expand Up @@ -153,6 +156,11 @@ func (c *wfCache) allow(domainID string, workflowID string, rateLimitType rateLi
}
}

func (c *wfCache) isWfCacheEnabled(rateLimitType rateLimitType, domainName string) bool {
return rateLimitType == external && c.workflowIDCacheExternalEnabled(domainName) ||
rateLimitType == internal && c.workflowIDCacheInternalEnabled(domainName)
}

func (c *wfCache) emitRateLimitMetrics(domainID string, workflowID string, domainName string, callType string, metric int) {
c.metricsClient.Scope(metrics.HistoryClientWfIDCacheScope, metrics.DomainTag(domainName)).IncCounter(metric)
c.logger.Info(
Expand Down
161 changes: 111 additions & 50 deletions service/history/workflowcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,15 @@ func TestWfCache_AllowSingleWorkflow(t *testing.T) {

wfCache := New(Params{
// The cache TTL is set to 1 minute, so all requests will hit the cache
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheEnabled: func(domain string) bool { return true },
Logger: log.NewNoop(),
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: log.NewNoop(),
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
})

assert.True(t, wfCache.AllowExternal(testDomainID, testWorkflowID))
Expand Down Expand Up @@ -115,14 +116,15 @@ func TestWfCache_AllowMultipleWorkflow(t *testing.T) {
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiterWf2).Times(1)

wfCache := New(Params{
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheEnabled: func(domain string) bool { return true },
Logger: log.NewNoop(),
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: log.NewNoop(),
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
})

assert.True(t, wfCache.AllowExternal(testDomainID, testWorkflowID))
Expand Down Expand Up @@ -155,14 +157,15 @@ func TestWfCache_AllowError(t *testing.T) {

// Setup the cache, we do not need the factories, as we will mock the getCacheItemFn
wfCache := New(Params{
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
}).(*wfCache)

// We set getCacheItemFn to a function that will return an error so that we can test the error logic
Expand Down Expand Up @@ -201,14 +204,15 @@ func TestWfCache_AllowDomainCacheError(t *testing.T) {

// Setup the cache, we do not need the factories, as we will mock the getCacheItemFn
wfCache := New(Params{
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
})

// We fail open
Expand All @@ -219,30 +223,86 @@ func TestWfCache_AllowDomainCacheError(t *testing.T) {
logger.AssertExpectations(t)
}

// TestWfCache_CacheDisabled tests that the cache will allow requests through if it is disabled
func TestWfCache_CacheDisabled(t *testing.T) {
// TestWfCache_CacheExternalDisabled tests that the cache will allow requests only for the requests where it is enabled
func TestWfCache_CacheExternalDisabled(t *testing.T) {
ctrl := gomock.NewController(t)

domainCache := cache.NewMockDomainCache(ctrl)
domainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).Times(2)

// Setup the mock logger
logger := new(log.MockLogger)
expectRatelimitLog(logger, "internal")

externalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
externalLimiter := quotas.NewMockLimiter(ctrl)
externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiter).Times(1)

internalLimiter := quotas.NewMockLimiter(ctrl)
internalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiter).Times(1)

internalLimiter.EXPECT().Allow().Return(false).Times(1)

// Setup the cache, we do not need the factories, as we will mock the getCacheItemFn
sankari165 marked this conversation as resolved.
Show resolved Hide resolved
wfCache := New(Params{
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: nil,
InternalLimiterFactory: nil,
WorkflowIDCacheEnabled: func(domain string) bool { return false },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return false },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
})

// We fail open
assert.True(t, wfCache.AllowExternal(testDomainID, testWorkflowID))

// We use cache
assert.False(t, wfCache.AllowInternal(testDomainID, testWorkflowID))

// We log the error
logger.AssertExpectations(t)
}

// TestWfCache_CacheInternalDisabled tests that the cache will allow requests only for the requests where it is enabled
func TestWfCache_CacheInternalDisabled(t *testing.T) {
ctrl := gomock.NewController(t)

domainCache := cache.NewMockDomainCache(ctrl)
domainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).Times(2)

// Setup the mock logger
logger := new(log.MockLogger)
expectRatelimitLog(logger, "external")

externalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
externalLimiter := quotas.NewMockLimiter(ctrl)
externalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(externalLimiter).Times(1)
externalLimiter.EXPECT().Allow().Return(false).Times(1)

internalLimiter := quotas.NewMockLimiter(ctrl)
internalLimiterFactory := quotas.NewMockLimiterFactory(ctrl)
internalLimiterFactory.EXPECT().GetLimiter(testDomainName).Return(internalLimiter).Times(1)

// Setup the cache, we do not need the factories, as we will mock the getCacheItemFn
sankari165 marked this conversation as resolved.
Show resolved Hide resolved
wfCache := New(Params{
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return false },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
})

// We use cache
assert.False(t, wfCache.AllowExternal(testDomainID, testWorkflowID))
// We fail open
assert.True(t, wfCache.AllowInternal(testDomainID, testWorkflowID))

// We log the error
Expand Down Expand Up @@ -276,14 +336,15 @@ func TestWfCache_RejectLog(t *testing.T) {
expectRatelimitLog(logger, "internal")

wfCache := New(Params{
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
TTL: time.Minute,
MaxCount: 1_000,
ExternalLimiterFactory: externalLimiterFactory,
InternalLimiterFactory: internalLimiterFactory,
WorkflowIDCacheExternalEnabled: func(domain string) bool { return true },
WorkflowIDCacheInternalEnabled: func(domain string) bool { return true },
Logger: logger,
DomainCache: domainCache,
MetricsClient: metrics.NewNoopMetricsClient(),
})

assert.False(t, wfCache.AllowExternal(testDomainID, testWorkflowID))
Expand Down