Skip to content

Commit

Permalink
Add unit tests for matching engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Jun 4, 2024
1 parent ab0d875 commit 0d2f603
Show file tree
Hide file tree
Showing 5 changed files with 904 additions and 49 deletions.
32 changes: 12 additions & 20 deletions service/matching/handler/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,14 @@ func (e *matchingEngineImpl) getTaskListByDomainLocked(domainID string) *types.G
decisionTaskListMap := make(map[string]*types.DescribeTaskListResponse)
activityTaskListMap := make(map[string]*types.DescribeTaskListResponse)
for tl, tlm := range e.taskLists {
if tlm.GetTaskListKind() == types.TaskListKindNormal && tl.GetDomainID() == domainID {
if tl.GetDomainID() == domainID && tlm.GetTaskListKind() == types.TaskListKindNormal {
if types.TaskListType(tl.GetType()) == types.TaskListTypeDecision {
decisionTaskListMap[tl.GetRoot()] = tlm.DescribeTaskList(false)
} else {
activityTaskListMap[tl.GetRoot()] = tlm.DescribeTaskList(false)
}
// TODO: review this logic
activityTaskListMap[tl.GetRoot()] = tlm.DescribeTaskList(false)
}
}

return &types.GetTaskListsByDomainResponse{
DecisionTaskListMap: decisionTaskListMap,
ActivityTaskListMap: activityTaskListMap,
Expand Down Expand Up @@ -712,23 +711,24 @@ func (e *matchingEngineImpl) QueryWorkflow(
queryResultCh := make(chan *queryResult, 1)
e.lockableQueryTaskMap.put(taskID, queryResultCh)
defer e.lockableQueryTaskMap.delete(taskID)
return e.waitForQueryResult(hCtx, queryRequest.GetQueryRequest().GetQueryConsistencyLevel() == types.QueryConsistencyLevelStrong, queryResultCh)
}

func (e *matchingEngineImpl) waitForQueryResult(hCtx *handlerContext, isStrongConsistencyQuery bool, queryResultCh <-chan *queryResult) (*types.QueryWorkflowResponse, error) {
select {
case result := <-queryResultCh:
if result.internalError != nil {
return nil, result.internalError
}

workerResponse := result.workerResponse
// if query was intended as consistent query check to see if worker supports consistent query
if queryRequest.GetQueryRequest().GetQueryConsistencyLevel() == types.QueryConsistencyLevelStrong {
if isStrongConsistencyQuery {
if err := e.versionChecker.SupportsConsistentQuery(
workerResponse.GetCompletedRequest().GetWorkerVersionInfo().GetImpl(),
workerResponse.GetCompletedRequest().GetWorkerVersionInfo().GetFeatureVersion()); err != nil {
return nil, err
}
}

switch workerResponse.GetCompletedRequest().GetCompletedType() {
case types.QueryTaskCompletedTypeCompleted:
return &types.QueryWorkflowResponse{QueryResult: workerResponse.GetCompletedRequest().GetQueryResult()}, nil
Expand Down Expand Up @@ -878,30 +878,22 @@ func (e *matchingEngineImpl) getAllPartitions(
request *types.MatchingListTaskListPartitionsRequest,
taskListType int,
) ([]string, error) {
var partitionKeys []string
domainID, err := e.domainCache.GetDomainID(request.GetDomain())
if err != nil {
return partitionKeys, err
return nil, err
}
taskList := request.GetTaskList()
taskListID, err := tasklist.NewIdentifier(domainID, taskList.GetName(), taskListType)
if err != nil {
return partitionKeys, err
}
rootPartition := taskListID.GetRoot()

partitionKeys = append(partitionKeys, rootPartition)

nWritePartitions := e.config.NumTasklistWritePartitions
n := nWritePartitions(request.GetDomain(), rootPartition, taskListType)
if n <= 0 {
return partitionKeys, nil
return nil, err
}

rootPartition := taskListID.GetRoot()
partitionKeys := []string{rootPartition}
n := e.config.NumTasklistWritePartitions(request.GetDomain(), rootPartition, taskListType)
for i := 1; i < n; i++ {
partitionKeys = append(partitionKeys, fmt.Sprintf("%v%v/%v", common.ReservedTaskListPrefix, rootPartition, i))
}

return partitionKeys, nil
}

Expand Down
Loading

0 comments on commit 0d2f603

Please sign in to comment.