Skip to content

Commit

Permalink
run get_spec on temporal (#2299)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Mar 4, 2021
1 parent 84eb4e3 commit f0778e0
Show file tree
Hide file tree
Showing 10 changed files with 422 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
# JSON schema for the payload handed to a workflow that launches a connector docker image.
# All three fields are mandatory and no other properties are accepted.
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/IntegrationLauncherConfig.yaml
title: IntegrationLauncherConfig
description: integration launcher config
type: object
additionalProperties: false
required:
  - jobId
  - attemptId
  - dockerImage
properties:
  # id of the job on whose behalf the connector is launched
  jobId:
    type: integer
  # attempt number of that job
  attemptId:
    type: integer
  # docker image of the connector to launch
  dockerImage:
    type: string
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
import com.google.common.collect.ImmutableMap.Builder;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.concurrency.LifecycledCallable;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSyncSchedule;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.temporal.TemporalWorkerRunFactory;
import io.airbyte.scheduler.worker_run.SchedulerWorkerRunWithEnvironmentFactory;
import io.airbyte.scheduler.worker_run.WorkerRun;
import io.airbyte.validation.json.JsonValidationException;
Expand All @@ -61,15 +63,18 @@ public class JobSubmitter implements Runnable {
private final JobPersistence persistence;
private final ConfigRepository configRepository;
private final SchedulerWorkerRunWithEnvironmentFactory schedulerWorkerRunWithEnvironmentFactory;
private final TemporalWorkerRunFactory temporalWorkerRunFactory;

public JobSubmitter(final ExecutorService threadPool,
final JobPersistence persistence,
final ConfigRepository configRepository,
final SchedulerWorkerRunWithEnvironmentFactory schedulerWorkerRunWithEnvironmentFactory) {
final SchedulerWorkerRunWithEnvironmentFactory schedulerWorkerRunWithEnvironmentFactory,
final TemporalWorkerRunFactory temporalWorkerRunFactory) {
this.threadPool = threadPool;
this.persistence = persistence;
this.configRepository = configRepository;
this.schedulerWorkerRunWithEnvironmentFactory = schedulerWorkerRunWithEnvironmentFactory;
this.temporalWorkerRunFactory = temporalWorkerRunFactory;
}

@Override
Expand All @@ -93,7 +98,15 @@ public void run() {

@VisibleForTesting
void submitJob(Job job) {
final WorkerRun workerRun = schedulerWorkerRunWithEnvironmentFactory.create(job);
// todo (cgardens) - this conditional goes away when all workers are run in temporal.
final WorkerRun workerRun;
if (job.getConfigType() == ConfigType.GET_SPEC) {
LOGGER.info("Using temporal runner.");
workerRun = temporalWorkerRunFactory.create(job);
} else {
LOGGER.info("Using scheduler runner.");
workerRun = schedulerWorkerRunWithEnvironmentFactory.create(job);
}
// we need to know the attempt number before we begin the job lifecycle. thus we state what the
// attempt number should be. if it is not, the lifecycle will fail. this should not happen as
// long as job submission for a single job is single threaded. this is a compromise to allow the job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
import io.airbyte.db.Databases;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.temporal.TemporalClient;
import io.airbyte.scheduler.temporal.TemporalPool;
import io.airbyte.scheduler.temporal.TemporalUtils;
import io.airbyte.scheduler.temporal.TemporalWorkerRunFactory;
import io.airbyte.scheduler.worker_run.SchedulerWorkerRunWithEnvironmentFactory;
import io.airbyte.workers.process.DockerProcessBuilderFactory;
import io.airbyte.workers.process.KubeProcessBuilderFactory;
Expand Down Expand Up @@ -92,13 +96,23 @@ public SchedulerApp(Path workspaceRoot,
}

public void start() throws IOException {
final TemporalPool temporalPool = new TemporalPool(workspaceRoot, pbf);
// todo (cgardens) - i do not need to set up a thread pool for this, right?
temporalPool.run();
final TemporalClient temporalClient = new TemporalClient(TemporalUtils.TEMPORAL_CLIENT);

final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final SchedulerWorkerRunWithEnvironmentFactory workerRunWithEnvironmentFactory = new SchedulerWorkerRunWithEnvironmentFactory(workspaceRoot, pbf);

final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now);
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository);
final JobSubmitter jobSubmitter = new JobSubmitter(workerThreadPool, jobPersistence, configRepository, workerRunWithEnvironmentFactory);
final JobSubmitter jobSubmitter = new JobSubmitter(
workerThreadPool,
jobPersistence,
configRepository,
workerRunWithEnvironmentFactory,
temporalWorkerRunFactory);

Map<String, String> mdc = MDC.getCopyOfContextMap();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.scheduler.temporal;

import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.IntegrationLauncherConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.io.InputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Temporal workflow that obtains the {@link ConnectorSpecification} of a connector docker image.
 *
 * <p>
 * The workflow itself ({@link WorkflowImpl}) only delegates to a single activity
 * ({@link SpecActivity}); the activity implementation ({@link SpecActivityImpl}) launches the
 * connector's spec command as a subprocess and reads the specification from its stdout.
 */
@WorkflowInterface
public interface SpecWorkflow {

  /**
   * Runs the spec workflow.
   *
   * @param launcherConfig job id, attempt id and docker image of the connector to inspect
   * @return the specification emitted by the connector
   */
  @WorkflowMethod
  ConnectorSpecification run(IntegrationLauncherConfig launcherConfig);

  /**
   * Workflow implementation: forwards the request to a {@link SpecActivity} stub.
   */
  class WorkflowImpl implements SpecWorkflow {

    // NOTE(review): this schedule-to-close timeout (2 minutes) is far shorter than the 30 minute
    // value SpecActivityImpl passes to WorkerUtils.gentleClose - confirm which bound is intended.
    private final ActivityOptions options = ActivityOptions.newBuilder()
        .setScheduleToCloseTimeout(Duration.ofMinutes(2)) // todo
        .build();

    private final SpecActivity activity = Workflow.newActivityStub(SpecActivity.class, options);

    @Override
    public ConnectorSpecification run(IntegrationLauncherConfig launcherConfig) {
      return activity.run(launcherConfig);
    }

  }

  /**
   * Activity that performs the actual spec retrieval.
   */
  @ActivityInterface
  interface SpecActivity {

    /**
     * Retrieves the specification of the connector described by {@code launcherConfig}.
     *
     * @param launcherConfig job id, attempt id and docker image of the connector to inspect
     * @return the specification emitted by the connector
     */
    @ActivityMethod
    ConnectorSpecification run(IntegrationLauncherConfig launcherConfig);

  }

  /**
   * Activity implementation that runs the connector's spec command as a subprocess.
   */
  class SpecActivityImpl implements SpecActivity {

    private static final Logger LOGGER = LoggerFactory.getLogger(SpecActivityImpl.class);

    private final ProcessBuilderFactory pbf;
    private final Path workspaceRoot;

    public SpecActivityImpl(ProcessBuilderFactory pbf, Path workspaceRoot) {
      this.pbf = pbf;
      this.workspaceRoot = workspaceRoot;
    }

    /**
     * Launches the connector's spec command, forwards its stderr to the logger and returns the first
     * SPEC message found on its stdout.
     *
     * @param launcherConfig job id, attempt id and docker image of the connector to inspect
     * @return the specification emitted by the connector
     * @throws RuntimeException if the subprocess exits with a non-zero code, emits no SPEC message,
     *         or any other exception occurs (the original exception is attached as the cause)
     */
    @Override
    public ConnectorSpecification run(IntegrationLauncherConfig launcherConfig) {
      try {
        // todo (cgardens) - we need to find a way to standardize log paths sanely across all workflow.
        // right now we have this in temporal workflow.
        // job root layout: <workspaceRoot>/spec/<image name stripped to alphanumerics>/<epoch seconds>
        final Path jobRoot = workspaceRoot
            .resolve("spec")
            .resolve(launcherConfig.getDockerImage().replaceAll("[^A-Za-z0-9]", ""))
            .resolve(String.valueOf(Instant.now().getEpochSecond()));

        final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher(
            launcherConfig.getJobId(),
            launcherConfig.getAttemptId().intValue(),
            launcherConfig.getDockerImage(),
            pbf);
        final Process process = integrationLauncher.spec(jobRoot).start();

        try {
          LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

          final AirbyteStreamFactory streamFactory = new DefaultAirbyteStreamFactory();

          final Optional<ConnectorSpecification> spec;
          try (InputStream stdout = process.getInputStream()) {
            spec = streamFactory.create(IOs.newBufferedReader(stdout))
                .filter(message -> message.getType() == AirbyteMessage.Type.SPEC)
                .map(AirbyteMessage::getSpec)
                .findFirst();

            // todo (cgardens) - let's pre-fetch the images outside of the worker so we don't need account for
            // this.
            // retrieving spec should generally be instantaneous, but since docker images might not be pulled
            // it could take a while longer depending on internet conditions as well.
            WorkerUtils.gentleClose(process, 30, TimeUnit.MINUTES);
          }

          final int exitCode = process.exitValue();
          if (exitCode != 0) {
            throw new RuntimeException(String.format("Spec job subprocess finished with exit code %s", exitCode));
          }
          return spec.orElseThrow(() -> new RuntimeException("Spec job failed to output a spec struct."));
        } finally {
          // never leave the subprocess behind, e.g. when reading stdout threw or the process was still
          // running when its exit value was requested.
          if (process.isAlive()) {
            process.destroyForcibly();
          }
        }
      } catch (Exception e) {
        throw new RuntimeException("Spec job failed with an exception", e);
      }
    }

  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.scheduler.temporal;

import io.airbyte.config.IntegrationLauncherConfig;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.temporal.TemporalUtils.TemporalJobType;
import io.temporal.client.WorkflowClient;

/**
 * Wrapper around a Temporal {@link WorkflowClient} that submits Airbyte jobs as workflows.
 */
public class TemporalClient {

  private final WorkflowClient client;

  /**
   * @param client Temporal client used to create workflow stubs
   */
  public TemporalClient(WorkflowClient client) {
    this.client = client;
  }

  /**
   * Runs the {@link SpecWorkflow} for the docker image named in {@code config}.
   *
   * @param jobId id of the job requesting the spec
   * @param attempt attempt number of that job
   * @param config get-spec configuration; only its docker image is read here
   * @return the specification returned by the workflow
   */
  public ConnectorSpecification submitGetSpec(long jobId, int attempt, JobGetSpecConfig config) {
    final IntegrationLauncherConfig launcherConfig = new IntegrationLauncherConfig()
        .withJobId(jobId)
        .withAttemptId((long) attempt)
        .withDockerImage(config.getDockerImage());

    final SpecWorkflow specWorkflow = getWorkflowStub(SpecWorkflow.class, TemporalJobType.GET_SPEC);
    return specWorkflow.run(launcherConfig);
  }

  /**
   * Creates a stub of {@code workflowClass} configured with the workflow options of {@code jobType}.
   */
  private <W> W getWorkflowStub(Class<W> workflowClass, TemporalJobType jobType) {
    return client.newWorkflowStub(workflowClass, TemporalUtils.getWorkflowOptions(jobType));
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.scheduler.temporal;

import io.airbyte.scheduler.temporal.TemporalUtils.TemporalJobType;
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import java.nio.file.Path;

/**
 * Registers the Temporal workers that execute Airbyte workflows and starts them.
 *
 * <p>
 * Only the GET_SPEC worker is registered at the moment.
 */
public class TemporalPool implements Runnable {

  private final Path workspaceRoot;
  private final ProcessBuilderFactory pbf;

  /**
   * @param workspaceRoot workspace root handed to the activity implementations
   * @param pbf process builder factory handed to the activity implementations
   */
  public TemporalPool(Path workspaceRoot, ProcessBuilderFactory pbf) {
    this.workspaceRoot = workspaceRoot;
    this.pbf = pbf;
  }

  /**
   * Creates a worker factory on the shared Temporal client, registers the spec workflow and its
   * activity on the GET_SPEC worker, then starts the factory.
   */
  @Override
  public void run() {
    final WorkerFactory workerFactory = WorkerFactory.newInstance(TemporalUtils.TEMPORAL_CLIENT);

    final String specQueue = TemporalJobType.GET_SPEC.name();
    final Worker getSpecWorker = workerFactory.newWorker(specQueue);
    getSpecWorker.registerWorkflowImplementationTypes(SpecWorkflow.WorkflowImpl.class);
    getSpecWorker.registerActivitiesImplementations(new SpecWorkflow.SpecActivityImpl(pbf, workspaceRoot));

    // todo (cgardens) - these will come back once we use temporal for these workers.
    // Worker discoverWorker = factory.newWorker(TemporalUtils.DISCOVER_WORKFLOW_QUEUE);
    // discoverWorker.registerWorkflowImplementationTypes(DiscoverWorkflow.WorkflowImpl.class);
    // discoverWorker.registerActivitiesImplementations(new DiscoverWorkflow.DiscoverActivityImpl(pbf,
    // configs.getWorkspaceRoot()));
    //
    // Worker checkConnectionWorker = factory.newWorker(TemporalUtils.CHECK_CONNECTION_WORKFLOW_QUEUE);
    // checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflow.WorkflowImpl.class);
    // checkConnectionWorker.registerActivitiesImplementations(new
    // CheckConnectionWorkflow.CheckConnectionActivityImpl(pbf, configs.getWorkspaceRoot()));
    //
    // Worker syncWorker = factory.newWorker(TemporalUtils.SYNC_WORKFLOW_QUEUE);
    // syncWorker.registerWorkflowImplementationTypes(SyncWorkflow.WorkflowImpl.class);
    // syncWorker.registerActivitiesImplementations(new SyncWorkflow.SyncActivityImpl(pbf,
    // configs.getWorkspaceRoot()));

    workerFactory.start();
  }

}
Loading

0 comments on commit f0778e0

Please sign in to comment.