Bugfix: we address hosts using string(rune(shardID)), not by itoa(shardID) (#5952)

* Bugfix: we address hosts using string(rune(shardID)), not by itoa(shardID)

I wish it were the other way around, but we can't change how it works now,
since all the other places already use rune(shardID).
This was not a critical issue: we only returned a ShardOwnershipLostError
with a bad target host in very specific cases, for instance when the
persistence engine returned an error from a transaction.

The weird string(rune(shardID)) logic is extracted into a separate function
to prevent the same issue in the future.
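For context, here is a standalone illustration (not part of the diff) of why the two encodings disagree: string(rune(n)) interprets n as a Unicode code point, while strconv.Itoa(n) formats its decimal digits, so the same shard ID yields two completely different hash-ring keys.

package main

import (
	"fmt"
	"strconv"
)

func main() {
	shardID := 65
	// string(rune(65)) is "A" (code point U+0041), a one-byte key;
	// strconv.Itoa(65) is the two-byte string "65".
	fmt.Printf("%q\n", string(rune(shardID))) // "A"
	fmt.Printf("%q\n", strconv.Itoa(shardID)) // "65"
}

Since the membership ring was populated with the rune-style keys, switching call sites to Itoa would silently remap shard ownership, which is why the commit centralizes the conversion instead of changing it.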
dkrotx authored Apr 29, 2024
1 parent b858891 commit 8871abc
Showing 8 changed files with 187 additions and 28 deletions.
4 changes: 2 additions & 2 deletions client/history/peer_resolver.go
@@ -24,6 +24,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/service/history/lookup"
)

// PeerResolver is used to resolve history peers.
@@ -74,8 +75,7 @@ func (pr peerResolver) FromDomainID(domainID string) (string, error) {
// It uses our membership provider to look up which instance currently owns the given shard.
// FromHostAddress is used for further resolving.
func (pr peerResolver) FromShardID(shardID int) (string, error) {
- shardIDString := string(rune(shardID))
- host, err := pr.resolver.Lookup(service.History, shardIDString)
+ host, err := lookup.HistoryServerByShardID(pr.resolver, shardID)
if err != nil {
return "", common.ToServiceTransientError(err)
}
11 changes: 5 additions & 6 deletions service/frontend/admin/handler.go
@@ -53,6 +53,7 @@ import (
"github.com/uber/cadence/service/frontend/config"
"github.com/uber/cadence/service/frontend/validate"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/lookup"
)

const (
@@ -258,10 +259,9 @@ func (adh *adminHandlerImpl) DescribeWorkflowExecution(
}

shardID := common.WorkflowIDToHistoryShard(request.Execution.WorkflowID, adh.numberOfHistoryShards)
- shardIDstr := string(rune(shardID)) // originally `string(int_shard_id)`, but changing it will change the ring hashing
shardIDForOutput := strconv.Itoa(shardID)

- historyHost, err := adh.GetMembershipResolver().Lookup(service.History, shardIDstr)
+ historyHost, err := lookup.HistoryServerByShardID(adh.GetMembershipResolver(), shardID)
if err != nil {
return nil, adh.error(err, scope)
}
@@ -663,16 +663,15 @@ func (adh *adminHandlerImpl) DescribeShardDistribution(
_, sw := adh.startRequestProfile(ctx, metrics.AdminDescribeShardDistributionScope)
defer sw.Stop()

- numShards := adh.config.NumHistoryShards
resp = &types.DescribeShardDistributionResponse{
- NumberOfShards: int32(numShards),
+ NumberOfShards: int32(adh.numberOfHistoryShards),
Shards: make(map[int32]string),
}

offset := int(request.PageID * request.PageSize)
nextPageStart := offset + int(request.PageSize)
- for shardID := offset; shardID < numShards && shardID < nextPageStart; shardID++ {
- info, err := adh.GetMembershipResolver().Lookup(service.History, string(rune(shardID)))
+ for shardID := offset; shardID < adh.numberOfHistoryShards && shardID < nextPageStart; shardID++ {
+ info, err := lookup.HistoryServerByShardID(adh.GetMembershipResolver(), shardID)
if err != nil {
resp.Shards[int32(shardID)] = "unknown"
} else {
18 changes: 18 additions & 0 deletions service/frontend/admin/handler_test.go
@@ -694,6 +694,24 @@ func (s *adminHandlerSuite) Test_ConfigStore_NilRequest() {
s.Error(err)
}

func (s *adminHandlerSuite) Test_DescribeShardDistribution() {
s.mockResource.MembershipResolver.EXPECT().Lookup(service.History, string(rune(0))).
Return(membership.NewHostInfo("127.0.0.1:1234"), nil)

res, err := s.handler.DescribeShardDistribution(
context.Background(),
&types.DescribeShardDistributionRequest{PageSize: 10},
)
s.Require().NoError(err)
s.Equal(
&types.DescribeShardDistributionResponse{
NumberOfShards: 1,
Shards: map[int32]string{0: "127.0.0.1:1234"},
},
res,
)
}

func (s *adminHandlerSuite) Test_ConfigStore_InvalidKey() {
ctx := context.Background()
handler := s.handler
15 changes: 7 additions & 8 deletions service/history/handler/handler.go
@@ -24,7 +24,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -43,7 +42,6 @@
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/proto"
"github.com/uber/cadence/service/history/config"
@@ -52,6 +50,7 @@
"github.com/uber/cadence/service/history/engine/engineimpl"
"github.com/uber/cadence/service/history/events"
"github.com/uber/cadence/service/history/failover"
"github.com/uber/cadence/service/history/lookup"
"github.com/uber/cadence/service/history/replication"
"github.com/uber/cadence/service/history/resource"
"github.com/uber/cadence/service/history/shard"
@@ -1956,13 +1955,13 @@ func (h *handlerImpl) GetCrossClusterTasks(
for _, shardID := range request.ShardIDs {
future, settable := future.NewFuture()
futureByShardID[shardID] = future
- go func(shardID int32) {
- logger := h.GetLogger().WithTags(tag.ShardID(int(shardID)))
- engine, err := h.controller.GetEngineForShard(int(shardID))
+ go func(shardID int) {
+ logger := h.GetLogger().WithTags(tag.ShardID(shardID))
+ engine, err := h.controller.GetEngineForShard(shardID)
if err != nil {
logger.Error("History engine not found for shard", tag.Error(err))
var owner membership.HostInfo
- if info, err := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(int(shardID))); err == nil {
+ if info, err := lookup.HistoryServerByShardID(h.GetMembershipResolver(), shardID); err == nil {
owner = info
}
settable.Set(nil, shard.CreateShardOwnershipLostError(h.GetHostInfo(), owner))
@@ -1975,7 +1974,7 @@
} else {
settable.Set(tasks, nil)
}
- }(shardID)
+ }(int(shardID))
}

response := &types.GetCrossClusterTasksResponse{
@@ -2069,7 +2068,7 @@ func (h *handlerImpl) RatelimitUpdate(
func (h *handlerImpl) convertError(err error) error {
switch err := err.(type) {
case *persistence.ShardOwnershipLostError:
- info, err2 := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(err.ShardID))
+ info, err2 := lookup.HistoryServerByShardID(h.GetMembershipResolver(), err.ShardID)
if err2 != nil {
return shard.CreateShardOwnershipLostError(h.GetHostInfo(), membership.HostInfo{})
}
67 changes: 58 additions & 9 deletions service/history/handler/handler_test.go
@@ -38,7 +38,9 @@ import (
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/metrics/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/constants"
@@ -89,7 +91,6 @@ func (s *handlerSuite) SetupTest() {
s.mockResource.Logger = testlogger.New(s.Suite.T())
s.mockShardController = shard.NewMockController(s.controller)
s.mockEngine = engine.NewMockEngine(s.controller)
- s.mockShardController.EXPECT().GetEngineForShard(gomock.Any()).Return(s.mockEngine, nil).AnyTimes()
s.mockWFCache = workflowcache.NewMockWFCache(s.controller)
internalRequestRateLimitingEnabledConfig := func(domainName string) bool { return false }
s.handler = NewHandler(s.mockResource, config.NewForTest(), s.mockWFCache, internalRequestRateLimitingEnabledConfig).(*handlerImpl)
@@ -341,13 +342,11 @@ func (s *handlerSuite) TestRecordActivityTaskStarted() {

func (s *handlerSuite) TestRecordDecisionTaskStarted() {
testInput := map[string]struct {
- caseName string
input *types.RecordDecisionTaskStartedRequest
expected *types.RecordDecisionTaskStartedResponse
expectedError bool
}{
"valid input": {
caseName: "valid input",
input: &types.RecordDecisionTaskStartedRequest{
DomainUUID: testDomainID,
WorkflowExecution: &types.WorkflowExecution{
@@ -369,7 +368,6 @@
expectedError: false,
},
"empty domainID": {
caseName: "empty domainID",
input: &types.RecordDecisionTaskStartedRequest{
DomainUUID: "",
WorkflowExecution: &types.WorkflowExecution{
@@ -381,7 +379,6 @@
expectedError: true,
},
"ratelimit exceeded": {
caseName: "ratelimit exceeded",
input: &types.RecordDecisionTaskStartedRequest{
DomainUUID: testDomainID,
WorkflowExecution: &types.WorkflowExecution{
@@ -398,7 +395,6 @@
expectedError: true,
},
"get engine error": {
caseName: "get engine error",
input: &types.RecordDecisionTaskStartedRequest{
DomainUUID: testDomainID,
WorkflowExecution: &types.WorkflowExecution{
@@ -415,7 +411,22 @@
expectedError: true,
},
"engine error": {
caseName: "engine error",
input: &types.RecordDecisionTaskStartedRequest{
DomainUUID: testDomainID,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: testWorkflowID,
RunID: testValidUUID,
},
PollRequest: &types.PollForDecisionTaskRequest{
TaskList: &types.TaskList{
Name: "test-task-list",
},
},
},
expected: nil,
expectedError: true,
},
"engine error with ShardOwnershipLost": {
input: &types.RecordDecisionTaskStartedRequest{
DomainUUID: testDomainID,
WorkflowExecution: &types.WorkflowExecution{
@@ -432,7 +443,6 @@
expectedError: true,
},
"empty poll request": {
caseName: "empty poll request",
input: &types.RecordDecisionTaskStartedRequest{
DomainUUID: testDomainID,
WorkflowExecution: &types.WorkflowExecution{
@@ -447,7 +457,7 @@

for name, input := range testInput {
s.Run(name, func() {
- switch input.caseName {
+ switch name {
case "valid input":
s.mockShardController.EXPECT().GetEngine(gomock.Any()).Return(s.mockEngine, nil).Times(1)
s.mockEngine.EXPECT().RecordDecisionTaskStarted(gomock.Any(), input.input).Return(input.expected, nil).Times(1)
@@ -462,9 +472,15 @@
s.mockShardController.EXPECT().GetEngine(testWorkflowID).Return(s.mockEngine, nil).Times(1)
s.mockEngine.EXPECT().RecordDecisionTaskStarted(gomock.Any(), input.input).Return(nil, errors.New("error")).Times(1)
s.mockRatelimiter.EXPECT().Allow().Return(true).Times(1)
case "engine error with ShardOwnershipLost":
s.mockShardController.EXPECT().GetEngine(testWorkflowID).Return(s.mockEngine, nil).Times(1)
s.mockRatelimiter.EXPECT().Allow().Return(true).Times(1)
s.mockEngine.EXPECT().RecordDecisionTaskStarted(gomock.Any(), input.input).Return(nil, &persistence.ShardOwnershipLostError{ShardID: 123}).Times(1)
s.mockResource.MembershipResolver.EXPECT().Lookup(service.History, string(rune(123)))
case "empty poll request":
s.mockRatelimiter.EXPECT().Allow().Return(true).Times(1)
}

response, err := s.handler.RecordDecisionTaskStarted(context.Background(), input.input)
s.Equal(input.expected, response)
if input.expectedError {
@@ -754,6 +770,8 @@ func (s *handlerSuite) TestGetCrossClusterTasks() {
var shardIDs []int32
numSucceeded := int32(0)
numTasksPerShard := rand.Intn(10)

+ s.mockShardController.EXPECT().GetEngineForShard(gomock.Any()).Return(s.mockEngine, nil).Times(numShards)
s.mockEngine.EXPECT().GetCrossClusterTasks(gomock.Any(), targetCluster).DoAndReturn(
func(_ context.Context, _ string) ([]*types.CrossClusterTaskRequest, error) {
succeeded := rand.Intn(2) == 0
@@ -764,6 +782,7 @@
return nil, errors.New("some random error")
},
).MaxTimes(numShards)

for i := 0; i != numShards; i++ {
shardIDs = append(shardIDs, int32(i))
}
@@ -783,6 +802,35 @@
}
}

func (s *handlerSuite) TestGetCrossClusterTasksFails_IfGetEngineFails() {
numShards := 10
targetCluster := cluster.TestAlternativeClusterName
var shardIDs []int32

for i := 0; i != numShards; i++ {
shardIDs = append(shardIDs, int32(i))
s.mockShardController.EXPECT().GetEngineForShard(i).
Return(nil, errors.New("failed to get engine"))

// in response to the failure above, we look up the current shard owner
s.mockResource.MembershipResolver.EXPECT().Lookup(service.History, string(rune(i)))
}

request := &types.GetCrossClusterTasksRequest{
ShardIDs: shardIDs,
TargetCluster: targetCluster,
}

response, err := s.handler.GetCrossClusterTasks(context.Background(), request)
s.NoError(err)
s.NotNil(response)

s.Len(response.FailedCauseByShard, numShards, "we fail GetEngineForShard every time")
for _, failure := range response.FailedCauseByShard {
s.IsType(types.GetTaskFailedCauseShardOwnershipLost, failure)
}
}

func (s *handlerSuite) TestRespondCrossClusterTaskCompleted_FetchNewTask() {
s.testRespondCrossClusterTaskCompleted(true)
}
Expand All @@ -802,6 +850,7 @@ func (s *handlerSuite) testRespondCrossClusterTaskCompleted(
TaskResponses: make([]*types.CrossClusterTaskResponse, numTasks),
FetchNewTasks: fetchNewTask,
}
+ s.mockShardController.EXPECT().GetEngineForShard(0).Return(s.mockEngine, nil)
s.mockEngine.EXPECT().RespondCrossClusterTasksCompleted(gomock.Any(), targetCluster, request.TaskResponses).Return(nil).Times(1)
if fetchNewTask {
s.mockEngine.EXPECT().GetCrossClusterTasks(gomock.Any(), targetCluster).Return(make([]*types.CrossClusterTaskRequest, numTasks), nil).Times(1)
33 changes: 33 additions & 0 deletions service/history/lookup/lookup.go
@@ -0,0 +1,33 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package lookup

import (
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/service"
)

// HistoryServerByShardID calls resolver.Lookup with a key based on the provided shardID
func HistoryServerByShardID(resolver membership.Resolver, shardID int) (membership.HostInfo, error) {
return resolver.Lookup(service.History, string(rune(shardID)))
}
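For reference, a small standalone sketch (standard Go conversion semantics, not something this commit asserts) of the keys this helper produces: small shard IDs map to one-byte control characters, IDs of 128 and above become multi-byte UTF-8 strings, and every distinct non-negative ID below the Unicode surrogate range yields a distinct key, so addressing stays consistent as long as every caller goes through this function.

package main

import "fmt"

func main() {
	// Print the lookup key produced by string(rune(shardID)) for a few IDs.
	for _, id := range []int{0, 10, 127, 128, 1000} {
		key := string(rune(id))
		fmt.Printf("shard %4d -> key %q (%d byte(s))\n", id, key, len(key))
	}
}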