Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Pinto query validator failed log, minor refactor pinot visibility store to remove panics #5664

Merged
merged 5 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 59 additions & 32 deletions common/persistence/pinot/pinotVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,10 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutions(
isRecordValid := func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool {
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}
query := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, false)
query, err := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, false)
if err != nil {
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -302,7 +305,10 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutions(
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, true)
query, err := getListWorkflowExecutionsQuery(v.pinotClient.GetTableName(), request, true)
if err != nil {
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -320,7 +326,10 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByType(ctx context.Cont
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, false)
query, err := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, false)
if err != nil {
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -338,7 +347,10 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByType(ctx context.Co
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, true)
query, err := getListWorkflowExecutionsByTypeQuery(v.pinotClient.GetTableName(), request, true)
if err != nil {
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -356,7 +368,10 @@ func (v *pinotVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(ctx contex
return !request.EarliestTime.After(rec.StartTime) && !rec.StartTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, false)
query, err := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, false)
if err != nil {
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -374,7 +389,10 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(ctx cont
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, true)
query, err := getListWorkflowExecutionsByWorkflowIDQuery(v.pinotClient.GetTableName(), request, true)
if err != nil {
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -392,7 +410,10 @@ func (v *pinotVisibilityStore) ListClosedWorkflowExecutionsByStatus(ctx context.
return !request.EarliestTime.After(rec.CloseTime) && !rec.CloseTime.After(request.LatestTime)
}

query := getListWorkflowExecutionsByStatusQuery(v.pinotClient.GetTableName(), request)
query, err := getListWorkflowExecutionsByStatusQuery(v.pinotClient.GetTableName(), request)
if err != nil {
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand Down Expand Up @@ -439,7 +460,10 @@ func (v *pinotVisibilityStore) GetClosedWorkflowExecution(ctx context.Context, r
func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
checkPageSize(request)

query := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
query, err := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
if err != nil {
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand All @@ -462,7 +486,10 @@ func (v *pinotVisibilityStore) ListWorkflowExecutions(ctx context.Context, reque
func (v *pinotVisibilityStore) ScanWorkflowExecutions(ctx context.Context, request *p.ListWorkflowExecutionsByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
checkPageSize(request)

query := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
query, err := v.getListWorkflowExecutionsByQueryQuery(v.pinotClient.GetTableName(), request)
if err != nil {
return nil, err
}

req := &pnt.SearchRequest{
Query: query,
Expand Down Expand Up @@ -749,14 +776,14 @@ func (v *pinotVisibilityStore) getCountWorkflowExecutionsQuery(tableName string,
return query.String()
}

func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName string, request *p.ListWorkflowExecutionsByQueryRequest) string {
func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName string, request *p.ListWorkflowExecutionsByQueryRequest) (string, error) {
if request == nil {
return ""
return "", nil
}

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
neil-xie marked this conversation as resolved.
Show resolved Hide resolved
}

query := NewPinotQuery(tableName)
Expand All @@ -770,20 +797,20 @@ func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName s
// if customized query is empty, directly return
if requestQuery == "" {
query.addOffsetAndLimits(token.From, request.PageSize)
return query.String()
return query.String(), nil
}

requestQuery = filterPrefix(requestQuery)
if common.IsJustOrderByClause(requestQuery) {
query.concatSorter(requestQuery)
query.addOffsetAndLimits(token.From, request.PageSize)
return query.String()
return query.String(), nil
}

comparExpr, orderBy := parseOrderBy(requestQuery)
comparExpr, err = v.pinotQueryValidator.ValidateQuery(comparExpr)
if err != nil {
v.logger.Error(fmt.Sprintf("pinot query validator error: %s", err))
return "", fmt.Errorf(fmt.Sprintf("pinot query validator error: %w, query: %s", err, request.Query))
}

comparExpr = filterPrefix(comparExpr)
Expand All @@ -800,7 +827,7 @@ func (v *pinotVisibilityStore) getListWorkflowExecutionsByQueryQuery(tableName s
}

query.addOffsetAndLimits(token.From, request.PageSize)
return query.String()
return query.String(), nil
}

func filterPrefix(query string) string {
Expand Down Expand Up @@ -907,14 +934,14 @@ func findLastOrderBy(list []string) int {
return 0
}

func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWorkflowExecutionsRequest, isClosed bool) string {
func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWorkflowExecutionsRequest, isClosed bool) (string, error) {
if request == nil {
return ""
return "", nil
}

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
Expand All @@ -939,12 +966,12 @@ func getListWorkflowExecutionsQuery(tableName string, request *p.InternalListWor
query.addPinotSorter(StartTime, DescendingOrder)
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) string {
func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalListWorkflowExecutionsByTypeRequest, isClosed bool) (string, error) {
if request == nil {
return ""
return "", nil
}

query := NewPinotQuery(tableName)
Expand All @@ -968,19 +995,19 @@ func getListWorkflowExecutionsByTypeQuery(tableName string, request *p.InternalL

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
pageSize := request.PageSize
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest, isClosed bool) string {
func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.InternalListWorkflowExecutionsByWorkflowIDRequest, isClosed bool) (string, error) {
if request == nil {
return ""
return "", nil
}

query := NewPinotQuery(tableName)
Expand All @@ -1004,19 +1031,19 @@ func getListWorkflowExecutionsByWorkflowIDQuery(tableName string, request *p.Int

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
pageSize := request.PageSize
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.InternalListClosedWorkflowExecutionsByStatusRequest) string {
func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.InternalListClosedWorkflowExecutionsByStatusRequest) (string, error) {
if request == nil {
return ""
return "", nil
}

query := NewPinotQuery(tableName)
Expand Down Expand Up @@ -1047,14 +1074,14 @@ func getListWorkflowExecutionsByStatusQuery(tableName string, request *p.Interna

token, err := pnt.GetNextPageToken(request.NextPageToken)
if err != nil {
panic(fmt.Sprintf("deserialize next page token error: %s", err))
return "", fmt.Errorf("next page token: %w", err)
}

from := token.From
pageSize := request.PageSize
query.addOffsetAndLimits(from, pageSize)

return query.String()
return query.String(), nil
}

func getGetClosedWorkflowExecutionQuery(tableName string, request *p.InternalGetClosedWorkflowExecutionRequest) string {
Expand Down
40 changes: 25 additions & 15 deletions common/persistence/pinot/pinotVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,9 @@ LIMIT 0, 0

for name, test := range tests {
t.Run(name, func(t *testing.T) {
assert.NotPanics(t, func() {
output := visibilityStore.getListWorkflowExecutionsByQueryQuery(testTableName, test.input)
assert.Equal(t, test.expectedOutput, output)
})
output, err := visibilityStore.getListWorkflowExecutionsByQueryQuery(testTableName, test.input)
assert.Equal(t, test.expectedOutput, output)
assert.NoError(t, err)
})
}
}
Expand All @@ -329,9 +328,9 @@ func TestGetListWorkflowExecutionsQuery(t *testing.T) {
NextPageToken: nil,
}

closeResult := getListWorkflowExecutionsQuery(testTableName, request, true)
openResult := getListWorkflowExecutionsQuery(testTableName, request, false)
nilResult := getListWorkflowExecutionsQuery(testTableName, nil, true)
closeResult, err1 := getListWorkflowExecutionsQuery(testTableName, request, true)
openResult, err2 := getListWorkflowExecutionsQuery(testTableName, request, false)
nilResult, err3 := getListWorkflowExecutionsQuery(testTableName, nil, true)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -356,6 +355,9 @@ LIMIT 0, 10
assert.Equal(t, closeResult, expectCloseResult)
assert.Equal(t, openResult, expectOpenResult)
assert.Equal(t, nilResult, expectNilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
assert.NoError(t, err3)
}

func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) {
Expand All @@ -371,9 +373,9 @@ func TestGetListWorkflowExecutionsByTypeQuery(t *testing.T) {
WorkflowTypeName: testWorkflowType,
}

closeResult := getListWorkflowExecutionsByTypeQuery(testTableName, request, true)
openResult := getListWorkflowExecutionsByTypeQuery(testTableName, request, false)
nilResult := getListWorkflowExecutionsByTypeQuery(testTableName, nil, true)
closeResult, err1 := getListWorkflowExecutionsByTypeQuery(testTableName, request, true)
openResult, err2 := getListWorkflowExecutionsByTypeQuery(testTableName, request, false)
nilResult, err3 := getListWorkflowExecutionsByTypeQuery(testTableName, nil, true)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -400,6 +402,9 @@ LIMIT 0, 10
assert.Equal(t, closeResult, expectCloseResult)
assert.Equal(t, openResult, expectOpenResult)
assert.Equal(t, nilResult, expectNilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
assert.NoError(t, err3)
}

func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) {
Expand All @@ -415,9 +420,9 @@ func TestGetListWorkflowExecutionsByWorkflowIDQuery(t *testing.T) {
WorkflowID: testWorkflowID,
}

closeResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, true)
openResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, false)
nilResult := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, nil, true)
closeResult, err1 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, true)
openResult, err2 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, request, false)
nilResult, err3 := getListWorkflowExecutionsByWorkflowIDQuery(testTableName, nil, true)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -444,6 +449,9 @@ LIMIT 0, 10
assert.Equal(t, closeResult, expectCloseResult)
assert.Equal(t, openResult, expectOpenResult)
assert.Equal(t, nilResult, expectNilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
assert.NoError(t, err3)
}

func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) {
Expand All @@ -459,8 +467,8 @@ func TestGetListWorkflowExecutionsByStatusQuery(t *testing.T) {
Status: types.WorkflowExecutionCloseStatus(0),
}

closeResult := getListWorkflowExecutionsByStatusQuery(testTableName, request)
nilResult := getListWorkflowExecutionsByStatusQuery(testTableName, nil)
closeResult, err1 := getListWorkflowExecutionsByStatusQuery(testTableName, request)
nilResult, err2 := getListWorkflowExecutionsByStatusQuery(testTableName, nil)
expectCloseResult := fmt.Sprintf(`SELECT *
FROM %s
WHERE DomainID = 'bfd5c907-f899-4baf-a7b2-2ab85e623ebd'
Expand All @@ -474,6 +482,8 @@ LIMIT 0, 10

assert.Equal(t, expectCloseResult, closeResult)
assert.Equal(t, expectNilResult, nilResult)
assert.NoError(t, err1)
assert.NoError(t, err2)
}

func TestGetGetClosedWorkflowExecutionQuery(t *testing.T) {
Expand Down
Loading