Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor visibility triple manager #6267

Merged
merged 3 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 88 additions & 89 deletions common/persistence/visibility_triple_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ import (

type (
visibilityTripleManager struct {
logger log.Logger
dbVisibilityManager VisibilityManager
pinotVisibilityManager VisibilityManager
esVisibilityManager VisibilityManager
readModeIsFromPinot dynamicconfig.BoolPropertyFnWithDomainFilter
readModeIsFromES dynamicconfig.BoolPropertyFnWithDomainFilter
writeMode dynamicconfig.StringPropertyFn
logCustomerQueryParameter dynamicconfig.BoolPropertyFnWithDomainFilter
readModeIsDouble dynamicconfig.BoolPropertyFnWithDomainFilter
logger log.Logger
dbVisibilityManager VisibilityManager
primaryVisibilityManager VisibilityManager
secondaryVisibilityManager VisibilityManager
readModeIsFromPrimary dynamicconfig.BoolPropertyFnWithDomainFilter
readModeIsFromSecondary dynamicconfig.BoolPropertyFnWithDomainFilter
neil-xie marked this conversation as resolved.
Show resolved Hide resolved
writeMode dynamicconfig.StringPropertyFn
logCustomerQueryParameter dynamicconfig.BoolPropertyFnWithDomainFilter
readModeIsDouble dynamicconfig.BoolPropertyFnWithDomainFilter
}
)

Expand All @@ -69,52 +69,54 @@ var Operation = struct {

var _ VisibilityManager = (*visibilityTripleManager)(nil)

// NewPinotVisibilityTripleManager create a visibility manager that operate on DB or Pinot based on dynamic config.
// NewVisibilityTripleManager create a visibility manager that operate on DB or advanced visibility based on dynamic config.
// For Pinot migration, Pinot is the primary visibility manager, ES is the secondary visibility manager, and DB is the fallback.
// For OpenSearch migration, OS is the primary visibility manager, ES is the secondary visibility manager, and DB is the fallback.
func NewVisibilityTripleManager(
dbVisibilityManager VisibilityManager, // one of the VisibilityManager can be nil
pinotVisibilityManager VisibilityManager,
esVisibilityManager VisibilityManager,
readModeIsFromPinot dynamicconfig.BoolPropertyFnWithDomainFilter,
readModeIsFromES dynamicconfig.BoolPropertyFnWithDomainFilter,
primaryVisibilityManager VisibilityManager,
secondaryVisibilityManager VisibilityManager,
readModeIsFromPrimary dynamicconfig.BoolPropertyFnWithDomainFilter,
readModeIsFromSecondary dynamicconfig.BoolPropertyFnWithDomainFilter,
visWritingMode dynamicconfig.StringPropertyFn,
logCustomerQueryParameter dynamicconfig.BoolPropertyFnWithDomainFilter,
readModeIsDouble dynamicconfig.BoolPropertyFnWithDomainFilter,
logger log.Logger,
) VisibilityManager {
if dbVisibilityManager == nil && pinotVisibilityManager == nil && esVisibilityManager == nil {
if dbVisibilityManager == nil && primaryVisibilityManager == nil && secondaryVisibilityManager == nil {
logger.Fatal("require one of dbVisibilityManager or pinotVisibilityManager or esVisibilityManager")
return nil
}
return &visibilityTripleManager{
dbVisibilityManager: dbVisibilityManager,
pinotVisibilityManager: pinotVisibilityManager,
esVisibilityManager: esVisibilityManager,
readModeIsFromPinot: readModeIsFromPinot,
readModeIsFromES: readModeIsFromES,
writeMode: visWritingMode,
logger: logger,
logCustomerQueryParameter: logCustomerQueryParameter,
readModeIsDouble: readModeIsDouble,
dbVisibilityManager: dbVisibilityManager,
primaryVisibilityManager: primaryVisibilityManager,
secondaryVisibilityManager: secondaryVisibilityManager,
readModeIsFromPrimary: readModeIsFromPrimary,
readModeIsFromSecondary: readModeIsFromSecondary,
writeMode: visWritingMode,
logger: logger,
logCustomerQueryParameter: logCustomerQueryParameter,
readModeIsDouble: readModeIsDouble,
}
}

func (v *visibilityTripleManager) Close() {
if v.dbVisibilityManager != nil {
v.dbVisibilityManager.Close()
}
if v.pinotVisibilityManager != nil {
v.pinotVisibilityManager.Close()
if v.primaryVisibilityManager != nil {
v.primaryVisibilityManager.Close()
}
if v.esVisibilityManager != nil {
v.esVisibilityManager.Close()
if v.secondaryVisibilityManager != nil {
v.secondaryVisibilityManager.Close()
}
}

func (v *visibilityTripleManager) GetName() string {
if v.pinotVisibilityManager != nil {
return v.pinotVisibilityManager.GetName()
} else if v.esVisibilityManager != nil {
return v.esVisibilityManager.GetName()
if v.primaryVisibilityManager != nil {
return v.primaryVisibilityManager.GetName()
} else if v.secondaryVisibilityManager != nil {
return v.secondaryVisibilityManager.GetName()
}
return v.dbVisibilityManager.GetName()
}
Expand All @@ -129,10 +131,10 @@ func (v *visibilityTripleManager) RecordWorkflowExecutionStarted(
return v.dbVisibilityManager.RecordWorkflowExecutionStarted(ctx, request)
},
func() error {
return v.esVisibilityManager.RecordWorkflowExecutionStarted(ctx, request)
return v.secondaryVisibilityManager.RecordWorkflowExecutionStarted(ctx, request)
},
func() error {
return v.pinotVisibilityManager.RecordWorkflowExecutionStarted(ctx, request)
return v.primaryVisibilityManager.RecordWorkflowExecutionStarted(ctx, request)
},
)
}
Expand All @@ -147,10 +149,10 @@ func (v *visibilityTripleManager) RecordWorkflowExecutionClosed(
return v.dbVisibilityManager.RecordWorkflowExecutionClosed(ctx, request)
},
func() error {
return v.esVisibilityManager.RecordWorkflowExecutionClosed(ctx, request)
return v.secondaryVisibilityManager.RecordWorkflowExecutionClosed(ctx, request)
},
func() error {
return v.pinotVisibilityManager.RecordWorkflowExecutionClosed(ctx, request)
return v.primaryVisibilityManager.RecordWorkflowExecutionClosed(ctx, request)
},
)
}
Expand All @@ -165,10 +167,10 @@ func (v *visibilityTripleManager) RecordWorkflowExecutionUninitialized(
return v.dbVisibilityManager.RecordWorkflowExecutionUninitialized(ctx, request)
},
func() error {
return v.esVisibilityManager.RecordWorkflowExecutionUninitialized(ctx, request)
return v.secondaryVisibilityManager.RecordWorkflowExecutionUninitialized(ctx, request)
},
func() error {
return v.pinotVisibilityManager.RecordWorkflowExecutionUninitialized(ctx, request)
return v.primaryVisibilityManager.RecordWorkflowExecutionUninitialized(ctx, request)
},
)
}
Expand All @@ -183,10 +185,10 @@ func (v *visibilityTripleManager) DeleteWorkflowExecution(
return v.dbVisibilityManager.DeleteWorkflowExecution(ctx, request)
},
func() error {
return v.esVisibilityManager.DeleteWorkflowExecution(ctx, request)
return v.secondaryVisibilityManager.DeleteWorkflowExecution(ctx, request)
},
func() error {
return v.pinotVisibilityManager.DeleteWorkflowExecution(ctx, request)
return v.primaryVisibilityManager.DeleteWorkflowExecution(ctx, request)
},
)
}
Expand All @@ -201,10 +203,10 @@ func (v *visibilityTripleManager) DeleteUninitializedWorkflowExecution(
return v.dbVisibilityManager.DeleteUninitializedWorkflowExecution(ctx, request)
},
func() error {
return v.esVisibilityManager.DeleteUninitializedWorkflowExecution(ctx, request)
return v.secondaryVisibilityManager.DeleteUninitializedWorkflowExecution(ctx, request)
},
func() error {
return v.pinotVisibilityManager.DeleteUninitializedWorkflowExecution(ctx, request)
return v.primaryVisibilityManager.DeleteUninitializedWorkflowExecution(ctx, request)
},
)
}
Expand All @@ -219,21 +221,21 @@ func (v *visibilityTripleManager) UpsertWorkflowExecution(
return v.dbVisibilityManager.UpsertWorkflowExecution(ctx, request)
},
func() error {
return v.esVisibilityManager.UpsertWorkflowExecution(ctx, request)
return v.secondaryVisibilityManager.UpsertWorkflowExecution(ctx, request)
},
func() error {
return v.pinotVisibilityManager.UpsertWorkflowExecution(ctx, request)
return v.primaryVisibilityManager.UpsertWorkflowExecution(ctx, request)
},
)
}

func (v *visibilityTripleManager) chooseVisibilityModeForAdmin() string {
switch {
case v.dbVisibilityManager != nil && v.esVisibilityManager != nil && v.pinotVisibilityManager != nil:
case v.dbVisibilityManager != nil && v.secondaryVisibilityManager != nil && v.primaryVisibilityManager != nil:
return common.AdvancedVisibilityWritingModeTriple
case v.dbVisibilityManager != nil && v.pinotVisibilityManager != nil:
case v.dbVisibilityManager != nil && v.primaryVisibilityManager != nil:
return common.AdvancedVisibilityWritingModeDual
case v.pinotVisibilityManager != nil:
case v.primaryVisibilityManager != nil:
return common.AdvancedVisibilityWritingModeOn
case v.dbVisibilityManager != nil:
return common.AdvancedVisibilityWritingModeOff
Expand All @@ -242,7 +244,7 @@ func (v *visibilityTripleManager) chooseVisibilityModeForAdmin() string {
}
}

func (v *visibilityTripleManager) chooseVisibilityManagerForWrite(ctx context.Context, dbVisFunc, esVisFunc, pinotVisFunc func() error) error {
func (v *visibilityTripleManager) chooseVisibilityManagerForWrite(ctx context.Context, dbVisFunc, secondaryVisFunc, primaryVisFunc func() error) error {
var writeMode string
if v.writeMode != nil {
writeMode = v.writeMode()
Expand All @@ -255,36 +257,36 @@ func (v *visibilityTripleManager) chooseVisibilityManagerForWrite(ctx context.Co

switch writeMode {
// only perform as triple manager during migration by setting write mode to triple,
// other time perform as a dual visibility manager of pinot and db
// other time perform as a dual visibility manager and db
case common.AdvancedVisibilityWritingModeOff:
if v.dbVisibilityManager != nil {
return dbVisFunc()
}
v.logger.Warn("basic visibility is not available to write, fall back to advanced visibility")
return pinotVisFunc()
return primaryVisFunc()
case common.AdvancedVisibilityWritingModeOn:
// this is the way to make it work for migration, will clean up after migration is done
// by default the AdvancedVisibilityWritingMode is set to ON for ES
// if we change this dynamic config before deployment, ES will stop working and block task processing
// we have to change it after deployment. But need to make sure double writes are working, so the only way is changing the behavior of this function
if v.pinotVisibilityManager != nil && v.esVisibilityManager != nil {
if err := esVisFunc(); err != nil {
if v.primaryVisibilityManager != nil && v.secondaryVisibilityManager != nil {
if err := secondaryVisFunc(); err != nil {
return err
}
return pinotVisFunc()
} else if v.pinotVisibilityManager != nil {
v.logger.Warn("ES visibility is not available to write, fall back to pinot visibility")
return pinotVisFunc()
} else if v.esVisibilityManager != nil {
v.logger.Warn("Pinot visibility is not available to write, fall back to es visibility")
return esVisFunc()
return primaryVisFunc()
} else if v.primaryVisibilityManager != nil {
v.logger.Warn("Secondary advanced visibility is not available to write, fall back to primary advanced visibility")
return primaryVisFunc()
} else if v.secondaryVisibilityManager != nil {
v.logger.Warn("Primary advanced visibility is not available to write, fall back to secondary advanced visibility")
return secondaryVisFunc()
} else {
v.logger.Warn("advanced visibility is not available to write, fall back to basic visibility")
return dbVisFunc()
}
case common.AdvancedVisibilityWritingModeDual:
if v.pinotVisibilityManager != nil {
if err := pinotVisFunc(); err != nil {
if v.primaryVisibilityManager != nil {
if err := primaryVisFunc(); err != nil {
return err
}
if v.dbVisibilityManager != nil {
Expand All @@ -296,11 +298,11 @@ func (v *visibilityTripleManager) chooseVisibilityManagerForWrite(ctx context.Co
v.logger.Warn("advanced visibility is not available to write")
return dbVisFunc()
case common.AdvancedVisibilityWritingModeTriple:
if v.pinotVisibilityManager != nil && v.esVisibilityManager != nil {
if err := pinotVisFunc(); err != nil {
if v.primaryVisibilityManager != nil && v.secondaryVisibilityManager != nil {
if err := primaryVisFunc(); err != nil {
return err
}
if err := esVisFunc(); err != nil {
if err := secondaryVisFunc(); err != nil {
return err
}
if v.dbVisibilityManager != nil {
Expand Down Expand Up @@ -330,7 +332,7 @@ type userParameters struct {
latestTime int64
}

// For Pinot Migration uses. It will be a temporary usage
// For Visibility Migration uses. It will be a temporary usage
// logUserQueryParameters will log user queries' parameters so that a comparator workflow can consume
func (v *visibilityTripleManager) logUserQueryParameters(userParam userParameters, domain string, override bool) {
// Don't log if it is not enabled
Expand All @@ -344,7 +346,7 @@ func (v *visibilityTripleManager) logUserQueryParameters(userParam userParameter
return
}

v.logger.Info("Logging user query parameters for Pinot/ES response comparator...",
v.logger.Info("Logging user query parameters for visibility migration response comparator...",
tag.OperationName(userParam.operation),
tag.WorkflowDomainName(userParam.domainName),
tag.WorkflowType(userParam.workflowType),
Expand All @@ -370,22 +372,19 @@ func (v *visibilityTripleManager) getShadowMgrForDoubleRead(domain string) Visib
return nil
}
// case1: when it is double read, and both advanced visibility are not available
if v.pinotVisibilityManager == nil && v.esVisibilityManager == nil {
return nil
}
// case2: when it is double read, and only one of advanced visibility is available
if v.pinotVisibilityManager == nil || v.esVisibilityManager == nil {
if v.primaryVisibilityManager == nil || v.secondaryVisibilityManager == nil {
return nil
}

// Valid cases:
// case3: when it is double read, and both advanced visibility are available, and read mode is from Pinot
if v.readModeIsFromPinot(domain) {
return v.esVisibilityManager
// case3: when it is double read, and both advanced visibility are available, and read mode is from Primary
if v.readModeIsFromPrimary(domain) {
return v.secondaryVisibilityManager
}
// case4: when it is double read, and both advanced visibility are available, and read mode is from ES
if v.readModeIsFromES(domain) {
return v.pinotVisibilityManager
// case4: when it is double read, and both advanced visibility are available, and read mode is from Secondary
if v.readModeIsFromSecondary(domain) {
return v.primaryVisibilityManager
}
// exclude all other cases
return nil
Expand Down Expand Up @@ -668,35 +667,35 @@ func (v *visibilityTripleManager) CountWorkflowExecutions(

func (v *visibilityTripleManager) chooseVisibilityManagerForRead(ctx context.Context, domain string) VisibilityManager {
if override := ctx.Value(ContextKey); override == VisibilityOverridePrimary {
v.logger.Info("Pinot Migration log: Primary visibility manager was chosen for read.")
return v.esVisibilityManager
v.logger.Info("Visibility Migration log: Primary visibility manager was chosen for read.")
return v.secondaryVisibilityManager
} else if override == VisibilityOverrideSecondary {
v.logger.Info("Pinot Migration log: Secondary visibility manager was chosen for read.")
return v.pinotVisibilityManager
v.logger.Info("Visibility Migration log: Secondary visibility manager was chosen for read.")
return v.primaryVisibilityManager
}

var visibilityMgr VisibilityManager
if v.readModeIsFromES(domain) {
if v.esVisibilityManager != nil {
visibilityMgr = v.esVisibilityManager
if v.readModeIsFromSecondary(domain) {
if v.secondaryVisibilityManager != nil {
visibilityMgr = v.secondaryVisibilityManager
} else {
visibilityMgr = v.dbVisibilityManager
v.logger.Warn("domain is configured to read from advanced visibility(ElasticSearch based) but it's not available, fall back to basic visibility",
v.logger.Warn("domain is configured to read from advanced visibility but it's not available, fall back to basic visibility",
tag.WorkflowDomainName(domain))
}
} else if v.readModeIsFromPinot(domain) {
if v.pinotVisibilityManager != nil {
visibilityMgr = v.pinotVisibilityManager
} else if v.readModeIsFromPrimary(domain) {
if v.primaryVisibilityManager != nil {
visibilityMgr = v.primaryVisibilityManager
} else {
visibilityMgr = v.dbVisibilityManager
v.logger.Warn("domain is configured to read from advanced visibility(Pinot based) but it's not available, fall back to basic visibility",
v.logger.Warn("domain is configured to read from advanced visibility but it's not available, fall back to basic visibility",
tag.WorkflowDomainName(domain))
}
} else {
if v.dbVisibilityManager != nil {
visibilityMgr = v.dbVisibilityManager
} else {
visibilityMgr = v.pinotVisibilityManager
visibilityMgr = v.primaryVisibilityManager
v.logger.Warn("domain is configured to read from basic visibility but it's not available, fall back to advanced visibility",
tag.WorkflowDomainName(domain))
}
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/visibility_triple_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,8 @@ func TestPinotTripleChooseVisibilityModeForAdmin(t *testing.T) {
nil, nil, nil, log.NewNoop())
tripleManager := mgr.(*visibilityTripleManager)
tripleManager.dbVisibilityManager = nil
tripleManager.pinotVisibilityManager = nil
tripleManager.esVisibilityManager = nil
tripleManager.primaryVisibilityManager = nil
tripleManager.secondaryVisibilityManager = nil
assert.Equal(t, "INVALID_ADMIN_MODE", tripleManager.chooseVisibilityModeForAdmin())
}

Expand Down
Loading