Skip to content

Commit

Permalink
switch catalog discovery and syncs to temporal (#2333)
Browse files Browse the repository at this point in the history
* add discover worker

* oops forgot to merge this

* add sync workflow

* add sync to job submitter

* working after these I mean

* make workerrun use shared path, run formatting
  • Loading branch information
jrhizor authored Mar 5, 2021
1 parent 4d95c5e commit c3d0442
Show file tree
Hide file tree
Showing 12 changed files with 351 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ void submitJob(Job job) {
// todo (cgardens) - this conditional goes away when all workers are run in temporal.
final WorkerRun workerRun;
if (job.getConfigType() == ConfigType.GET_SPEC || job.getConfigType() == ConfigType.CHECK_CONNECTION_SOURCE
|| job.getConfigType() == ConfigType.CHECK_CONNECTION_DESTINATION) {
|| job.getConfigType() == ConfigType.CHECK_CONNECTION_DESTINATION
|| job.getConfigType() == ConfigType.DISCOVER_SCHEMA
|| job.getConfigType() == ConfigType.SYNC) {
LOGGER.info("Using temporal runner.");
workerRun = temporalWorkerRunFactory.create(job);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.airbyte.workers.DefaultCheckConnectionWorker;
import io.airbyte.workers.JobStatus;
import io.airbyte.workers.OutputAndStatus;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.ProcessBuilderFactory;
Expand All @@ -48,7 +48,6 @@
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

@WorkflowInterface
public interface CheckConnectionWorkflow {
Expand All @@ -58,10 +57,9 @@ public interface CheckConnectionWorkflow {

class WorkflowImpl implements CheckConnectionWorkflow {

ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofMinutes(2)) // todo
final ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofHours(1))
.build();

private final CheckConnectionActivity activity = Workflow.newActivityStub(CheckConnectionActivity.class, options);

@Override
Expand Down Expand Up @@ -93,15 +91,8 @@ public CheckConnectionActivityImpl(ProcessBuilderFactory pbf, Path workspaceRoot

public JobOutput run(IntegrationLauncherConfig launcherConfig, StandardCheckConnectionInput connectionConfiguration) throws TemporalJobException {
try {
// todo (cgardens) - there are 2 sources of truth for job path. we need to reduce this down to one,
// once we are fully on temporal.
final Path jobRoot = workspaceRoot
.resolve(String.valueOf(launcherConfig.getJobId()))
.resolve(String.valueOf(launcherConfig.getAttemptId().intValue()));

MDC.put("job_id", String.valueOf(launcherConfig.getJobId()));
MDC.put("job_root", jobRoot.toString());
MDC.put("job_log_filename", WorkerConstants.LOG_FILENAME);
final Path jobRoot = WorkerUtils.getJobRoot(workspaceRoot, launcherConfig);
WorkerUtils.setJobMdc(jobRoot, launcherConfig.getJobId());

final IntegrationLauncher integrationLauncher =
new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(), pbf);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.scheduler.temporal;

import com.google.common.base.Preconditions;
import io.airbyte.config.IntegrationLauncherConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
import io.airbyte.workers.JobStatus;
import io.airbyte.workers.OutputAndStatus;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory;
import io.airbyte.workers.wrappers.JobOutputDiscoverSchemaWorker;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.nio.file.Path;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@WorkflowInterface
public interface DiscoverCatalogWorkflow {

@WorkflowMethod
JobOutput run(IntegrationLauncherConfig launcherConfig, StandardDiscoverCatalogInput config) throws TemporalJobException;

class WorkflowImpl implements DiscoverCatalogWorkflow {

final ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofHours(2))
.build();
private final DiscoverCatalogActivity activity = Workflow.newActivityStub(DiscoverCatalogActivity.class, options);

@Override
public JobOutput run(IntegrationLauncherConfig launcherConfig, StandardDiscoverCatalogInput config) throws TemporalJobException {
return activity.run(launcherConfig, config);
}

}

@ActivityInterface
interface DiscoverCatalogActivity {

@ActivityMethod
JobOutput run(IntegrationLauncherConfig launcherConfig, StandardDiscoverCatalogInput config) throws TemporalJobException;

}

class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity {

private static final Logger LOGGER = LoggerFactory.getLogger(DiscoverCatalogActivityImpl.class);

private final ProcessBuilderFactory pbf;
private final Path workspaceRoot;

public DiscoverCatalogActivityImpl(ProcessBuilderFactory pbf, Path workspaceRoot) {
this.pbf = pbf;
this.workspaceRoot = workspaceRoot;
}

public JobOutput run(IntegrationLauncherConfig launcherConfig, StandardDiscoverCatalogInput config) {
try {
final Path jobRoot = WorkerUtils.getJobRoot(workspaceRoot, launcherConfig);
WorkerUtils.setJobMdc(jobRoot, launcherConfig.getJobId());

final IntegrationLauncher integrationLauncher =
new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(), pbf);
final AirbyteStreamFactory streamFactory = new DefaultAirbyteStreamFactory();

final OutputAndStatus<JobOutput> run =
new JobOutputDiscoverSchemaWorker(
new DefaultDiscoverCatalogWorker(integrationLauncher, streamFactory)).run(config, jobRoot);
if (run.getStatus() == JobStatus.SUCCEEDED) {
Preconditions.checkState(run.getOutput().isPresent());
LOGGER.info("job output {}", run.getOutput().get());
return run.getOutput().get();
} else {
throw new RuntimeException("Discover catalog worker completed with a FAILED status.");
}

} catch (Exception e) {
throw new RuntimeException("Discover catalog job failed with an exception", e);
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.airbyte.workers.DefaultGetSpecWorker;
import io.airbyte.workers.JobStatus;
import io.airbyte.workers.OutputAndStatus;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.ProcessBuilderFactory;
Expand All @@ -46,7 +46,6 @@
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

@WorkflowInterface
public interface SpecWorkflow {
Expand All @@ -56,10 +55,9 @@ public interface SpecWorkflow {

class WorkflowImpl implements SpecWorkflow {

ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofMinutes(2)) // todo
final ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofHours(1))
.build();

private final SpecActivity activity = Workflow.newActivityStub(SpecActivity.class, options);

@Override
Expand Down Expand Up @@ -91,15 +89,8 @@ public SpecActivityImpl(ProcessBuilderFactory pbf, Path workspaceRoot) {

public JobOutput run(IntegrationLauncherConfig launcherConfig) {
try {
// todo (cgardens) - there are 2 sources of truth for job path. we need to reduce this down to one,
// once we are fully on temporal.
final Path jobRoot = workspaceRoot
.resolve(String.valueOf(launcherConfig.getJobId()))
.resolve(String.valueOf(launcherConfig.getAttemptId().intValue()));

MDC.put("job_id", String.valueOf(launcherConfig.getJobId()));
MDC.put("job_root", jobRoot.toString());
MDC.put("job_log_filename", WorkerConstants.LOG_FILENAME);
final Path jobRoot = WorkerUtils.getJobRoot(workspaceRoot, launcherConfig);
WorkerUtils.setJobMdc(jobRoot, launcherConfig.getJobId());

final IntegrationLauncher integrationLauncher =
new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(), pbf);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.scheduler.temporal;

import com.google.common.base.Preconditions;
import io.airbyte.config.JobOutput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.workers.DefaultSyncWorker;
import io.airbyte.workers.JobStatus;
import io.airbyte.workers.OutputAndStatus;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.airbyte.workers.protocols.airbyte.AirbyteMessageTracker;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource;
import io.airbyte.workers.protocols.airbyte.NamespacingMapper;
import io.airbyte.workers.wrappers.JobOutputSyncWorker;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.nio.file.Path;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@WorkflowInterface
public interface SyncWorkflow {

@WorkflowMethod
JobOutput run(long jobId, long attemptId, String sourceDockerImage, String destinationDockerImage, StandardSyncInput syncInput)
throws TemporalJobException;

class WorkflowImpl implements SyncWorkflow {

final ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofDays(3))
.build();
private final SyncActivity activity = Workflow.newActivityStub(SyncActivity.class, options);

@Override
public JobOutput run(long jobId, long attemptId, String sourceDockerImage, String destinationDockerImage, StandardSyncInput syncInput)
throws TemporalJobException {
return activity.run(jobId, attemptId, sourceDockerImage, destinationDockerImage, syncInput);
}

}

@ActivityInterface
interface SyncActivity {

@ActivityMethod
JobOutput run(long jobId, long attemptId, String sourceDockerImage, String destinationDockerImage, StandardSyncInput syncInput)
throws TemporalJobException;

}

class SyncActivityImpl implements SyncActivity {

private static final Logger LOGGER = LoggerFactory.getLogger(SyncActivityImpl.class);

private final ProcessBuilderFactory pbf;
private final Path workspaceRoot;

public SyncActivityImpl(ProcessBuilderFactory pbf, Path workspaceRoot) {
this.pbf = pbf;
this.workspaceRoot = workspaceRoot;
}

public JobOutput run(long jobId, long attemptId, String sourceDockerImage, String destinationDockerImage, StandardSyncInput syncInput) {
try {
final Path jobRoot = WorkerUtils.getJobRoot(workspaceRoot, jobId, attemptId);
WorkerUtils.setJobMdc(jobRoot, jobId);

final int intAttemptId = Math.toIntExact(attemptId);

final IntegrationLauncher sourceLauncher =
new AirbyteIntegrationLauncher(jobId, intAttemptId, sourceDockerImage, pbf);
final IntegrationLauncher destinationLauncher =
new AirbyteIntegrationLauncher(jobId, intAttemptId, destinationDockerImage, pbf);

final DefaultAirbyteSource airbyteSource = new DefaultAirbyteSource(sourceLauncher);

final OutputAndStatus<JobOutput> run =
new JobOutputSyncWorker(
new DefaultSyncWorker(
jobId,
intAttemptId,
airbyteSource,
new NamespacingMapper(syncInput.getDefaultNamespace()),
new DefaultAirbyteDestination(destinationLauncher),
new AirbyteMessageTracker(),
NormalizationRunnerFactory.create(
destinationDockerImage,
pbf,
syncInput.getDestinationConfiguration()))).run(syncInput, jobRoot);
if (run.getStatus() == JobStatus.SUCCEEDED) {
Preconditions.checkState(run.getOutput().isPresent());
LOGGER.info("job output {}", run.getOutput().get());
return run.getOutput().get();
} else {
throw new RuntimeException("Sync worker completed with a FAILED status.");
}

} catch (Exception e) {
throw new RuntimeException("Sync job failed with an exception", e);
}
}

}

}
Loading

0 comments on commit c3d0442

Please sign in to comment.