diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/BaseWorkerRunFactory.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/BaseWorkerRunFactory.java new file mode 100644 index 000000000000..04a5992e2884 --- /dev/null +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/BaseWorkerRunFactory.java @@ -0,0 +1,70 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.scheduler.worker_run; + +import io.airbyte.config.JobOutput; +import io.airbyte.workers.Worker; +import io.airbyte.workers.process.IntegrationLauncher; +import io.airbyte.workers.process.ProcessBuilderFactory; +import java.nio.file.Path; + +/** + * This classes exists to make testing WorkerRunFactories easier. For testing the extending class + * can use the protected constructor to pass in mocks. In production, the public constructor should + * be preferred. + * + * @param Input config type + */ +public abstract class BaseWorkerRunFactory implements WorkerRunFactory { + + final IntegrationLauncherFactory integrationLauncherFactory; + final WorkerRunCreator workerRunCreator; + + public BaseWorkerRunFactory() { + this(WorkerRunFactoryUtils::createLauncher, WorkerRun::new); + } + + BaseWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) { + this.integrationLauncherFactory = integrationLauncherFactory; + this.workerRunCreator = workerRunCreator; + } + + // exists to make testing easier. + @FunctionalInterface + interface IntegrationLauncherFactory { + + IntegrationLauncher create(long jobId, int attempt, final String image, ProcessBuilderFactory pbf); + + } + + // exists to make testing easier. + @FunctionalInterface + interface WorkerRunCreator { + + WorkerRun create(Path jobRoot, T input, Worker worker); + + } + +} diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/CheckConnectionWorkerRunFactory.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/CheckConnectionWorkerRunFactory.java index d31133c19f4c..3519f4f2655f 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/CheckConnectionWorkerRunFactory.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/CheckConnectionWorkerRunFactory.java @@ -24,6 +24,7 @@ package io.airbyte.scheduler.worker_run; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.workers.DefaultCheckConnectionWorker; @@ -34,17 +35,27 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class CheckConnectionWorkerRunFactory implements WorkerRunFactory { +public class CheckConnectionWorkerRunFactory extends BaseWorkerRunFactory + implements WorkerRunFactory { private static final Logger LOGGER = LoggerFactory.getLogger(CheckConnectionWorkerRunFactory.class); + public CheckConnectionWorkerRunFactory() { + super(); + } + + @VisibleForTesting + CheckConnectionWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) { + super(integrationLauncherFactory, workerRunCreator); + } + @Override public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int attempt, JobCheckConnectionConfig config) { final StandardCheckConnectionInput checkConnectionInput = createCheckConnectionInput(config); - final IntegrationLauncher launcher = WorkerRunFactoryUtils.createLauncher(jobId, attempt, config.getDockerImage(), pbf); + final IntegrationLauncher launcher = integrationLauncherFactory.create(jobId, attempt, config.getDockerImage(), pbf); - return new WorkerRun( + return workerRunCreator.create( jobRoot, checkConnectionInput, new JobOutputCheckConnectionWorker(new DefaultCheckConnectionWorker(launcher))); diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/DiscoverWorkerRunFactory.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/DiscoverWorkerRunFactory.java index 842110c5a6f2..3c13726457ef 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/DiscoverWorkerRunFactory.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/DiscoverWorkerRunFactory.java @@ -24,6 +24,7 @@ package io.airbyte.scheduler.worker_run; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.config.JobDiscoverCatalogConfig; import io.airbyte.config.StandardDiscoverCatalogInput; import io.airbyte.workers.DefaultDiscoverCatalogWorker; @@ -34,17 +35,26 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class DiscoverWorkerRunFactory implements WorkerRunFactory { +public class DiscoverWorkerRunFactory extends BaseWorkerRunFactory implements WorkerRunFactory { private static final Logger LOGGER = LoggerFactory.getLogger(DiscoverWorkerRunFactory.class); + public DiscoverWorkerRunFactory() { + super(); + } + + @VisibleForTesting + DiscoverWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) { + super(integrationLauncherFactory, workerRunCreator); + } + @Override public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int attempt, JobDiscoverCatalogConfig config) { final StandardDiscoverCatalogInput discoverSchemaInput = createDiscoverCatalogInput(config); - final IntegrationLauncher launcher = WorkerRunFactoryUtils.createLauncher(jobId, attempt, config.getDockerImage(), pbf); + final IntegrationLauncher launcher = integrationLauncherFactory.create(jobId, attempt, config.getDockerImage(), pbf); - return new WorkerRun( + return workerRunCreator.create( jobRoot, discoverSchemaInput, new JobOutputDiscoverSchemaWorker(new DefaultDiscoverCatalogWorker(launcher))); diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/GetSpecWorkerRunFactory.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/GetSpecWorkerRunFactory.java index b493648351db..2bc680a8cee6 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/GetSpecWorkerRunFactory.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/GetSpecWorkerRunFactory.java @@ -24,6 +24,7 @@ package io.airbyte.scheduler.worker_run; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.config.JobGetSpecConfig; import io.airbyte.workers.DefaultGetSpecWorker; import io.airbyte.workers.process.IntegrationLauncher; @@ -33,15 +34,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class GetSpecWorkerRunFactory implements WorkerRunFactory { +public class GetSpecWorkerRunFactory extends BaseWorkerRunFactory implements WorkerRunFactory { private static final Logger LOGGER = LoggerFactory.getLogger(GetSpecWorkerRunFactory.class); + public GetSpecWorkerRunFactory() { + super(); + } + + @VisibleForTesting + GetSpecWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) { + super(integrationLauncherFactory, workerRunCreator); + } + @Override public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int attempt, JobGetSpecConfig config) { - final IntegrationLauncher launcher = WorkerRunFactoryUtils.createLauncher(jobId, attempt, config.getDockerImage(), pbf); + final IntegrationLauncher launcher = integrationLauncherFactory.create(jobId, attempt, config.getDockerImage(), pbf); - return new WorkerRun( + return workerRunCreator.create( jobRoot, config, new JobOutputGetSpecWorker(new DefaultGetSpecWorker(launcher))); diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/SchedulerWorkerRunWithEnvironmentFactory.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/SchedulerWorkerRunWithEnvironmentFactory.java index b79bd2ff36d3..fd4bdb8cf396 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/SchedulerWorkerRunWithEnvironmentFactory.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/SchedulerWorkerRunWithEnvironmentFactory.java @@ -32,6 +32,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Creates WorkerRunFactories for each job type in the context of the Scheduler environment (as + * opposed to the Temporal enviornment). When we move to Temporal, we will replace this class with + * whatever the necessary Temporal logic is. + */ public class SchedulerWorkerRunWithEnvironmentFactory { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerWorkerRunWithEnvironmentFactory.class); @@ -76,9 +81,7 @@ public static WorkerRun workRunWithEnvironmentCreate(Path workspaceRoot, .create(jobId, attempt, config); } - /* - * This class is here to help with the testing - */ + // exists to make testing easier. @FunctionalInterface interface Creator { diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/SyncWorkerRunFactories.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/SyncWorkerRunFactories.java index 5038b390994b..3a1a9f7ae6bc 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/SyncWorkerRunFactories.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/worker_run/SyncWorkerRunFactories.java @@ -24,10 +24,13 @@ package io.airbyte.scheduler.worker_run; +import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.json.Jsons; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.StandardSyncInput; +import io.airbyte.scheduler.worker_run.BaseWorkerRunFactory.IntegrationLauncherFactory; +import io.airbyte.scheduler.worker_run.BaseWorkerRunFactory.WorkerRunCreator; import io.airbyte.workers.DefaultSyncWorker; import io.airbyte.workers.normalization.NormalizationRunnerFactory; import io.airbyte.workers.process.IntegrationLauncher; @@ -46,7 +49,17 @@ public class SyncWorkerRunFactories { private static final Logger LOGGER = LoggerFactory.getLogger(SyncWorkerRunFactories.class); - public static class ResetConnectionWorkerRunFactory implements WorkerRunFactory { + public static class ResetConnectionWorkerRunFactory extends BaseWorkerRunFactory + implements WorkerRunFactory { + + public ResetConnectionWorkerRunFactory() { + super(); + } + + @VisibleForTesting + ResetConnectionWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) { + super(integrationLauncherFactory, workerRunCreator); + } @Override public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int attempt, JobResetConnectionConfig config) { @@ -57,7 +70,9 @@ public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int config.getDestinationDockerImage(), createSyncInputFromResetConfig(config), jobRoot, - pbf); + pbf, + integrationLauncherFactory, + workerRunCreator); } private static StandardSyncInput createSyncInputFromResetConfig(JobResetConnectionConfig config) { @@ -69,12 +84,21 @@ private static StandardSyncInput createSyncInputFromResetConfig(JobResetConnecti } - public static class SyncWorkerRunFactory implements WorkerRunFactory { + public static class SyncWorkerRunFactory extends BaseWorkerRunFactory implements WorkerRunFactory { + + public SyncWorkerRunFactory() { + super(); + } + + @VisibleForTesting + SyncWorkerRunFactory(IntegrationLauncherFactory integrationLauncherFactory, WorkerRunCreator workerRunCreator) { + super(integrationLauncherFactory, workerRunCreator); + } @Override public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int attempt, JobSyncConfig config) { final DefaultAirbyteSource airbyteSource = - new DefaultAirbyteSource(WorkerRunFactoryUtils.createLauncher(jobId, attempt, config.getSourceDockerImage(), pbf)); + new DefaultAirbyteSource(integrationLauncherFactory.create(jobId, attempt, config.getSourceDockerImage(), pbf)); return createSyncWorker( jobId, attempt, @@ -82,7 +106,9 @@ public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int config.getDestinationDockerImage(), createSyncInputSyncConfig(config), jobRoot, - pbf); + pbf, + integrationLauncherFactory, + workerRunCreator); } private static StandardSyncInput createSyncInputSyncConfig(JobSyncConfig config) { @@ -101,12 +127,13 @@ private static WorkerRun createSyncWorker(long jobId, String destinationDockerImage, StandardSyncInput syncInput, Path jobRoot, - ProcessBuilderFactory pbf + ProcessBuilderFactory pbf, + IntegrationLauncherFactory integrationLauncherFactory, - ) { - final IntegrationLauncher destinationLauncher = WorkerRunFactoryUtils.createLauncher(jobId, attempt, destinationDockerImage, pbf); + WorkerRunCreator workerRunCreator) { + final IntegrationLauncher destinationLauncher = integrationLauncherFactory.create(jobId, attempt, destinationDockerImage, pbf); - return new WorkerRun( + return workerRunCreator.create( jobRoot, syncInput, new JobOutputSyncWorker( diff --git a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/worker_run/SchedulerWorkerRunWithEnvironmentFactoryTest.java b/airbyte-scheduler/src/test/java/io/airbyte/scheduler/worker_run/SchedulerWorkerRunWithEnvironmentFactoryTest.java index 0a5c7109b0de..d147775d5ea0 100644 --- a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/worker_run/SchedulerWorkerRunWithEnvironmentFactoryTest.java +++ b/airbyte-scheduler/src/test/java/io/airbyte/scheduler/worker_run/SchedulerWorkerRunWithEnvironmentFactoryTest.java @@ -93,7 +93,9 @@ void setUp() throws IOException { @EnumSource(value = JobConfig.ConfigType.class, names = {"CHECK_CONNECTION_SOURCE", "CHECK_CONNECTION_DESTINATION"}) void testConnection(JobConfig.ConfigType value) { - final JobCheckConnectionConfig expectedInput = new JobCheckConnectionConfig().withConnectionConfiguration(CONFIG); + final JobCheckConnectionConfig expectedInput = new JobCheckConnectionConfig() + .withDockerImage("airbyte/source-earth:0.1.0") + .withConnectionConfiguration(CONFIG); when(job.getConfig().getConfigType()).thenReturn(value); when(job.getConfig().getCheckConnection()).thenReturn(expectedInput); @@ -107,7 +109,9 @@ void testConnection(JobConfig.ConfigType value) { @SuppressWarnings("unchecked") @Test void testSchema() { - final JobDiscoverCatalogConfig expectedInput = new JobDiscoverCatalogConfig().withConnectionConfiguration(CONFIG); + final JobDiscoverCatalogConfig expectedInput = new JobDiscoverCatalogConfig() + .withDockerImage("airbyte/source-earth:0.1.0") + .withConnectionConfiguration(CONFIG); when(job.getConfig().getConfigType()).thenReturn(JobConfig.ConfigType.DISCOVER_SCHEMA); when(job.getConfig().getDiscoverCatalog()).thenReturn(expectedInput); diff --git a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/worker_run/WorkerRunFactoriesTest.java b/airbyte-scheduler/src/test/java/io/airbyte/scheduler/worker_run/WorkerRunFactoriesTest.java new file mode 100644 index 000000000000..f5cb3755b200 --- /dev/null +++ b/airbyte-scheduler/src/test/java/io/airbyte/scheduler/worker_run/WorkerRunFactoriesTest.java @@ -0,0 +1,193 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.scheduler.worker_run; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.JobCheckConnectionConfig; +import io.airbyte.config.JobConfig.ConfigType; +import io.airbyte.config.JobDiscoverCatalogConfig; +import io.airbyte.config.JobGetSpecConfig; +import io.airbyte.config.JobOutput; +import io.airbyte.config.JobResetConnectionConfig; +import io.airbyte.config.JobSyncConfig; +import io.airbyte.config.StandardCheckConnectionInput; +import io.airbyte.config.StandardDiscoverCatalogInput; +import io.airbyte.config.StandardSyncInput; +import io.airbyte.config.State; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.SyncMode; +import io.airbyte.scheduler.worker_run.BaseWorkerRunFactory.IntegrationLauncherFactory; +import io.airbyte.scheduler.worker_run.BaseWorkerRunFactory.WorkerRunCreator; +import io.airbyte.workers.Worker; +import io.airbyte.workers.process.ProcessBuilderFactory; +import io.airbyte.workers.wrappers.JobOutputCheckConnectionWorker; +import io.airbyte.workers.wrappers.JobOutputDiscoverSchemaWorker; +import io.airbyte.workers.wrappers.JobOutputGetSpecWorker; +import io.airbyte.workers.wrappers.JobOutputSyncWorker; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.ArgumentCaptor; + +/** + * Tests the WorkerRunFactory for each job type. These are all in the same test class since they are + * very similar and tend to use almost identical inputs. + */ +class WorkerRunFactoriesTest { + + private static final long JOB_ID = 1L; + private static final int ATTEMPT_ID = 2; + private static final JsonNode CONFIG = Jsons.jsonNode(1); + private static final JsonNode CONFIG2 = Jsons.jsonNode(2); + private static final JsonNode STATE = Jsons.emptyObject(); + private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = new ConfiguredAirbyteCatalog() + .withStreams(Lists.newArrayList(new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH))); + private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests"); + + private Path jobRoot; + private ProcessBuilderFactory pbf; + private IntegrationLauncherFactory integrationLauncherFactory; + private WorkerRunCreator workerRunCreator; + + @BeforeEach + void setUp() throws IOException { + integrationLauncherFactory = mock(IntegrationLauncherFactory.class); + workerRunCreator = mock(WorkerRunCreator.class); + + final Path rootPath = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test"); + jobRoot = rootPath.resolve("abc"); + pbf = mock(ProcessBuilderFactory.class); + } + + @SuppressWarnings("unchecked") + @ParameterizedTest + @EnumSource(value = ConfigType.class, + names = {"CHECK_CONNECTION_SOURCE", "CHECK_CONNECTION_DESTINATION"}) + void testConnection(ConfigType value) { + final JobCheckConnectionConfig config = new JobCheckConnectionConfig() + .withDockerImage("airbyte/source-earth:0.1.0") + .withConnectionConfiguration(CONFIG); + final StandardCheckConnectionInput expectedInput = + new StandardCheckConnectionInput().withConnectionConfiguration(config.getConnectionConfiguration()); + + new CheckConnectionWorkerRunFactory(integrationLauncherFactory, workerRunCreator).create(jobRoot, pbf, JOB_ID, ATTEMPT_ID, config); + final ArgumentCaptor> argument = ArgumentCaptor.forClass(Worker.class); + verify(workerRunCreator).create(eq(jobRoot), eq(expectedInput), argument.capture()); + verify(integrationLauncherFactory).create(JOB_ID, ATTEMPT_ID, config.getDockerImage(), pbf); + Assertions.assertTrue(argument.getValue() instanceof JobOutputCheckConnectionWorker); + } + + @SuppressWarnings("unchecked") + @Test + void testSchema() { + final JobDiscoverCatalogConfig config = new JobDiscoverCatalogConfig() + .withDockerImage("airbyte/source-earth:0.1.0") + .withConnectionConfiguration(CONFIG); + final StandardDiscoverCatalogInput expectedInput = new StandardDiscoverCatalogInput() + .withConnectionConfiguration(config.getConnectionConfiguration()); + + new DiscoverWorkerRunFactory(integrationLauncherFactory, workerRunCreator).create(jobRoot, pbf, JOB_ID, ATTEMPT_ID, config); + final ArgumentCaptor> argument = ArgumentCaptor.forClass(Worker.class); + verify(workerRunCreator).create(eq(jobRoot), eq(expectedInput), argument.capture()); + verify(integrationLauncherFactory).create(JOB_ID, ATTEMPT_ID, config.getDockerImage(), pbf); + Assertions.assertTrue(argument.getValue() instanceof JobOutputDiscoverSchemaWorker); + + } + + @SuppressWarnings("unchecked") + @Test + void testSync() { + final JobSyncConfig config = new JobSyncConfig() + .withState(new State().withState(STATE)) + .withSourceDockerImage("airbyte/source-earth:0.1.0") + .withDestinationDockerImage("airbyte/destination-moon:0.1.0") + .withSourceConfiguration(CONFIG) + .withDestinationConfiguration(CONFIG2) + .withConfiguredAirbyteCatalog(CONFIGURED_CATALOG); + + final StandardSyncInput expectedInput = new StandardSyncInput() + .withSourceConfiguration(config.getSourceConfiguration()) + .withDestinationConfiguration(config.getDestinationConfiguration()) + .withCatalog(config.getConfiguredAirbyteCatalog()) + .withState(config.getState()); + + new SyncWorkerRunFactories.SyncWorkerRunFactory(integrationLauncherFactory, workerRunCreator) + .create(jobRoot, pbf, JOB_ID, ATTEMPT_ID, config); + + final ArgumentCaptor> argument = ArgumentCaptor.forClass(Worker.class); + verify(workerRunCreator).create(eq(jobRoot), eq(expectedInput), argument.capture()); + verify(integrationLauncherFactory).create(JOB_ID, ATTEMPT_ID, config.getSourceDockerImage(), pbf); + verify(integrationLauncherFactory).create(JOB_ID, ATTEMPT_ID, config.getDestinationDockerImage(), pbf); + Assertions.assertTrue(argument.getValue() instanceof JobOutputSyncWorker); + } + + @SuppressWarnings("unchecked") + @Test + void testResetConnection() { + final JobResetConnectionConfig config = new JobResetConnectionConfig() + .withDestinationDockerImage("airbyte/destination-moon:0.1.0") + .withDestinationConfiguration(CONFIG2) + .withConfiguredAirbyteCatalog(CONFIGURED_CATALOG); + + final StandardSyncInput expectedInput = new StandardSyncInput() + .withSourceConfiguration(Jsons.emptyObject()) + .withDestinationConfiguration(config.getDestinationConfiguration()) + .withCatalog(config.getConfiguredAirbyteCatalog()); + + new SyncWorkerRunFactories.ResetConnectionWorkerRunFactory(integrationLauncherFactory, workerRunCreator) + .create(jobRoot, pbf, JOB_ID, ATTEMPT_ID, config); + + final ArgumentCaptor> argument = ArgumentCaptor.forClass(Worker.class); + verify(workerRunCreator).create(eq(jobRoot), eq(expectedInput), argument.capture()); + verify(integrationLauncherFactory).create(JOB_ID, ATTEMPT_ID, config.getDestinationDockerImage(), pbf); + Assertions.assertTrue(argument.getValue() instanceof JobOutputSyncWorker); + } + + @SuppressWarnings("unchecked") + @Test + void testGetSpec() { + final JobGetSpecConfig expectedConfig = new JobGetSpecConfig().withDockerImage("notarealimage"); + + new GetSpecWorkerRunFactory(integrationLauncherFactory, workerRunCreator).create(jobRoot, pbf, JOB_ID, ATTEMPT_ID, expectedConfig); + + verify(integrationLauncherFactory).create(JOB_ID, ATTEMPT_ID, expectedConfig.getDockerImage(), pbf); + final ArgumentCaptor> argument = ArgumentCaptor.forClass(Worker.class); + verify(workerRunCreator).create(eq(jobRoot), eq(expectedConfig), argument.capture()); + Assertions.assertTrue(argument.getValue() instanceof JobOutputGetSpecWorker); + } + +}