Skip to content

Commit

Permalink
fix: avoid uncatched exception (#13137)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Jul 18, 2024
1 parent 763603f commit 993395c
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,19 @@ class WorkloadClient(private val workloadApiClient: WorkloadApiClient, private v
fun waitForWorkload(
workloadId: String,
pollingFrequencyInSeconds: Int,
) {
waitForWorkload(workloadId, pollingFrequencyInSeconds) {}
}

fun waitForWorkload(
workloadId: String,
pollingFrequencyInSeconds: Int,
loopingAction: () -> Unit,
) {
try {
var workload = workloadApiClient.workloadApi.workloadGet(workloadId)
while (!isWorkloadTerminal(workload)) {
loopingAction()
Thread.sleep(pollingFrequencyInSeconds.seconds.inWholeMilliseconds)
workload = workloadApiClient.workloadApi.workloadGet(workloadId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivity;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogHelperActivity;
import io.airbyte.workers.temporal.scheduling.activities.AppendToAttemptLogActivity;
import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity;
import io.airbyte.workers.temporal.scheduling.activities.CheckRunProgressActivity;
Expand Down Expand Up @@ -96,8 +97,9 @@ public PayloadChecker payloadChecker(final MetricClient metricClient) {
@Singleton
@Named("discoverActivities")
public List<Object> discoverActivities(
final DiscoverCatalogActivity discoverCatalogActivity) {
return List.of(discoverCatalogActivity);
final DiscoverCatalogActivity discoverCatalogActivity,
final DiscoverCatalogHelperActivity discoverCatalogHelperActivity) {
return List.of(discoverCatalogActivity, discoverCatalogHelperActivity);
}

@Singleton
Expand Down Expand Up @@ -142,6 +144,18 @@ public ActivityOptions discoveryActivityOptions(@Property(name = "airbyte.activi
.build();
}

@Singleton
@Named("discoveryActivityOptionsWithRetry")
public ActivityOptions discoveryActivityOptionsWithRetry(@Property(name = "airbyte.activity.discovery-timeout",
defaultValue = "30") final Integer discoveryTimeoutMinutes,
@Named("shortRetryOptions") final RetryOptions retryOptions) {
return ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofMinutes(discoveryTimeoutMinutes))
.setRetryOptions(retryOptions)
.setHeartbeatTimeout(TemporalConstants.HEARTBEAT_TIMEOUT)
.build();
}

@Singleton
@Named("refreshSchemaActivityOptions")
public ActivityOptions refreshSchemaActivityOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.models.DiscoverCatalogInput;
import io.airbyte.workers.models.PostprocessCatalogInput;
import io.airbyte.workers.models.PostprocessCatalogOutput;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.util.UUID;

/**
* DiscoverCatalogActivity.
Expand All @@ -30,20 +27,4 @@ ConnectorJobOutput run(JobRunConfig jobRunConfig,
@ActivityMethod
ConnectorJobOutput runWithWorkload(final DiscoverCatalogInput input) throws WorkerException;

@ActivityMethod
boolean shouldUseWorkload(final UUID workspaceId);

@ActivityMethod
void reportSuccess(final Boolean workloadEnabled);

@ActivityMethod
void reportFailure(final Boolean workloadEnabled);

/**
* Perform catalog diffing, subsequent disabling of the connection and any other necessary
* operations after performing the discover.
*/
@ActivityMethod
PostprocessCatalogOutput postprocess(final PostprocessCatalogInput input);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.Geography;
import io.airbyte.api.client.model.generated.PostprocessDiscoveredCatalogRequestBody;
import io.airbyte.api.client.model.generated.PostprocessDiscoveredCatalogResult;
import io.airbyte.api.client.model.generated.ScopeType;
import io.airbyte.api.client.model.generated.SecretPersistenceConfig;
import io.airbyte.api.client.model.generated.SecretPersistenceConfigGetRequestBody;
Expand All @@ -43,33 +41,24 @@
import io.airbyte.config.helpers.ResourceRequirementsUtils;
import io.airbyte.config.secrets.SecretsRepositoryReader;
import io.airbyte.config.secrets.persistence.RuntimeSecretPersistence;
import io.airbyte.featureflag.Empty;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.Organization;
import io.airbyte.featureflag.UseRuntimeSecretPersistence;
import io.airbyte.featureflag.UseWorkloadApi;
import io.airbyte.featureflag.WorkloadApiServerEnabled;
import io.airbyte.featureflag.WorkloadCheckFrequencyInSeconds;
import io.airbyte.featureflag.WorkloadLauncherEnabled;
import io.airbyte.featureflag.Workspace;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.Worker;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.general.DefaultDiscoverCatalogWorker;
import io.airbyte.workers.helper.CatalogDiffConverter;
import io.airbyte.workers.helper.GsonPksExtractor;
import io.airbyte.workers.helper.SecretPersistenceConfigHelper;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.VersionedAirbyteStreamFactory;
import io.airbyte.workers.models.DiscoverCatalogInput;
import io.airbyte.workers.models.PostprocessCatalogInput;
import io.airbyte.workers.models.PostprocessCatalogOutput;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.Metadata;
Expand All @@ -92,7 +81,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -249,7 +237,12 @@ public ConnectorJobOutput runWithWorkload(final DiscoverCatalogInput input) thro

final int checkFrequencyInSeconds =
featureFlagClient.intVariation(WorkloadCheckFrequencyInSeconds.INSTANCE, new Workspace(workspaceId));
workloadClient.waitForWorkload(workloadId, checkFrequencyInSeconds);

final ActivityExecutionContext context = getActivityContext();
workloadClient.waitForWorkload(workloadId, checkFrequencyInSeconds, () -> {
context.heartbeat("waiting for workload to complete");
return null;
});

return workloadClient.getConnectorJobOutput(
workloadId,
Expand All @@ -259,49 +252,9 @@ public ConnectorJobOutput runWithWorkload(final DiscoverCatalogInput input) thro
.withFailureReason(failureReason));
}

@Override
public boolean shouldUseWorkload(final UUID workspaceId) {
var ffCheck = featureFlagClient.boolVariation(UseWorkloadApi.INSTANCE, new Workspace(workspaceId));
var envCheck = featureFlagClient.boolVariation(WorkloadLauncherEnabled.INSTANCE, Empty.INSTANCE)
&& featureFlagClient.boolVariation(WorkloadApiServerEnabled.INSTANCE, Empty.INSTANCE);

return ffCheck || envCheck;
}

@Override
public void reportSuccess(final Boolean workloadEnabled) {
final var workloadEnabledStr = workloadEnabled != null ? workloadEnabled.toString() : "unknown";
metricClient.count(OssMetricsRegistry.CATALOG_DISCOVERY, 1,
new MetricAttribute(MetricTags.STATUS, "success"),
new MetricAttribute("workload_enabled", workloadEnabledStr));
}

@Override
public void reportFailure(final Boolean workloadEnabled) {
final var workloadEnabledStr = workloadEnabled != null ? workloadEnabled.toString() : "unknown";
metricClient.count(OssMetricsRegistry.CATALOG_DISCOVERY, 1,
new MetricAttribute(MetricTags.STATUS, "failed"),
new MetricAttribute("workload_enabled", workloadEnabledStr));
}

@Override
public PostprocessCatalogOutput postprocess(final PostprocessCatalogInput input) {
try {
Objects.requireNonNull(input.getConnectionId());
Objects.requireNonNull(input.getCatalogId());

final var reqBody = new PostprocessDiscoveredCatalogRequestBody(
input.getCatalogId(),
input.getConnectionId());

final PostprocessDiscoveredCatalogResult resp = airbyteApiClient.getConnectionApi().postprocessDiscoveredCatalogForConnection(reqBody);

final var domainDiff = resp.getAppliedDiff() != null ? CatalogDiffConverter.toDomain(resp.getAppliedDiff()) : null;

return PostprocessCatalogOutput.Companion.success(domainDiff);
} catch (final Exception e) {
return PostprocessCatalogOutput.Companion.failure(e);
}
@VisibleForTesting
ActivityExecutionContext getActivityContext() {
return Activity.getExecutionContext();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.discover.catalog;

import io.airbyte.workers.models.PostprocessCatalogInput;
import io.airbyte.workers.models.PostprocessCatalogOutput;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.util.UUID;

@ActivityInterface
public interface DiscoverCatalogHelperActivity {

@ActivityMethod
boolean shouldUseWorkload(final UUID workspaceId);

@ActivityMethod
void reportSuccess(final Boolean workloadEnabled);

@ActivityMethod
void reportFailure(final Boolean workloadEnabled);

/**
* Perform catalog diffing, subsequent disabling of the connection and any other necessary
* operations after performing the discover.
*/
@ActivityMethod
PostprocessCatalogOutput postprocess(final PostprocessCatalogInput input);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.discover.catalog;

import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.model.generated.PostprocessDiscoveredCatalogRequestBody;
import io.airbyte.api.client.model.generated.PostprocessDiscoveredCatalogResult;
import io.airbyte.featureflag.Empty;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.UseWorkloadApi;
import io.airbyte.featureflag.WorkloadApiServerEnabled;
import io.airbyte.featureflag.WorkloadLauncherEnabled;
import io.airbyte.featureflag.Workspace;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.workers.helper.CatalogDiffConverter;
import io.airbyte.workers.models.PostprocessCatalogInput;
import io.airbyte.workers.models.PostprocessCatalogOutput;
import jakarta.inject.Singleton;
import java.util.Objects;
import java.util.UUID;

@Singleton
public class DiscoverCatalogHelperActivityImpl implements DiscoverCatalogHelperActivity {

private final AirbyteApiClient airbyteApiClient;
private final FeatureFlagClient featureFlagClient;
private final MetricClient metricClient;

public DiscoverCatalogHelperActivityImpl(AirbyteApiClient airbyteApiClient, FeatureFlagClient featureFlagClient, MetricClient metricClient) {
this.airbyteApiClient = airbyteApiClient;
this.featureFlagClient = featureFlagClient;
this.metricClient = metricClient;
}

@Override
public boolean shouldUseWorkload(final UUID workspaceId) {
var ffCheck = featureFlagClient.boolVariation(UseWorkloadApi.INSTANCE, new Workspace(workspaceId));
var envCheck = featureFlagClient.boolVariation(WorkloadLauncherEnabled.INSTANCE, Empty.INSTANCE)
&& featureFlagClient.boolVariation(WorkloadApiServerEnabled.INSTANCE, Empty.INSTANCE);

return ffCheck || envCheck;
}

@Override
public void reportSuccess(final Boolean workloadEnabled) {
final var workloadEnabledStr = workloadEnabled != null ? workloadEnabled.toString() : "unknown";
metricClient.count(OssMetricsRegistry.CATALOG_DISCOVERY, 1,
new MetricAttribute(MetricTags.STATUS, "success"),
new MetricAttribute("workload_enabled", workloadEnabledStr));
}

@Override
public void reportFailure(final Boolean workloadEnabled) {
final var workloadEnabledStr = workloadEnabled != null ? workloadEnabled.toString() : "unknown";
metricClient.count(OssMetricsRegistry.CATALOG_DISCOVERY, 1,
new MetricAttribute(MetricTags.STATUS, "failed"),
new MetricAttribute("workload_enabled", workloadEnabledStr));
}

@Override
public PostprocessCatalogOutput postprocess(final PostprocessCatalogInput input) {
try {
Objects.requireNonNull(input.getConnectionId());
Objects.requireNonNull(input.getCatalogId());

final var reqBody = new PostprocessDiscoveredCatalogRequestBody(
input.getCatalogId(),
input.getConnectionId());

final PostprocessDiscoveredCatalogResult resp = airbyteApiClient.getConnectionApi().postprocessDiscoveredCatalogForConnection(reqBody);

final var domainDiff = resp.getAppliedDiff() != null ? CatalogDiffConverter.toDomain(resp.getAppliedDiff()) : null;

return PostprocessCatalogOutput.Companion.success(domainDiff);
} catch (final Exception e) {
return PostprocessCatalogOutput.Companion.failure(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class DiscoverCatalogWorkflowImpl implements DiscoverCatalogWorkflow {

@TemporalActivityStub(activityOptionsBeanName = "discoveryActivityOptions")
private DiscoverCatalogActivity activity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private DiscoverCatalogHelperActivity reportActivity;

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
Expand All @@ -48,17 +50,17 @@ public ConnectorJobOutput run(final JobRunConfig jobRunConfig,
result = activity.runWithWorkload(new DiscoverCatalogInput(
jobRunConfig, launcherConfig, config));
} catch (WorkerException e) {
activity.reportFailure(true);
reportActivity.reportFailure(true);
throw new RuntimeException(e);
}
} else {
result = activity.run(jobRunConfig, launcherConfig, config);
}

if (result.getDiscoverCatalogId() != null) {
activity.reportSuccess(shouldRunWithWorkload);
reportActivity.reportSuccess(shouldRunWithWorkload);
} else {
activity.reportFailure(shouldRunWithWorkload);
reportActivity.reportFailure(shouldRunWithWorkload);
}

return result;
Expand All @@ -72,7 +74,7 @@ private boolean checkUseWorkloadApiFlag(final UUID workspaceId) {
return false;
}

return activity.shouldUseWorkload(workspaceId);
return reportActivity.shouldUseWorkload(workspaceId);
}

}
Loading

0 comments on commit 993395c

Please sign in to comment.