diff --git a/client/history/peer_resolver.go b/client/history/peer_resolver.go
index 2c0c41b4e1c..14e83f34454 100644
--- a/client/history/peer_resolver.go
+++ b/client/history/peer_resolver.go
@@ -24,6 +24,7 @@ import (
 	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/membership"
 	"github.com/uber/cadence/common/service"
+	"github.com/uber/cadence/service/history/lookup"
 )
 
 // PeerResolver is used to resolve history peers.
@@ -74,8 +75,7 @@ func (pr peerResolver) FromDomainID(domainID string) (string, error) {
 // It uses our membership provider to lookup which instance currently owns the given shard.
 // FromHostAddress is used for further resolving.
 func (pr peerResolver) FromShardID(shardID int) (string, error) {
-	shardIDString := string(rune(shardID))
-	host, err := pr.resolver.Lookup(service.History, shardIDString)
+	host, err := lookup.HistoryServerByShardID(pr.resolver, shardID)
 	if err != nil {
 		return "", common.ToServiceTransientError(err)
 	}
diff --git a/service/frontend/admin/handler.go b/service/frontend/admin/handler.go
index 5000549ac98..b3f180b4667 100644
--- a/service/frontend/admin/handler.go
+++ b/service/frontend/admin/handler.go
@@ -53,6 +53,7 @@ import (
 	"github.com/uber/cadence/service/frontend/config"
 	"github.com/uber/cadence/service/frontend/validate"
 	"github.com/uber/cadence/service/history/execution"
+	"github.com/uber/cadence/service/history/lookup"
 )
 
 const (
@@ -258,10 +259,9 @@ func (adh *adminHandlerImpl) DescribeWorkflowExecution(
 	}
 
 	shardID := common.WorkflowIDToHistoryShard(request.Execution.WorkflowID, adh.numberOfHistoryShards)
-	shardIDstr := string(rune(shardID)) // originally `string(int_shard_id)`, but changing it will change the ring hashing
 	shardIDForOutput := strconv.Itoa(shardID)
 
-	historyHost, err := adh.GetMembershipResolver().Lookup(service.History, shardIDstr)
+	historyHost, err := lookup.HistoryServerByShardID(adh.GetMembershipResolver(), shardID)
 	if err != nil {
 		return nil, adh.error(err, scope)
 	}
@@ -663,16 +663,15 @@ func (adh *adminHandlerImpl) DescribeShardDistribution(
 	_, sw := adh.startRequestProfile(ctx, metrics.AdminDescribeShardDistributionScope)
 	defer sw.Stop()
 
-	numShards := adh.config.NumHistoryShards
 	resp = &types.DescribeShardDistributionResponse{
-		NumberOfShards: int32(numShards),
+		NumberOfShards: int32(adh.numberOfHistoryShards),
 		Shards:         make(map[int32]string),
 	}
 
 	offset := int(request.PageID * request.PageSize)
 	nextPageStart := offset + int(request.PageSize)
-	for shardID := offset; shardID < numShards && shardID < nextPageStart; shardID++ {
-		info, err := adh.GetMembershipResolver().Lookup(service.History, string(rune(shardID)))
+	for shardID := offset; shardID < adh.numberOfHistoryShards && shardID < nextPageStart; shardID++ {
+		info, err := lookup.HistoryServerByShardID(adh.GetMembershipResolver(), shardID)
 		if err != nil {
 			resp.Shards[int32(shardID)] = "unknown"
 		} else {
diff --git a/service/frontend/admin/handler_test.go b/service/frontend/admin/handler_test.go
index c91ce3d0c44..4f661d19024 100644
--- a/service/frontend/admin/handler_test.go
+++ b/service/frontend/admin/handler_test.go
@@ -694,6 +694,24 @@ func (s *adminHandlerSuite) Test_ConfigStore_NilRequest() {
 	s.Error(err)
 }
 
+func (s *adminHandlerSuite) Test_DescribeShardDistribution() {
+	s.mockResource.MembershipResolver.EXPECT().Lookup(service.History, string(rune(0))).
+		Return(membership.NewHostInfo("127.0.0.1:1234"), nil)
+
+	res, err := s.handler.DescribeShardDistribution(
+		context.Background(),
+		&types.DescribeShardDistributionRequest{PageSize: 10},
+	)
+	s.Require().NoError(err)
+	s.Equal(
+		&types.DescribeShardDistributionResponse{
+			NumberOfShards: 1,
+			Shards:         map[int32]string{0: "127.0.0.1:1234"},
+		},
+		res,
+	)
+}
+
 func (s *adminHandlerSuite) Test_ConfigStore_InvalidKey() {
 	ctx := context.Background()
 	handler := s.handler
diff --git a/service/history/handler/handler.go b/service/history/handler/handler.go
index cd99ef1faa6..644630a18ba 100644
--- a/service/history/handler/handler.go
+++ b/service/history/handler/handler.go
@@ -24,7 +24,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -43,7 +42,6 @@ import (
 	"github.com/uber/cadence/common/metrics"
 	"github.com/uber/cadence/common/persistence"
 	"github.com/uber/cadence/common/quotas"
-	"github.com/uber/cadence/common/service"
 	"github.com/uber/cadence/common/types"
 	"github.com/uber/cadence/common/types/mapper/proto"
 	"github.com/uber/cadence/service/history/config"
@@ -52,6 +50,7 @@ import (
 	"github.com/uber/cadence/service/history/engine/engineimpl"
 	"github.com/uber/cadence/service/history/events"
 	"github.com/uber/cadence/service/history/failover"
+	"github.com/uber/cadence/service/history/lookup"
 	"github.com/uber/cadence/service/history/replication"
 	"github.com/uber/cadence/service/history/resource"
 	"github.com/uber/cadence/service/history/shard"
@@ -1956,13 +1955,13 @@ func (h *handlerImpl) GetCrossClusterTasks(
 	for _, shardID := range request.ShardIDs {
 		future, settable := future.NewFuture()
 		futureByShardID[shardID] = future
-		go func(shardID int32) {
-			logger := h.GetLogger().WithTags(tag.ShardID(int(shardID)))
-			engine, err := h.controller.GetEngineForShard(int(shardID))
+		go func(shardID int) {
+			logger := h.GetLogger().WithTags(tag.ShardID(shardID))
+			engine, err := h.controller.GetEngineForShard(shardID)
 			if err != nil {
 				logger.Error("History engine not found for shard", tag.Error(err))
 				var owner membership.HostInfo
-				if info, err := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(int(shardID))); err == nil {
+				if info, err := lookup.HistoryServerByShardID(h.GetMembershipResolver(), shardID); err == nil {
 					owner = info
 				}
 				settable.Set(nil, shard.CreateShardOwnershipLostError(h.GetHostInfo(), owner))
@@ -1975,7 +1974,7 @@
 			} else {
 				settable.Set(tasks, nil)
 			}
-		}(shardID)
+		}(int(shardID))
 	}
 
 	response := &types.GetCrossClusterTasksResponse{
@@ -2069,7 +2068,7 @@ func (h *handlerImpl) RatelimitUpdate(
 func (h *handlerImpl) convertError(err error) error {
 	switch err := err.(type) {
 	case *persistence.ShardOwnershipLostError:
-		info, err2 := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(err.ShardID))
+		info, err2 := lookup.HistoryServerByShardID(h.GetMembershipResolver(), err.ShardID)
 		if err2 != nil {
 			return shard.CreateShardOwnershipLostError(h.GetHostInfo(), membership.HostInfo{})
 		}
diff --git a/service/history/handler/handler_test.go b/service/history/handler/handler_test.go
index 8da6d8e4b66..9c2c8336ef7 100644
--- a/service/history/handler/handler_test.go
+++ b/service/history/handler/handler_test.go
@@ -38,7 +38,9 @@ import (
 	"github.com/uber/cadence/common/log/testlogger"
 	"github.com/uber/cadence/common/metrics"
 	"github.com/uber/cadence/common/metrics/mocks"
+	"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas" + "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/constants" @@ -89,7 +91,6 @@ func (s *handlerSuite) SetupTest() { s.mockResource.Logger = testlogger.New(s.Suite.T()) s.mockShardController = shard.NewMockController(s.controller) s.mockEngine = engine.NewMockEngine(s.controller) - s.mockShardController.EXPECT().GetEngineForShard(gomock.Any()).Return(s.mockEngine, nil).AnyTimes() s.mockWFCache = workflowcache.NewMockWFCache(s.controller) internalRequestRateLimitingEnabledConfig := func(domainName string) bool { return false } s.handler = NewHandler(s.mockResource, config.NewForTest(), s.mockWFCache, internalRequestRateLimitingEnabledConfig).(*handlerImpl) @@ -341,13 +342,11 @@ func (s *handlerSuite) TestRecordActivityTaskStarted() { func (s *handlerSuite) TestRecordDecisionTaskStarted() { testInput := map[string]struct { - caseName string input *types.RecordDecisionTaskStartedRequest expected *types.RecordDecisionTaskStartedResponse expectedError bool }{ "valid input": { - caseName: "valid input", input: &types.RecordDecisionTaskStartedRequest{ DomainUUID: testDomainID, WorkflowExecution: &types.WorkflowExecution{ @@ -369,7 +368,6 @@ func (s *handlerSuite) TestRecordDecisionTaskStarted() { expectedError: false, }, "empty domainID": { - caseName: "empty domainID", input: &types.RecordDecisionTaskStartedRequest{ DomainUUID: "", WorkflowExecution: &types.WorkflowExecution{ @@ -381,7 +379,6 @@ func (s *handlerSuite) TestRecordDecisionTaskStarted() { expectedError: true, }, "ratelimit exceeded": { - caseName: "ratelimit exceeded", input: &types.RecordDecisionTaskStartedRequest{ DomainUUID: testDomainID, WorkflowExecution: &types.WorkflowExecution{ @@ -398,7 +395,6 @@ func (s *handlerSuite) TestRecordDecisionTaskStarted() { expectedError: true, }, "get engine error": { - caseName: "get engine error", input: &types.RecordDecisionTaskStartedRequest{ DomainUUID: testDomainID, WorkflowExecution: &types.WorkflowExecution{ @@ -415,7 +411,22 @@ func (s *handlerSuite) TestRecordDecisionTaskStarted() { expectedError: true, }, "engine error": { - caseName: "engine error", + input: &types.RecordDecisionTaskStartedRequest{ + DomainUUID: testDomainID, + WorkflowExecution: &types.WorkflowExecution{ + WorkflowID: testWorkflowID, + RunID: testValidUUID, + }, + PollRequest: &types.PollForDecisionTaskRequest{ + TaskList: &types.TaskList{ + Name: "test-task-list", + }, + }, + }, + expected: nil, + expectedError: true, + }, + "engine error with ShardOwnershipLost": { input: &types.RecordDecisionTaskStartedRequest{ DomainUUID: testDomainID, WorkflowExecution: &types.WorkflowExecution{ @@ -432,7 +443,6 @@ func (s *handlerSuite) TestRecordDecisionTaskStarted() { expectedError: true, }, "empty poll request": { - caseName: "empty poll request", input: &types.RecordDecisionTaskStartedRequest{ DomainUUID: testDomainID, WorkflowExecution: &types.WorkflowExecution{ @@ -447,7 +457,7 @@ func (s *handlerSuite) TestRecordDecisionTaskStarted() { for name, input := range testInput { s.Run(name, func() { - switch input.caseName { + switch name { case "valid input": s.mockShardController.EXPECT().GetEngine(gomock.Any()).Return(s.mockEngine, nil).Times(1) s.mockEngine.EXPECT().RecordDecisionTaskStarted(gomock.Any(), input.input).Return(input.expected, nil).Times(1) @@ -462,9 +472,15 @@ func (s *handlerSuite) TestRecordDecisionTaskStarted() { 
 				s.mockShardController.EXPECT().GetEngine(testWorkflowID).Return(s.mockEngine, nil).Times(1)
 				s.mockEngine.EXPECT().RecordDecisionTaskStarted(gomock.Any(), input.input).Return(nil, errors.New("error")).Times(1)
 				s.mockRatelimiter.EXPECT().Allow().Return(true).Times(1)
+			case "engine error with ShardOwnershipLost":
+				s.mockShardController.EXPECT().GetEngine(testWorkflowID).Return(s.mockEngine, nil).Times(1)
+				s.mockRatelimiter.EXPECT().Allow().Return(true).Times(1)
+				s.mockEngine.EXPECT().RecordDecisionTaskStarted(gomock.Any(), input.input).Return(nil, &persistence.ShardOwnershipLostError{ShardID: 123}).Times(1)
+				s.mockResource.MembershipResolver.EXPECT().Lookup(service.History, string(rune(123)))
 			case "empty poll request":
 				s.mockRatelimiter.EXPECT().Allow().Return(true).Times(1)
 			}
+
 			response, err := s.handler.RecordDecisionTaskStarted(context.Background(), input.input)
 			s.Equal(input.expected, response)
 			if input.expectedError {
@@ -754,6 +770,8 @@ func (s *handlerSuite) TestGetCrossClusterTasks() {
 	var shardIDs []int32
 	numSucceeded := int32(0)
 	numTasksPerShard := rand.Intn(10)
+
+	s.mockShardController.EXPECT().GetEngineForShard(gomock.Any()).Return(s.mockEngine, nil).Times(numShards)
 	s.mockEngine.EXPECT().GetCrossClusterTasks(gomock.Any(), targetCluster).DoAndReturn(
 		func(_ context.Context, _ string) ([]*types.CrossClusterTaskRequest, error) {
 			succeeded := rand.Intn(2) == 0
@@ -764,6 +782,7 @@
 			return nil, errors.New("some random error")
 		},
 	).MaxTimes(numShards)
+
 	for i := 0; i != numShards; i++ {
 		shardIDs = append(shardIDs, int32(i))
 	}
@@ -783,6 +802,35 @@
 	}
 }
 
+func (s *handlerSuite) TestGetCrossClusterTasksFails_IfGetEngineFails() {
+	numShards := 10
+	targetCluster := cluster.TestAlternativeClusterName
+	var shardIDs []int32
+
+	for i := 0; i != numShards; i++ {
+		shardIDs = append(shardIDs, int32(i))
+		s.mockShardController.EXPECT().GetEngineForShard(i).
+			Return(nil, errors.New("failed to get engine"))
+
+		// in response to the failure above, we look up the current shard owner
+		s.mockResource.MembershipResolver.EXPECT().Lookup(service.History, string(rune(i)))
+	}
+
+	request := &types.GetCrossClusterTasksRequest{
+		ShardIDs:      shardIDs,
+		TargetCluster: targetCluster,
+	}
+
+	response, err := s.handler.GetCrossClusterTasks(context.Background(), request)
+	s.NoError(err)
+	s.NotNil(response)
+
+	s.Len(response.FailedCauseByShard, numShards, "we fail GetEngineForShard every time")
+	for _, failure := range response.FailedCauseByShard {
+		s.IsType(types.GetTaskFailedCauseShardOwnershipLost, failure)
+	}
+}
+
 func (s *handlerSuite) TestRespondCrossClusterTaskCompleted_FetchNewTask() {
 	s.testRespondCrossClusterTaskCompleted(true)
 }
@@ -802,6 +850,7 @@ func (s *handlerSuite) testRespondCrossClusterTaskCompleted(
 		TaskResponses: make([]*types.CrossClusterTaskResponse, numTasks),
 		FetchNewTasks: fetchNewTask,
 	}
+	s.mockShardController.EXPECT().GetEngineForShard(0).Return(s.mockEngine, nil)
 	s.mockEngine.EXPECT().RespondCrossClusterTasksCompleted(gomock.Any(), targetCluster, request.TaskResponses).Return(nil).Times(1)
 	if fetchNewTask {
 		s.mockEngine.EXPECT().GetCrossClusterTasks(gomock.Any(), targetCluster).Return(make([]*types.CrossClusterTaskRequest, numTasks), nil).Times(1)
diff --git a/service/history/lookup/lookup.go b/service/history/lookup/lookup.go
new file mode 100644
index 00000000000..746d3aeae1c
--- /dev/null
+++ b/service/history/lookup/lookup.go
@@ -0,0 +1,33 @@
+// The MIT License (MIT)
+
+// Copyright (c) 2017-2020 Uber Technologies Inc.
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package lookup
+
+import (
+	"github.com/uber/cadence/common/membership"
+	"github.com/uber/cadence/common/service"
+)
+
+// HistoryServerByShardID calls resolver.Lookup with a key based on the provided shardID
+func HistoryServerByShardID(resolver membership.Resolver, shardID int) (membership.HostInfo, error) {
+	return resolver.Lookup(service.History, string(rune(shardID)))
+}
diff --git a/service/history/lookup/lookup_test.go b/service/history/lookup/lookup_test.go
new file mode 100644
index 00000000000..9ef896c21fc
--- /dev/null
+++ b/service/history/lookup/lookup_test.go
@@ -0,0 +1,60 @@
+// The MIT License (MIT)
+
+// Copyright (c) 2017-2020 Uber Technologies Inc.
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package lookup
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/uber/cadence/common/membership"
+	"github.com/uber/cadence/common/service"
+)
+
+func TestHistoryServerByShardID_Succeeds(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	mockResolver := membership.NewMockResolver(ctrl)
+
+	mockResolver.EXPECT().Lookup(service.History, string(rune(65))).
+		Return(membership.NewHostInfo("127.0.0.1:1234"), nil)
+
+	host, err := HistoryServerByShardID(mockResolver, 65)
+	require.NoError(t, err)
+	assert.Equal(t, "127.0.0.1:1234", host.GetAddress())
+}
+
+func TestHistoryServerByShardID_PreservesError(t *testing.T) {
+	lookupError := errors.New("lookup failed")
+	ctrl := gomock.NewController(t)
+	mockResolver := membership.NewMockResolver(ctrl)
+
+	mockResolver.EXPECT().Lookup(service.History, gomock.Any()).
+		Return(membership.HostInfo{}, lookupError)
+
+	host, err := HistoryServerByShardID(mockResolver, 65)
+	assert.Equal(t, lookupError, err, "error should not be modified")
+	assert.Empty(t, host)
+}
diff --git a/service/history/shard/controller.go b/service/history/shard/controller.go
index fa6feac991a..44f7e1208e4 100644
--- a/service/history/shard/controller.go
+++ b/service/history/shard/controller.go
@@ -39,6 +39,7 @@ import (
 	"github.com/uber/cadence/common/types"
 	"github.com/uber/cadence/service/history/config"
 	"github.com/uber/cadence/service/history/engine"
+	"github.com/uber/cadence/service/history/lookup"
 	"github.com/uber/cadence/service/history/resource"
 )
 
@@ -293,7 +294,7 @@ func (c *controller) getOrCreateHistoryShardItem(shardID int) (*historyShardsIte
 	if c.isShuttingDown() || atomic.LoadInt32(&c.status) == common.DaemonStatusStopped {
 		return nil, fmt.Errorf("controller for host '%v' shutting down", c.GetHostInfo().Identity())
 	}
-	info, err := c.GetMembershipResolver().Lookup(service.History, string(rune(shardID)))
+	info, err := lookup.HistoryServerByShardID(c.GetMembershipResolver(), shardID)
 	if err != nil {
 		return nil, err
 	}
@@ -394,7 +395,7 @@ func (c *controller) acquireShards() {
 	concurrency := common.MaxInt(c.config.AcquireShardConcurrency(), 1)
 	var wg sync.WaitGroup
 	wg.Add(concurrency)
-	// Spawn workers that would lookup and add/remove shards concurrently.
+	// Spawn workers that look up and add/remove shards concurrently.
 	for i := 0; i < concurrency; i++ {
 		go func() {
 			defer wg.Done()
@@ -402,7 +403,7 @@
 			if c.isShuttingDown() {
 				return
 			}
-			info, err := c.GetMembershipResolver().Lookup(service.History, string(rune(shardID)))
+			info, err := lookup.HistoryServerByShardID(c.GetMembershipResolver(), shardID)
 			if err != nil {
 				c.logger.Error("Error looking up host for shardID", tag.Error(err), tag.OperationFailed, tag.ShardID(shardID))
 			} else {
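Reviewer note (not part of the diff): every call site above now funnels through `lookup.HistoryServerByShardID`, which deliberately keeps the legacy `string(rune(shardID))` ring key instead of `strconv.Itoa(shardID)` — the two encodings produce different hashring keys, so switching would silently remap shard ownership across the cluster, as the removed inline comment in `DescribeWorkflowExecution` warned. A standalone sketch of the difference follows; the `shardKey` helper is hypothetical, written here only to mirror the key derivation the new package centralizes:

```go
package main

import (
	"fmt"
	"strconv"
)

// shardKey mirrors the derivation inside lookup.HistoryServerByShardID:
// string(rune(shardID)) yields the Unicode code point with that value
// (65 -> "A"), NOT the decimal string "65".
func shardKey(shardID int) string {
	return string(rune(shardID))
}

func main() {
	for _, id := range []int{0, 65, 123} {
		// The ring key and the decimal key never agree (beyond coincidence),
		// so the hashring would place shards on different hosts.
		fmt.Printf("shard %3d -> ring key %q, decimal key %q\n",
			id, shardKey(id), strconv.Itoa(id))
	}
	// Output:
	// shard   0 -> ring key "\x00", decimal key "0"
	// shard  65 -> ring key "A", decimal key "65"
	// shard 123 -> ring key "{", decimal key "123"
}
```

The new unit tests pin exactly this behavior: `TestHistoryServerByShardID_Succeeds` expects `Lookup` to be called with `string(rune(65))`, so any accidental change to the key derivation now fails a focused test instead of surfacing as a cluster-wide ownership shuffle.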