From 4b561fe8e67f99e99ad98ea7cdaa465f297b36ff Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 19 Jul 2024 09:53:38 -0700 Subject: [PATCH] chore: don't run discovery if it is a clear (#13176) --- .../temporal/sync/SyncWorkflowImpl.java | 26 ++++++++++++------- .../temporal/sync/SyncWorkflowTest.java | 24 +++++++++++++++-- 2 files changed, 39 insertions(+), 11 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 3a7edf65def..cab0b18822e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -97,7 +97,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final var useWorkloadOutputDocStore = checkUseWorkloadOutputFlag(syncInput); final var sendRunTimeMetrics = shouldReportRuntime(); final var shouldRunAsChildWorkflow = shouldRunAsAChildWorkflow(connectionId, syncInput.getWorkspaceId(), - syncInput.getConnectionContext().getSourceDefinitionId()); + syncInput.getConnectionContext().getSourceDefinitionId(), syncInput.getIsReset()); ApmTraceUtils .addTagsToTrace(Map.of( @@ -230,17 +230,25 @@ public RefreshSchemaActivityOutput runDiscoverAsChildWorkflow(final JobRunConfig } } - private boolean shouldRunAsAChildWorkflow(final UUID connectionId, final UUID workspaceId, final UUID sourceDefinitionId) { - final int shouldRunAsChildWorkflowVersion = Workflow.getVersion("SHOULD_RUN_AS_CHILD", Workflow.DEFAULT_VERSION, 1); - + private boolean shouldRunAsAChildWorkflow(final UUID connectionId, final UUID workspaceId, final UUID sourceDefinitionId, final boolean isReset) { + final int shouldRunAsChildWorkflowVersion = Workflow.getVersion("SHOULD_RUN_AS_CHILD", Workflow.DEFAULT_VERSION, 2); + final int versionWithoutResetCheck = 1; if (shouldRunAsChildWorkflowVersion == Workflow.DEFAULT_VERSION) { return false; + } else if (shouldRunAsChildWorkflowVersion == versionWithoutResetCheck) { + return checkUseWorkloadApiFlag(workspaceId) + && syncFeatureFlagFetcherActivity.shouldRunAsChildWorkflow(new SyncFeatureFlagFetcherInput( + Optional.ofNullable(connectionId).orElse(DEFAULT_UUID), + sourceDefinitionId, + workspaceId)); + } else { + return !isReset && checkUseWorkloadApiFlag(workspaceId) + && syncFeatureFlagFetcherActivity.shouldRunAsChildWorkflow(new SyncFeatureFlagFetcherInput( + Optional.ofNullable(connectionId).orElse(DEFAULT_UUID), + sourceDefinitionId, + workspaceId)); } - return checkUseWorkloadApiFlag(workspaceId) - && syncFeatureFlagFetcherActivity.shouldRunAsChildWorkflow(new SyncFeatureFlagFetcherInput( - Optional.ofNullable(connectionId).orElse(DEFAULT_UUID), - sourceDefinitionId, - workspaceId)); + } private boolean shouldReportRuntime() { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 215b14471f3..aea719da11f 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -222,8 +222,12 @@ public void tearDown() { testEnv.close(); } - // bundle up all the temporal worker setup / execution into one method. private StandardSyncOutput execute() { + return execute(false); + } + + // bundle up all the temporal worker setup / execution into one method. + private StandardSyncOutput execute(final boolean isReset) { syncWorker.registerActivitiesImplementations(replicationActivity, webhookOperationActivity, refreshSchemaActivity, @@ -236,7 +240,7 @@ private StandardSyncOutput execute() { final SyncWorkflow workflow = client.newWorkflowStub(SyncWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(SYNC_QUEUE).build()); - return workflow.run(JOB_RUN_CONFIG, SOURCE_LAUNCHER_CONFIG, DESTINATION_LAUNCHER_CONFIG, syncInput, sync.getConnectionId()); + return workflow.run(JOB_RUN_CONFIG, SOURCE_LAUNCHER_CONFIG, DESTINATION_LAUNCHER_CONFIG, syncInput.withIsReset(isReset), sync.getConnectionId()); } @Test @@ -270,6 +274,22 @@ void testSuccessWithChildWorkflow() { removeRefreshTime(actualOutput.getStandardSyncSummary())); } + @Test + void testNoChildWorkflowWithReset() { + doReturn(replicationSuccessOutput).when(replicationActivity).replicateV2(any()); + doReturn(true).when(workloadFeatureFlagActivity).useWorkloadApi(any()); + doReturn(true).when(syncFeatureFlagFetcherActivity).shouldRunAsChildWorkflow(any()); + + final StandardSyncOutput actualOutput = execute(true); + + verifyReplication(replicationActivity, syncInput, true, false, null); + verifyShouldRefreshSchema(refreshSchemaActivity); + verify(reportRunTimeActivity).reportRunTime(any()); + assertEquals( + replicationSuccessOutput.getStandardSyncSummary(), + removeRefreshTime(actualOutput.getStandardSyncSummary())); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void passesThroughFFCall(final boolean useWorkloadApi) throws Exception {