Skip to content

Commit

Permalink
add tests for each worker run factory
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Mar 3, 2021
1 parent 9dc10eb commit 7749367
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.scheduler.worker_run;

import io.airbyte.config.JobOutput;
import io.airbyte.workers.Worker;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.ProcessBuilderFactory;
import java.nio.file.Path;

/**
* This classes exists to make testing WorkerRunFactories easier. For testing the extending class
* can use the protected constructor to pass in mocks. In production, the public constructor should
* be preferred.
*
* @param <T> Input config type
*/
public abstract class BaseWorkerRunFactory<T> implements WorkerRunFactory<T> {

final IntegrationLauncherFactory integrationLauncherFactory;
final WorkerRunCreator workerRunCreator;

public BaseWorkerRunFactory() {
this(WorkerRunFactoryUtils::createLauncher, WorkerRun::new);
}

BaseWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) {
this.integrationLauncherFactory = integrationLauncherFactory;
this.workerRunCreator = workerRunCreator;
}

// exists to make testing easier.
@FunctionalInterface
interface IntegrationLauncherFactory {

IntegrationLauncher create(long jobId, int attempt, final String image, ProcessBuilderFactory pbf);

}

// exists to make testing easier.
@FunctionalInterface
interface WorkerRunCreator {

<T> WorkerRun create(Path jobRoot, T input, Worker<T, JobOutput> worker);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.scheduler.worker_run;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.workers.DefaultCheckConnectionWorker;
Expand All @@ -34,17 +35,27 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckConnectionWorkerRunFactory implements WorkerRunFactory<JobCheckConnectionConfig> {
public class CheckConnectionWorkerRunFactory extends BaseWorkerRunFactory<JobCheckConnectionConfig>
implements WorkerRunFactory<JobCheckConnectionConfig> {

private static final Logger LOGGER = LoggerFactory.getLogger(CheckConnectionWorkerRunFactory.class);

public CheckConnectionWorkerRunFactory() {
super();
}

@VisibleForTesting
CheckConnectionWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) {
super(integrationLauncherFactory, workerRunCreator);
}

@Override
public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int attempt, JobCheckConnectionConfig config) {
final StandardCheckConnectionInput checkConnectionInput = createCheckConnectionInput(config);

final IntegrationLauncher launcher = WorkerRunFactoryUtils.createLauncher(jobId, attempt, config.getDockerImage(), pbf);
final IntegrationLauncher launcher = integrationLauncherFactory.create(jobId, attempt, config.getDockerImage(), pbf);

return new WorkerRun(
return workerRunCreator.create(
jobRoot,
checkConnectionInput,
new JobOutputCheckConnectionWorker(new DefaultCheckConnectionWorker(launcher)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.scheduler.worker_run;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.config.JobDiscoverCatalogConfig;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
Expand All @@ -34,17 +35,26 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DiscoverWorkerRunFactory implements WorkerRunFactory<JobDiscoverCatalogConfig> {
public class DiscoverWorkerRunFactory extends BaseWorkerRunFactory<JobDiscoverCatalogConfig> implements WorkerRunFactory<JobDiscoverCatalogConfig> {

private static final Logger LOGGER = LoggerFactory.getLogger(DiscoverWorkerRunFactory.class);

public DiscoverWorkerRunFactory() {
super();
}

@VisibleForTesting
DiscoverWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) {
super(integrationLauncherFactory, workerRunCreator);
}

@Override
public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int attempt, JobDiscoverCatalogConfig config) {
final StandardDiscoverCatalogInput discoverSchemaInput = createDiscoverCatalogInput(config);

final IntegrationLauncher launcher = WorkerRunFactoryUtils.createLauncher(jobId, attempt, config.getDockerImage(), pbf);
final IntegrationLauncher launcher = integrationLauncherFactory.create(jobId, attempt, config.getDockerImage(), pbf);

return new WorkerRun(
return workerRunCreator.create(
jobRoot,
discoverSchemaInput,
new JobOutputDiscoverSchemaWorker(new DefaultDiscoverCatalogWorker(launcher)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.scheduler.worker_run;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.workers.DefaultGetSpecWorker;
import io.airbyte.workers.process.IntegrationLauncher;
Expand All @@ -33,15 +34,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GetSpecWorkerRunFactory implements WorkerRunFactory<JobGetSpecConfig> {
public class GetSpecWorkerRunFactory extends BaseWorkerRunFactory<JobGetSpecConfig> implements WorkerRunFactory<JobGetSpecConfig> {

private static final Logger LOGGER = LoggerFactory.getLogger(GetSpecWorkerRunFactory.class);

public GetSpecWorkerRunFactory() {
super();
}

@VisibleForTesting
GetSpecWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) {
super(integrationLauncherFactory, workerRunCreator);
}

@Override
public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int attempt, JobGetSpecConfig config) {
final IntegrationLauncher launcher = WorkerRunFactoryUtils.createLauncher(jobId, attempt, config.getDockerImage(), pbf);
final IntegrationLauncher launcher = integrationLauncherFactory.create(jobId, attempt, config.getDockerImage(), pbf);

return new WorkerRun(
return workerRunCreator.create(
jobRoot,
config,
new JobOutputGetSpecWorker(new DefaultGetSpecWorker(launcher)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Creates WorkerRunFactories for each job type in the context of the Scheduler environment (as
* opposed to the Temporal enviornment). When we move to Temporal, we will replace this class with
* whatever the necessary Temporal logic is.
*/
public class SchedulerWorkerRunWithEnvironmentFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerWorkerRunWithEnvironmentFactory.class);
Expand Down Expand Up @@ -76,9 +81,7 @@ public static <T> WorkerRun workRunWithEnvironmentCreate(Path workspaceRoot,
.create(jobId, attempt, config);
}

/*
* This class is here to help with the testing
*/
// exists to make testing easier.
@FunctionalInterface
interface Creator {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@

package io.airbyte.scheduler.worker_run;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.JobResetConnectionConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.scheduler.worker_run.BaseWorkerRunFactory.IntegrationLauncherFactory;
import io.airbyte.scheduler.worker_run.BaseWorkerRunFactory.WorkerRunCreator;
import io.airbyte.workers.DefaultSyncWorker;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.IntegrationLauncher;
Expand All @@ -46,7 +49,17 @@ public class SyncWorkerRunFactories {

private static final Logger LOGGER = LoggerFactory.getLogger(SyncWorkerRunFactories.class);

public static class ResetConnectionWorkerRunFactory implements WorkerRunFactory<JobResetConnectionConfig> {
public static class ResetConnectionWorkerRunFactory extends BaseWorkerRunFactory<JobResetConnectionConfig>
implements WorkerRunFactory<JobResetConnectionConfig> {

public ResetConnectionWorkerRunFactory() {
super();
}

@VisibleForTesting
ResetConnectionWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) {
super(integrationLauncherFactory, workerRunCreator);
}

@Override
public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int attempt, JobResetConnectionConfig config) {
Expand All @@ -57,7 +70,9 @@ public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int
config.getDestinationDockerImage(),
createSyncInputFromResetConfig(config),
jobRoot,
pbf);
pbf,
integrationLauncherFactory,
workerRunCreator);
}

private static StandardSyncInput createSyncInputFromResetConfig(JobResetConnectionConfig config) {
Expand All @@ -69,20 +84,31 @@ private static StandardSyncInput createSyncInputFromResetConfig(JobResetConnecti

}

public static class SyncWorkerRunFactory implements WorkerRunFactory<JobSyncConfig> {
public static class SyncWorkerRunFactory extends BaseWorkerRunFactory<JobSyncConfig> implements WorkerRunFactory<JobSyncConfig> {

public SyncWorkerRunFactory() {
super();
}

@VisibleForTesting
SyncWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) {
super(integrationLauncherFactory, workerRunCreator);
}

@Override
public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int attempt, JobSyncConfig config) {
final DefaultAirbyteSource airbyteSource =
new DefaultAirbyteSource(WorkerRunFactoryUtils.createLauncher(jobId, attempt, config.getSourceDockerImage(), pbf));
new DefaultAirbyteSource(integrationLauncherFactory.create(jobId, attempt, config.getSourceDockerImage(), pbf));
return createSyncWorker(
jobId,
attempt,
airbyteSource,
config.getDestinationDockerImage(),
createSyncInputSyncConfig(config),
jobRoot,
pbf);
pbf,
integrationLauncherFactory,
workerRunCreator);
}

private static StandardSyncInput createSyncInputSyncConfig(JobSyncConfig config) {
Expand All @@ -101,12 +127,13 @@ private static WorkerRun createSyncWorker(long jobId,
String destinationDockerImage,
StandardSyncInput syncInput,
Path jobRoot,
ProcessBuilderFactory pbf
ProcessBuilderFactory pbf,
IntegrationLauncherFactory integrationLauncherFactory,

) {
final IntegrationLauncher destinationLauncher = WorkerRunFactoryUtils.createLauncher(jobId, attempt, destinationDockerImage, pbf);
WorkerRunCreator workerRunCreator) {
final IntegrationLauncher destinationLauncher = integrationLauncherFactory.create(jobId, attempt, destinationDockerImage, pbf);

return new WorkerRun(
return workerRunCreator.create(
jobRoot,
syncInput,
new JobOutputSyncWorker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ void setUp() throws IOException {
@EnumSource(value = JobConfig.ConfigType.class,
names = {"CHECK_CONNECTION_SOURCE", "CHECK_CONNECTION_DESTINATION"})
void testConnection(JobConfig.ConfigType value) {
final JobCheckConnectionConfig expectedInput = new JobCheckConnectionConfig().withConnectionConfiguration(CONFIG);
final JobCheckConnectionConfig expectedInput = new JobCheckConnectionConfig()
.withDockerImage("airbyte/source-earth:0.1.0")
.withConnectionConfiguration(CONFIG);
when(job.getConfig().getConfigType()).thenReturn(value);
when(job.getConfig().getCheckConnection()).thenReturn(expectedInput);

Expand All @@ -107,7 +109,9 @@ void testConnection(JobConfig.ConfigType value) {
@SuppressWarnings("unchecked")
@Test
void testSchema() {
final JobDiscoverCatalogConfig expectedInput = new JobDiscoverCatalogConfig().withConnectionConfiguration(CONFIG);
final JobDiscoverCatalogConfig expectedInput = new JobDiscoverCatalogConfig()
.withDockerImage("airbyte/source-earth:0.1.0")
.withConnectionConfiguration(CONFIG);
when(job.getConfig().getConfigType()).thenReturn(JobConfig.ConfigType.DISCOVER_SCHEMA);
when(job.getConfig().getDiscoverCatalog()).thenReturn(expectedInput);

Expand Down
Loading

0 comments on commit 7749367

Please sign in to comment.