diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 1321c04b039..33d3d5ad3e4 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1825,6 +1825,12 @@ const ( // Default value: false // Allowed filters: N/A EnableESAnalyzer + // EnableAsyncWorkflowConsumption decides whether to enable system workers for processing async workflows + // KeyName: system.enableAsyncWorkflowConsumption + // Value type: Bool + // Default value: false + // Allowed filters: N/A + EnableAsyncWorkflowConsumption // EnableStickyQuery indicates if sticky query should be enabled per domain // KeyName: system.enableStickyQuery @@ -4063,6 +4069,11 @@ var BoolKeys = map[BoolKey]DynamicBool{ Description: "EnableESAnalyzer decides whether to enable system workers for processing ElasticSearch Analyzer", DefaultValue: false, }, + EnableAsyncWorkflowConsumption: DynamicBool{ + KeyName: "worker.enableAsyncWorkflowConsumption", + Description: "EnableAsyncWorkflowConsumption decides whether to enable async workflows", + DefaultValue: false, + }, EnableStickyQuery: DynamicBool{ KeyName: "system.enableStickyQuery", Filters: []Filter{DomainName}, diff --git a/common/log/tag/values.go b/common/log/tag/values.go index a52f1b0fd98..a421d8ea976 100644 --- a/common/log/tag/values.go +++ b/common/log/tag/values.go @@ -133,6 +133,7 @@ var ( ComponentShardScanner = component("shardscanner-scanner") ComponentShardFixer = component("shardscanner-fixer") ComponentPinotVisibilityManager = component("pinot-visibility-manager") + ComponentAsyncWFConsumptionManager = component("async-wf-consumption-manager") ) // Pre-defined values for TagSysLifecycle diff --git a/common/messaging/noop_consumer.go b/common/messaging/noop_consumer.go new file mode 100644 index 00000000000..81e9176060c --- /dev/null +++ b/common/messaging/noop_consumer.go @@ -0,0 +1,40 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package messaging + +type noopConsumer struct{} + +// NewNoopProducer returns a no-op message consumer +func NewNoopConsumer() Consumer { + return &noopConsumer{} +} + +func (c *noopConsumer) Start() error { + return nil +} + +func (c *noopConsumer) Stop() {} + +func (c *noopConsumer) Messages() <-chan Message { + return nil +} diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 5686056fddf..6936cc3be6b 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1341,6 +1341,8 @@ const ( CheckDataCorruptionWorkflowScope // ESAnalyzerScope is scope used by ElasticSearch Analyzer (esanalyzer) workflow ESAnalyzerScope + // AsyncWorkflowConsumerScope is scope used by async workflow consumer + AsyncWorkflowConsumerScope NumWorkerScopes ) @@ -1954,6 +1956,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ BatcherScope: {operation: "batcher"}, ParentClosePolicyProcessorScope: {operation: "ParentClosePolicyProcessor"}, ESAnalyzerScope: {operation: "ESAnalyzer"}, + AsyncWorkflowConsumerScope: {operation: "AsyncWorkflowConsumer"}, }, } @@ -2565,6 +2568,7 @@ const ( ESAnalyzerNumStuckWorkflowsRefreshed ESAnalyzerNumStuckWorkflowsFailedToRefresh ESAnalyzerNumLongRunningWorkflows + AsyncWorkflowConsumerCount NumWorkerMetrics ) @@ -3183,6 +3187,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ ESAnalyzerNumStuckWorkflowsRefreshed: {metricName: "es_analyzer_num_stuck_workflows_refreshed", metricType: Counter}, ESAnalyzerNumStuckWorkflowsFailedToRefresh: {metricName: "es_analyzer_num_stuck_workflows_failed_to_refresh", metricType: Counter}, ESAnalyzerNumLongRunningWorkflows: {metricName: "es_analyzer_num_long_running_workflows", metricType: Counter}, + AsyncWorkflowConsumerCount: {metricName: "async_workflow_consumer_count", metricType: Gauge}, }, } diff --git a/service/frontend/api/producer_manager.go b/service/frontend/api/producer_manager.go index 6fa1edcc41f..e33131c49eb 100644 --- a/service/frontend/api/producer_manager.go +++ b/service/frontend/api/producer_manager.go @@ -37,6 +37,8 @@ import ( ) type ( + // ProducerManager is used to create a producer for a domain. + // Producer is used for Async APIs such as StartWorkflowExecutionAsync ProducerManager interface { GetProducerByDomain(domain string) (messaging.Producer, error) } diff --git a/service/worker/asyncworkflow/async_workflow_consumer_manager.go b/service/worker/asyncworkflow/async_workflow_consumer_manager.go new file mode 100644 index 00000000000..de6c614a030 --- /dev/null +++ b/service/worker/asyncworkflow/async_workflow_consumer_manager.go @@ -0,0 +1,213 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package asyncworkflow + +import ( + "context" + "sync" + "time" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/asyncworkflow/queue" + "github.com/uber/cadence/common/asyncworkflow/queue/provider" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/types" +) + +const ( + defaultRefreshInterval = 5 * time.Minute + defaultShutdownTimeout = 5 * time.Second +) + +type ConsumerManagerOptions func(*ConsumerManager) + +func WithTimeSource(timeSrc clock.TimeSource) ConsumerManagerOptions { + return func(c *ConsumerManager) { + c.timeSrc = timeSrc + } +} + +func NewConsumerManager( + logger log.Logger, + metricsClient metrics.Client, + domainCache cache.DomainCache, + queueProvider queue.Provider, + options ...ConsumerManagerOptions, +) *ConsumerManager { + ctx, cancel := context.WithCancel(context.Background()) + cm := &ConsumerManager{ + logger: logger.WithTags(tag.ComponentAsyncWFConsumptionManager), + metricsClient: metricsClient, + domainCache: domainCache, + queueProvider: queueProvider, + refreshInterval: defaultRefreshInterval, + shutdownTimeout: defaultShutdownTimeout, + ctx: ctx, + cancelFn: cancel, + activeConsumers: make(map[string]messaging.Consumer), + timeSrc: clock.NewRealTimeSource(), + } + + for _, opt := range options { + opt(cm) + } + return cm +} + +type ConsumerManager struct { + logger log.Logger + metricsClient metrics.Client + timeSrc clock.TimeSource + domainCache cache.DomainCache + queueProvider queue.Provider + refreshInterval time.Duration + shutdownTimeout time.Duration + ctx context.Context + cancelFn context.CancelFunc + wg sync.WaitGroup + activeConsumers map[string]messaging.Consumer +} + +func (c *ConsumerManager) Start() { + c.logger.Info("Starting ConsumerManager") + c.wg.Add(1) + go c.run() +} + +func (c *ConsumerManager) Stop() { + c.logger.Info("Stopping ConsumerManager") + c.cancelFn() + c.wg.Wait() + if !common.AwaitWaitGroup(&c.wg, c.shutdownTimeout) { + c.logger.Warn("ConsumerManager timed out on shutdown", tag.Dynamic("timeout", c.shutdownTimeout)) + return + } + + for qID, consumer := range c.activeConsumers { + consumer.Stop() + c.logger.Info("Stopped consumer", tag.Dynamic("queue-id", qID)) + } + + c.logger.Info("Stopped ConsumerManager") +} + +func (c *ConsumerManager) run() { + defer c.wg.Done() + + timer := c.timeSrc.NewTimer(c.refreshInterval) + defer timer.Stop() + c.logger.Info("ConsumerManager background loop started", tag.Dynamic("refresh-interval", c.refreshInterval)) + + c.refreshConsumers() + + for { + select { + case <-timer.Chan(): + c.refreshConsumers() + case <-c.ctx.Done(): + c.logger.Info("ConsumerManager background loop stopped because context is done") + return + } + } +} + +func (c *ConsumerManager) refreshConsumers() { + domains := c.domainCache.GetAllDomain() + c.logger.Info("Refreshing consumers", tag.Dynamic("domain-count", len(domains)), tag.Dynamic("consumer-count", len(c.activeConsumers))) + refCounts := make(map[string]int, len(c.activeConsumers)) + + for _, domain := range domains { + select { + default: + case <-c.ctx.Done(): + c.logger.Info("refreshConsumers is terminating because context is done") + return + } + + // domain config is not set or async workflow config is not set + if domain.GetConfig() == nil || domain.GetConfig().AsyncWorkflowConfig == (types.AsyncWorkflowConfiguration{}) { + continue + } + + cfg := domain.GetConfig().AsyncWorkflowConfig + queue, err := c.getQueue(cfg) + if err != nil { + c.logger.Error("Failed to get queue", tag.Error(err), tag.WorkflowDomainName(domain.GetInfo().Name)) + continue + } + + if !cfg.Enabled { + // Already running active consumers for such queues will be stopped in the next loop + continue + } + + // async workflow config is enabled. check if consumer is already running + if c.activeConsumers[queue.ID()] != nil { + c.logger.Debug("Consumer already running", tag.WorkflowDomainName(domain.GetInfo().Name), tag.Dynamic("queue-id", queue.ID())) + refCounts[queue.ID()]++ + continue + } + + c.logger.Info("Starting consumer", tag.WorkflowDomainName(domain.GetInfo().Name), tag.Dynamic("queue-id", queue.ID())) + consumer, err := queue.CreateConsumer(&provider.Params{ + Logger: c.logger, + MetricsClient: c.metricsClient, + }) + if err != nil { + c.logger.Error("Failed to create consumer", tag.Error(err), tag.WorkflowDomainName(domain.GetInfo().Name), tag.Dynamic("queue-id", queue.ID())) + continue + } + + c.activeConsumers[queue.ID()] = consumer + refCounts[queue.ID()]++ + c.logger.Info("Created consumer", tag.WorkflowDomainName(domain.GetInfo().Name), tag.Dynamic("queue-id", queue.ID())) + } + + // stop consumers that are not needed + for qID, consumer := range c.activeConsumers { + if refCounts[qID] > 0 { + continue + } + + c.logger.Info("Stopping consumer because it's not needed", tag.Dynamic("queue-id", qID)) + consumer.Stop() + delete(c.activeConsumers, qID) + c.logger.Info("Stopped consumer", tag.Dynamic("queue-id", qID)) + } + + c.logger.Info("Refreshed consumers", tag.Dynamic("consumer-count", len(c.activeConsumers))) + c.metricsClient.Scope(metrics.AsyncWorkflowConsumerScope).UpdateGauge(metrics.AsyncWorkflowConsumerCount, float64(len(c.activeConsumers))) +} + +func (c *ConsumerManager) getQueue(cfg types.AsyncWorkflowConfiguration) (provider.Queue, error) { + if cfg.PredefinedQueueName != "" { + return c.queueProvider.GetPredefinedQueue(cfg.PredefinedQueueName) + } + + return c.queueProvider.GetQueue(cfg.QueueType, cfg.QueueConfig) +} diff --git a/service/worker/asyncworkflow/async_workflow_consumer_manager_test.go b/service/worker/asyncworkflow/async_workflow_consumer_manager_test.go new file mode 100644 index 00000000000..43587b6da57 --- /dev/null +++ b/service/worker/asyncworkflow/async_workflow_consumer_manager_test.go @@ -0,0 +1,343 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package asyncworkflow + +import ( + "errors" + "fmt" + "sort" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/google/go-cmp/cmp" + + "github.com/uber/cadence/common/asyncworkflow/queue" + "github.com/uber/cadence/common/asyncworkflow/queue/provider" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/types" +) + +type domainWithConfig struct { + name string + asyncWFCfg types.AsyncWorkflowConfiguration + failQueueCreation bool + failConsumerCreation bool +} + +func TestConsumerManager(t *testing.T) { + tests := []struct { + name string + // firstRoundDomains will be returned by the domain cache when the consumer manager starts + firstRoundDomains []domainWithConfig + // wantFirstRoundConsumers is the list of consumers that should be created after the first round + wantFirstRoundConsumers []string + // secondRoundDomains will be returned by the domain cache after the first refresh interval + secondRoundDomains []domainWithConfig + // wantSecondRoundConsumers is the list of consumers that should be created after the second round + wantSecondRoundConsumers []string + }{ + { + name: "no domains", + }, + { + name: "empty queue config", + firstRoundDomains: []domainWithConfig{ + { + name: "domain1", + asyncWFCfg: types.AsyncWorkflowConfiguration{}, + }, + }, + wantFirstRoundConsumers: []string{}, + }, + { + name: "queue creation fails", + firstRoundDomains: []domainWithConfig{ + { + name: "domain1", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: true, + PredefinedQueueName: "queue1", + }, + failQueueCreation: true, + }, + }, + wantFirstRoundConsumers: []string{}, + }, + { + name: "consumer creation fails", + firstRoundDomains: []domainWithConfig{ + { + name: "domain1", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: true, + PredefinedQueueName: "queue1", + }, + failConsumerCreation: true, + }, + }, + wantFirstRoundConsumers: []string{}, + }, + { + name: "queue disabled", + firstRoundDomains: []domainWithConfig{ + { + name: "domain1", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: false, + PredefinedQueueName: "queue1", + }, + }, + }, + wantFirstRoundConsumers: []string{}, + }, + { + name: "queue disabled in second round", + firstRoundDomains: []domainWithConfig{ + { + name: "domain1", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: true, + PredefinedQueueName: "queue1", + }, + }, + }, + wantFirstRoundConsumers: []string{"queue1"}, + secondRoundDomains: []domainWithConfig{ + { + name: "domain1", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: false, + PredefinedQueueName: "queue1", + }, + }, + }, + wantSecondRoundConsumers: []string{}, + }, + { + name: "same consumers for both rounds", + firstRoundDomains: []domainWithConfig{ + { + name: "domain1", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: true, + PredefinedQueueName: "queue1", + }, + }, + }, + wantFirstRoundConsumers: []string{"queue1"}, + secondRoundDomains: []domainWithConfig{ + { + name: "domain1", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: true, + PredefinedQueueName: "queue1", + }, + }, + }, + wantSecondRoundConsumers: []string{"queue1"}, + }, + { + name: "shared queue by multiple domains and different enable-disable states", + firstRoundDomains: []domainWithConfig{ + { + name: "domain1", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: true, + PredefinedQueueName: "shared_queue", + }, + }, + { + name: "domain2", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: false, + PredefinedQueueName: "shared_queue", + }, + }, + { + name: "domain3", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: true, + QueueType: "kafka", + QueueConfig: &types.DataBlob{ + EncodingType: types.EncodingTypeJSON.Ptr(), + Data: []byte(`{"brokers":["localhost:9092"],"topics":["test-topic"]}`), + }, + }, + }, + }, + wantFirstRoundConsumers: []string{ + "shared_queue", + `queuetype:kafka,queueconfig:{"brokers":["localhost:9092"],"topics":["test-topic"]}`, + }, + secondRoundDomains: []domainWithConfig{ + { + name: "domain1", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: true, + PredefinedQueueName: "shared_queue", + }, + }, + { + name: "domain2", + asyncWFCfg: types.AsyncWorkflowConfiguration{ + Enabled: true, + PredefinedQueueName: "shared_queue", + }, + }, + }, + wantSecondRoundConsumers: []string{"shared_queue"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockTimeSrc := clock.NewMockedTimeSource() + mockDomainCache := cache.NewMockDomainCache(ctrl) + + // setup mocks for 2 rounds of domain cache refresh + mockQueueProvider := queue.NewMockProvider(ctrl) + for _, domainsForRound := range [][]domainWithConfig{tc.firstRoundDomains, tc.secondRoundDomains} { + mockDomainCache.EXPECT(). + GetAllDomain(). + Return(toDomainCacheEntries(domainsForRound)). + Times(1) + for _, dwc := range domainsForRound { + if dwc.asyncWFCfg == (types.AsyncWorkflowConfiguration{}) { + continue + } + + if dwc.failQueueCreation { + mockQueueProvider.EXPECT(). + GetPredefinedQueue(dwc.asyncWFCfg.PredefinedQueueName). + Return(nil, errors.New("queue creation failed")) + } else { + queueMock := provider.NewMockQueue(ctrl) + queueMock.EXPECT().ID().Return(queueID(dwc.asyncWFCfg)).AnyTimes() + + if dwc.asyncWFCfg.PredefinedQueueName != "" { + mockQueueProvider.EXPECT(). + GetPredefinedQueue(dwc.asyncWFCfg.PredefinedQueueName). + Return(queueMock, nil).AnyTimes() + } else { + mockQueueProvider.EXPECT(). + GetQueue(gomock.Any(), gomock.Any()). + Return(queueMock, nil).AnyTimes() + } + if !dwc.asyncWFCfg.Enabled { + continue + } + + if dwc.failConsumerCreation { + queueMock.EXPECT().CreateConsumer(gomock.Any()). + Return(nil, errors.New("consumer creation failed")).AnyTimes() + } else { + queueMock.EXPECT().CreateConsumer(gomock.Any()). + Return(messaging.NewNoopConsumer(), nil).AnyTimes() + } + } + } + } + + // create consumer manager + cm := NewConsumerManager( + testlogger.New(t), + metrics.NewNoopMetricsClient(), + mockDomainCache, + mockQueueProvider, + WithTimeSource(mockTimeSrc), + ) + + cm.Start() + defer cm.Stop() + + // wait for the first round of consumers to be created + time.Sleep(50 * time.Millisecond) + // verify consumers + t.Log("first round comparison") + if diff := cmpQueueIDs(cm.activeConsumers, tc.wantFirstRoundConsumers); diff != "" { + t.Fatalf("Consumer mismatch after first round (-want +got):\n%s", diff) + } + + // wait for the second round of consumers to be created + mockTimeSrc.Advance(defaultRefreshInterval) + time.Sleep(50 * time.Millisecond) + // verify consumers + t.Log("second round comparison") + if diff := cmpQueueIDs(cm.activeConsumers, tc.wantSecondRoundConsumers); diff != "" { + t.Fatalf("Consumer mismatch after second round (-want +got):\n%s", diff) + } + }) + } +} + +func toDomainCacheEntries(domains []domainWithConfig) map[string]*cache.DomainCacheEntry { + result := make(map[string]*cache.DomainCacheEntry, len(domains)) + for _, d := range domains { + result[d.name] = cache.NewGlobalDomainCacheEntryForTest( + &persistence.DomainInfo{ + Name: d.name, + }, + &persistence.DomainConfig{ + AsyncWorkflowConfig: d.asyncWFCfg, + }, + nil, + 0, + ) + } + return result +} + +func queueID(asyncWFCfg types.AsyncWorkflowConfiguration) string { + if asyncWFCfg.PredefinedQueueName != "" { + return asyncWFCfg.PredefinedQueueName + } + + if asyncWFCfg.QueueConfig == nil { + return "" + } + + return fmt.Sprintf("queuetype:%s,queueconfig:%s", asyncWFCfg.QueueType, string(asyncWFCfg.QueueConfig.Data)) +} + +func cmpQueueIDs(activeConsumers map[string]messaging.Consumer, want []string) string { + got := make([]string, 0, len(activeConsumers)) + for qID := range activeConsumers { + got = append(got, qID) + } + + if want == nil { + want = []string{} + } + + sort.Strings(got) + sort.Strings(want) + + return cmp.Diff(want, got) +} diff --git a/service/worker/service.go b/service/worker/service.go index 1f7d6c0d47c..f15599631fb 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -37,6 +37,7 @@ import ( "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/worker/archiver" + "github.com/uber/cadence/service/worker/asyncworkflow" "github.com/uber/cadence/service/worker/batcher" "github.com/uber/cadence/service/worker/esanalyzer" "github.com/uber/cadence/service/worker/failovermanager" @@ -84,17 +85,14 @@ type ( DomainReplicationMaxRetryDuration dynamicconfig.DurationPropertyFn EnableESAnalyzer dynamicconfig.BoolPropertyFn EnableWatchDog dynamicconfig.BoolPropertyFn + EnableAsyncWorkflowConsumption dynamicconfig.BoolPropertyFn HostName string } ) // NewService builds a new cadence-worker service -func NewService( - params *resource.Params, -) (resource.Resource, error) { - +func NewService(params *resource.Params) (resource.Resource, error) { serviceConfig := NewConfig(params) - serviceResource, err := resource.New( params, service.Worker, @@ -184,6 +182,7 @@ func NewConfig(params *resource.Params) *Config { PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceGlobalMaxQPS), PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceMaxQPS), DomainReplicationMaxRetryDuration: dc.GetDurationProperty(dynamicconfig.WorkerReplicationTaskMaxRetryDuration), + EnableAsyncWorkflowConsumption: dc.GetBoolProperty(dynamicconfig.EnableAsyncWorkflowConsumption), HostName: params.HostName, } advancedVisWritingMode := dc.GetStringProperty( @@ -244,6 +243,11 @@ func (s *Service) Start() { s.startWorkflowShadower() } + if s.config.EnableAsyncWorkflowConsumption() { + cm := s.startAsyncWorkflowConsumerManager() + defer cm.Stop() + } + logger.Info("worker started", tag.ComponentWorker) <-s.stopC } @@ -254,12 +258,14 @@ func (s *Service) Stop() { return } + s.GetLogger().Info("worker stopping", tag.ComponentWorker) + close(s.stopC) s.Resource.Stop() s.Resource.GetDomainReplicationQueue().Stop() - s.params.Logger.Info("worker stopped", tag.ComponentWorker) + s.GetLogger().Info("worker stopped", tag.ComponentWorker) } func (s *Service) startParentClosePolicyProcessor() { @@ -413,6 +419,17 @@ func (s *Service) startWorkflowShadower() { } } +func (s *Service) startAsyncWorkflowConsumerManager() common.Daemon { + cm := asyncworkflow.NewConsumerManager( + s.GetLogger(), + s.GetMetricsClient(), + s.GetDomainCache(), + s.Resource.GetAsyncWorkflowQueueProvider(), + ) + cm.Start() + return cm +} + func (s *Service) ensureDomainExists(domain string) { _, err := s.GetDomainManager().GetDomain(context.Background(), &persistence.GetDomainRequest{Name: domain}) switch err.(type) {