diff --git a/airbyte-config/models/src/main/resources/types/IntegrationLauncherConfig.yaml b/airbyte-config/models/src/main/resources/types/IntegrationLauncherConfig.yaml new file mode 100644 index 000000000000..146a460d0130 --- /dev/null +++ b/airbyte-config/models/src/main/resources/types/IntegrationLauncherConfig.yaml @@ -0,0 +1,18 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/IntegrationLauncherConfig.yaml +title: IntegrationLauncherConfig +description: integration launcher config +type: object +additionalProperties: false +required: + - jobId + - attemptId + - dockerImage +properties: + jobId: + type: integer + attemptId: + type: integer + dockerImage: + type: string diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/JobSubmitter.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/JobSubmitter.java index d75d2fde59dc..6cdd47f16c6d 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/JobSubmitter.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/JobSubmitter.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap.Builder; import io.airbyte.analytics.TrackingClientSingleton; import io.airbyte.commons.concurrency.LifecycledCallable; +import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSyncSchedule; @@ -38,6 +39,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.scheduler.temporal.TemporalWorkerRunFactory; import io.airbyte.scheduler.worker_run.SchedulerWorkerRunWithEnvironmentFactory; import io.airbyte.scheduler.worker_run.WorkerRun; import io.airbyte.validation.json.JsonValidationException; @@ -61,15 +63,18 @@ public class JobSubmitter implements Runnable { private final JobPersistence persistence; private final ConfigRepository configRepository; private final SchedulerWorkerRunWithEnvironmentFactory schedulerWorkerRunWithEnvironmentFactory; + private final TemporalWorkerRunFactory temporalWorkerRunFactory; public JobSubmitter(final ExecutorService threadPool, final JobPersistence persistence, final ConfigRepository configRepository, - final SchedulerWorkerRunWithEnvironmentFactory schedulerWorkerRunWithEnvironmentFactory) { + final SchedulerWorkerRunWithEnvironmentFactory schedulerWorkerRunWithEnvironmentFactory, + final TemporalWorkerRunFactory temporalWorkerRunFactory) { this.threadPool = threadPool; this.persistence = persistence; this.configRepository = configRepository; this.schedulerWorkerRunWithEnvironmentFactory = schedulerWorkerRunWithEnvironmentFactory; + this.temporalWorkerRunFactory = temporalWorkerRunFactory; } @Override @@ -93,7 +98,15 @@ public void run() { @VisibleForTesting void submitJob(Job job) { - final WorkerRun workerRun = schedulerWorkerRunWithEnvironmentFactory.create(job); + // todo (cgardens) - this conditional goes away when all workers are run in temporal. + final WorkerRun workerRun; + if (job.getConfigType() == ConfigType.GET_SPEC) { + LOGGER.info("Using temporal runner."); + workerRun = temporalWorkerRunFactory.create(job); + } else { + LOGGER.info("Using scheduler runner."); + workerRun = schedulerWorkerRunWithEnvironmentFactory.create(job); + } // we need to know the attempt number before we begin the job lifecycle. thus we state what the // attempt number should be. if it is not, that the lifecycle will fail. this should not happen as // long as job submission for a single job is single threaded. this is a compromise to allow the job diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java index f0f6ac360dd8..01765ae8e8d2 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java @@ -38,6 +38,10 @@ import io.airbyte.db.Databases; import io.airbyte.scheduler.persistence.DefaultJobPersistence; import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.scheduler.temporal.TemporalClient; +import io.airbyte.scheduler.temporal.TemporalPool; +import io.airbyte.scheduler.temporal.TemporalUtils; +import io.airbyte.scheduler.temporal.TemporalWorkerRunFactory; import io.airbyte.scheduler.worker_run.SchedulerWorkerRunWithEnvironmentFactory; import io.airbyte.workers.process.DockerProcessBuilderFactory; import io.airbyte.workers.process.KubeProcessBuilderFactory; @@ -92,13 +96,23 @@ public SchedulerApp(Path workspaceRoot, } public void start() throws IOException { + final TemporalPool temporalPool = new TemporalPool(workspaceRoot, pbf); + // todo (cgardens) - i do not need to set up a thread pool for this, right? + temporalPool.run(); + final TemporalClient temporalClient = new TemporalClient(TemporalUtils.TEMPORAL_CLIENT); + final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY); final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor(); final SchedulerWorkerRunWithEnvironmentFactory workerRunWithEnvironmentFactory = new SchedulerWorkerRunWithEnvironmentFactory(workspaceRoot, pbf); - + final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot); final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now); final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository); - final JobSubmitter jobSubmitter = new JobSubmitter(workerThreadPool, jobPersistence, configRepository, workerRunWithEnvironmentFactory); + final JobSubmitter jobSubmitter = new JobSubmitter( + workerThreadPool, + jobPersistence, + configRepository, + workerRunWithEnvironmentFactory, + temporalWorkerRunFactory); Map mdc = MDC.getCopyOfContextMap(); diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/SpecWorkflow.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/SpecWorkflow.java new file mode 100644 index 000000000000..efc5606be308 --- /dev/null +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/SpecWorkflow.java @@ -0,0 +1,142 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.scheduler.temporal; + +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.io.LineGobbler; +import io.airbyte.config.IntegrationLauncherConfig; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.workers.WorkerUtils; +import io.airbyte.workers.process.AirbyteIntegrationLauncher; +import io.airbyte.workers.process.IntegrationLauncher; +import io.airbyte.workers.process.ProcessBuilderFactory; +import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory; +import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.io.InputStream; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@WorkflowInterface +public interface SpecWorkflow { + + @WorkflowMethod + ConnectorSpecification run(IntegrationLauncherConfig launcherConfig); + + class WorkflowImpl implements SpecWorkflow { + + ActivityOptions options = ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofMinutes(2)) // todo + .build(); + + private final SpecActivity activity = Workflow.newActivityStub(SpecActivity.class, options); + + @Override + public ConnectorSpecification run(IntegrationLauncherConfig launcherConfig) { + return activity.run(launcherConfig); + } + + } + + @ActivityInterface + interface SpecActivity { + + @ActivityMethod + ConnectorSpecification run(IntegrationLauncherConfig launcherConfig); + + } + + class SpecActivityImpl implements SpecActivity { + + private static final Logger LOGGER = LoggerFactory.getLogger(SpecActivityImpl.class); + + private final ProcessBuilderFactory pbf; + private final Path workspaceRoot; + + public SpecActivityImpl(ProcessBuilderFactory pbf, Path workspaceRoot) { + this.pbf = pbf; + this.workspaceRoot = workspaceRoot; + } + + public ConnectorSpecification run(IntegrationLauncherConfig launcherConfig) { + try { + // todo (cgardens) - we need to find a way to standardize log paths sanely across all workflow. + // right now we have this in temporal workflow. + final Path jobRoot = workspaceRoot + .resolve("spec") + .resolve(launcherConfig.getDockerImage().replaceAll("[^A-Za-z0-9]", "")) + .resolve(String.valueOf(Instant.now().getEpochSecond())); + + final IntegrationLauncher integrationLauncher = + new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(), pbf); + final Process process = integrationLauncher.spec(jobRoot).start(); + + LineGobbler.gobble(process.getErrorStream(), LOGGER::error); + + final AirbyteStreamFactory streamFactory = new DefaultAirbyteStreamFactory(); + + Optional spec; + try (InputStream stdout = process.getInputStream()) { + spec = streamFactory.create(IOs.newBufferedReader(stdout)) + .filter(message -> message.getType() == AirbyteMessage.Type.SPEC) + .map(AirbyteMessage::getSpec) + .findFirst(); + + // todo (cgardens) - let's pre-fetch the images outside of the worker so we don't need account for + // this. + // retrieving spec should generally be instantaneous, but since docker images might not be pulled + // it could take a while longer depending on internet conditions as well. + WorkerUtils.gentleClose(process, 30, TimeUnit.MINUTES); + } + + int exitCode = process.exitValue(); + if (exitCode == 0) { + if (spec.isEmpty()) { + throw new RuntimeException("Spec job failed to output a spec struct."); + } else { + return spec.get(); + } + } else { + throw new RuntimeException(String.format("Spec job subprocess finished with exit code %s", exitCode)); + } + } catch (Exception e) { + throw new RuntimeException("Spec job failed with an exception", e); + } + } + + } + +} diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalClient.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalClient.java new file mode 100644 index 000000000000..ed71b1b1afc7 --- /dev/null +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalClient.java @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.scheduler.temporal; + +import io.airbyte.config.IntegrationLauncherConfig; +import io.airbyte.config.JobGetSpecConfig; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.temporal.TemporalUtils.TemporalJobType; +import io.temporal.client.WorkflowClient; + +public class TemporalClient { + + private final WorkflowClient client; + + public TemporalClient(WorkflowClient client) { + this.client = client; + } + + public ConnectorSpecification submitGetSpec(long jobId, int attempt, JobGetSpecConfig config) { + final IntegrationLauncherConfig integrationLauncherConfig = new IntegrationLauncherConfig() + .withJobId(jobId) + .withAttemptId((long) attempt) + .withDockerImage(config.getDockerImage()); + return getWorkflowStub(SpecWorkflow.class, TemporalJobType.GET_SPEC).run(integrationLauncherConfig); + } + + private T getWorkflowStub(Class workflowClass, TemporalJobType jobType) { + return client.newWorkflowStub(workflowClass, TemporalUtils.getWorkflowOptions(jobType)); + } + +} diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalPool.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalPool.java new file mode 100644 index 000000000000..3c0024f39aa6 --- /dev/null +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalPool.java @@ -0,0 +1,70 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.scheduler.temporal; + +import io.airbyte.scheduler.temporal.TemporalUtils.TemporalJobType; +import io.airbyte.workers.process.ProcessBuilderFactory; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import java.nio.file.Path; + +public class TemporalPool implements Runnable { + + private final Path workspaceRoot; + private final ProcessBuilderFactory pbf; + + public TemporalPool(Path workspaceRoot, ProcessBuilderFactory pbf) { + this.workspaceRoot = workspaceRoot; + this.pbf = pbf; + } + + @Override + public void run() { + WorkerFactory factory = WorkerFactory.newInstance(TemporalUtils.TEMPORAL_CLIENT); + + final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name()); + specWorker.registerWorkflowImplementationTypes(SpecWorkflow.WorkflowImpl.class); + specWorker.registerActivitiesImplementations(new SpecWorkflow.SpecActivityImpl(pbf, workspaceRoot)); + + // todo (cgardens) - these will come back once we use temporal for these workers. + // Worker discoverWorker = factory.newWorker(TemporalUtils.DISCOVER_WORKFLOW_QUEUE); + // discoverWorker.registerWorkflowImplementationTypes(DiscoverWorkflow.WorkflowImpl.class); + // discoverWorker.registerActivitiesImplementations(new DiscoverWorkflow.DiscoverActivityImpl(pbf, + // configs.getWorkspaceRoot())); + // + // Worker checkConnectionWorker = factory.newWorker(TemporalUtils.CHECK_CONNECTION_WORKFLOW_QUEUE); + // checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflow.WorkflowImpl.class); + // checkConnectionWorker.registerActivitiesImplementations(new + // CheckConnectionWorkflow.CheckConnectionActivityImpl(pbf, configs.getWorkspaceRoot())); + // + // Worker syncWorker = factory.newWorker(TemporalUtils.SYNC_WORKFLOW_QUEUE); + // syncWorker.registerWorkflowImplementationTypes(SyncWorkflow.WorkflowImpl.class); + // syncWorker.registerActivitiesImplementations(new SyncWorkflow.SyncActivityImpl(pbf, + // configs.getWorkspaceRoot())); + + factory.start(); + } + +} diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/TemporalUtils.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalUtils.java similarity index 72% rename from airbyte-scheduler/src/main/java/io/airbyte/scheduler/TemporalUtils.java rename to airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalUtils.java index fcfde4561540..f78a66004988 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/TemporalUtils.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalUtils.java @@ -22,15 +22,19 @@ * SOFTWARE. */ -package io.airbyte.scheduler; +package io.airbyte.scheduler.temporal; import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import java.io.Serializable; +import java.util.UUID; public class TemporalUtils { private static final WorkflowServiceStubsOptions TEMPORAL_OPTIONS = WorkflowServiceStubsOptions.newBuilder() + // todo move to env. .setTarget("temporal:7233") .build(); @@ -38,4 +42,25 @@ public class TemporalUtils { public static final WorkflowClient TEMPORAL_CLIENT = WorkflowClient.newInstance(TEMPORAL_SERVICE); + @FunctionalInterface + public interface TemporalJobCreator { + + UUID create(WorkflowClient workflowClient, long jobId, int attempt, T config); + + } + + static enum TemporalJobType { + GET_SPEC, + CHECK_CONNECTION, + DISCOVER_SCHEMA, + SYNC, + RESET_CONNECTION + } + + public static WorkflowOptions getWorkflowOptions(TemporalJobType jobType) { + return WorkflowOptions.newBuilder() + .setTaskQueue(jobType.name()) + .build(); + } + } diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalWorkerRunFactory.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalWorkerRunFactory.java new file mode 100644 index 000000000000..825e0c340e77 --- /dev/null +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/temporal/TemporalWorkerRunFactory.java @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.scheduler.temporal; + +import io.airbyte.commons.enums.Enums; +import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.config.JobOutput; +import io.airbyte.config.StandardGetSpecOutput; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.Job; +import io.airbyte.scheduler.temporal.TemporalUtils.TemporalJobType; +import io.airbyte.scheduler.worker_run.WorkerRun; +import io.airbyte.workers.JobStatus; +import io.airbyte.workers.OutputAndStatus; +import java.nio.file.Path; + +public class TemporalWorkerRunFactory { + + private final TemporalClient temporalClient; + private final Path workspaceRoot; + + public TemporalWorkerRunFactory(TemporalClient temporalClient, Path workspaceRoot) { + this.temporalClient = temporalClient; + this.workspaceRoot = workspaceRoot; + } + + public WorkerRun create(Job job) { + final int attemptId = job.getAttempts().size(); + return WorkerRun.create(workspaceRoot, job.getId(), attemptId, createSupplier(job, attemptId)); + } + + public CheckedSupplier, Exception> createSupplier(Job job, int attemptId) { + final TemporalJobType temporalJobType = Enums.convertTo(job.getConfigType(), TemporalJobType.class); + return switch (job.getConfigType()) { + case GET_SPEC -> () -> { + final ConnectorSpecification connectorSpecification = temporalClient + .submitGetSpec(job.getId(), attemptId, job.getConfig().getGetSpec()); + final JobOutput jobOutput = new JobOutput().withGetSpec(new StandardGetSpecOutput().withSpecification(connectorSpecification)); + return new OutputAndStatus<>(JobStatus.SUCCEEDED, jobOutput); + }; + default -> throw new IllegalArgumentException("Does not support job type: " + temporalJobType); + }; + } + +} diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/WorkerRun.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/WorkerRun.java index 9e96f4571597..7024cbf1363b 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/WorkerRun.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/WorkerRun.java @@ -46,11 +46,19 @@ public class WorkerRun implements Callable> { private final Path jobRoot; private final CheckedSupplier, Exception> workerRun; - public WorkerRun(final Path jobRoot, - final InputType input, - final Worker worker) { + public static WorkerRun create(Path workspaceRoot, long jobId, int attempt, CheckedSupplier, Exception> workerRun) { + final Path jobRoot = workspaceRoot.resolve(String.valueOf(jobId)).resolve(String.valueOf(attempt)); + return new WorkerRun(jobRoot, workerRun); + } + + // todo (cgardens) - remove this once the scheduler worker is dead. + public WorkerRun(final Path jobRoot, final InputType input, final Worker worker) { + this(jobRoot, () -> worker.run(input, jobRoot)); + } + + public WorkerRun(final Path jobRoot, final CheckedSupplier, Exception> workerRun) { this.jobRoot = jobRoot; - this.workerRun = () -> worker.run(input, jobRoot); + this.workerRun = workerRun; } @Override diff --git a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/JobSubmitterTest.java b/airbyte-scheduler/src/test/java/io/airbyte/scheduler/JobSubmitterTest.java index ce04bdefcbc7..bade6af0ff33 100644 --- a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/JobSubmitterTest.java +++ b/airbyte-scheduler/src/test/java/io/airbyte/scheduler/JobSubmitterTest.java @@ -46,6 +46,7 @@ import io.airbyte.config.JobOutput; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.scheduler.temporal.TemporalWorkerRunFactory; import io.airbyte.scheduler.worker_run.SchedulerWorkerRunWithEnvironmentFactory; import io.airbyte.scheduler.worker_run.WorkerRun; import io.airbyte.workers.JobStatus; @@ -101,7 +102,8 @@ public void setup() throws IOException { MoreExecutors.newDirectExecutorService(), persistence, configRepository, - schedulerWorkerRunWithEnvironmentFactory)); + schedulerWorkerRunWithEnvironmentFactory, + mock(TemporalWorkerRunFactory.class))); // by default, turn off the internals of the tracking code. we will test it separate below. doNothing().when(jobSubmitter).trackSubmission(any());