Skip to content

Commit

Permalink
chore: don't run discovery if it is a clear (#13176)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Jul 19, 2024
1 parent 041fa95 commit 4b561fe
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
final var useWorkloadOutputDocStore = checkUseWorkloadOutputFlag(syncInput);
final var sendRunTimeMetrics = shouldReportRuntime();
final var shouldRunAsChildWorkflow = shouldRunAsAChildWorkflow(connectionId, syncInput.getWorkspaceId(),
syncInput.getConnectionContext().getSourceDefinitionId());
syncInput.getConnectionContext().getSourceDefinitionId(), syncInput.getIsReset());

ApmTraceUtils
.addTagsToTrace(Map.of(
Expand Down Expand Up @@ -230,17 +230,25 @@ public RefreshSchemaActivityOutput runDiscoverAsChildWorkflow(final JobRunConfig
}
}

private boolean shouldRunAsAChildWorkflow(final UUID connectionId, final UUID workspaceId, final UUID sourceDefinitionId) {
final int shouldRunAsChildWorkflowVersion = Workflow.getVersion("SHOULD_RUN_AS_CHILD", Workflow.DEFAULT_VERSION, 1);

private boolean shouldRunAsAChildWorkflow(final UUID connectionId, final UUID workspaceId, final UUID sourceDefinitionId, final boolean isReset) {
final int shouldRunAsChildWorkflowVersion = Workflow.getVersion("SHOULD_RUN_AS_CHILD", Workflow.DEFAULT_VERSION, 2);
final int versionWithoutResetCheck = 1;
if (shouldRunAsChildWorkflowVersion == Workflow.DEFAULT_VERSION) {
return false;
} else if (shouldRunAsChildWorkflowVersion == versionWithoutResetCheck) {
return checkUseWorkloadApiFlag(workspaceId)
&& syncFeatureFlagFetcherActivity.shouldRunAsChildWorkflow(new SyncFeatureFlagFetcherInput(
Optional.ofNullable(connectionId).orElse(DEFAULT_UUID),
sourceDefinitionId,
workspaceId));
} else {
return !isReset && checkUseWorkloadApiFlag(workspaceId)
&& syncFeatureFlagFetcherActivity.shouldRunAsChildWorkflow(new SyncFeatureFlagFetcherInput(
Optional.ofNullable(connectionId).orElse(DEFAULT_UUID),
sourceDefinitionId,
workspaceId));
}
return checkUseWorkloadApiFlag(workspaceId)
&& syncFeatureFlagFetcherActivity.shouldRunAsChildWorkflow(new SyncFeatureFlagFetcherInput(
Optional.ofNullable(connectionId).orElse(DEFAULT_UUID),
sourceDefinitionId,
workspaceId));

}

private boolean shouldReportRuntime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,12 @@ public void tearDown() {
testEnv.close();
}

// bundle up all the temporal worker setup / execution into one method.
private StandardSyncOutput execute() {
return execute(false);
}

// bundle up all the temporal worker setup / execution into one method.
private StandardSyncOutput execute(final boolean isReset) {
syncWorker.registerActivitiesImplementations(replicationActivity,
webhookOperationActivity,
refreshSchemaActivity,
Expand All @@ -236,7 +240,7 @@ private StandardSyncOutput execute() {
final SyncWorkflow workflow =
client.newWorkflowStub(SyncWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(SYNC_QUEUE).build());

return workflow.run(JOB_RUN_CONFIG, SOURCE_LAUNCHER_CONFIG, DESTINATION_LAUNCHER_CONFIG, syncInput, sync.getConnectionId());
return workflow.run(JOB_RUN_CONFIG, SOURCE_LAUNCHER_CONFIG, DESTINATION_LAUNCHER_CONFIG, syncInput.withIsReset(isReset), sync.getConnectionId());
}

@Test
Expand Down Expand Up @@ -270,6 +274,22 @@ void testSuccessWithChildWorkflow() {
removeRefreshTime(actualOutput.getStandardSyncSummary()));
}

@Test
void testNoChildWorkflowWithReset() {
doReturn(replicationSuccessOutput).when(replicationActivity).replicateV2(any());
doReturn(true).when(workloadFeatureFlagActivity).useWorkloadApi(any());
doReturn(true).when(syncFeatureFlagFetcherActivity).shouldRunAsChildWorkflow(any());

final StandardSyncOutput actualOutput = execute(true);

verifyReplication(replicationActivity, syncInput, true, false, null);
verifyShouldRefreshSchema(refreshSchemaActivity);
verify(reportRunTimeActivity).reportRunTime(any());
assertEquals(
replicationSuccessOutput.getStandardSyncSummary(),
removeRefreshTime(actualOutput.getStandardSyncSummary()));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void passesThroughFFCall(final boolean useWorkloadApi) throws Exception {
Expand Down

0 comments on commit 4b561fe

Please sign in to comment.