Skip to content

Commit

Permalink
Remove OutputAndStatus from Worker interface (#2318)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Mar 9, 2021
1 parent 4c22b9b commit 79f0935
Show file tree
Hide file tree
Showing 17 changed files with 89 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.standardtest.destination;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -40,18 +41,17 @@
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.config.StandardGetSpecOutput;
import io.airbyte.config.StandardTargetConfig;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.workers.DefaultCheckConnectionWorker;
import io.airbyte.workers.DefaultGetSpecWorker;
import io.airbyte.workers.OutputAndStatus;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.normalization.NormalizationRunner;
Expand Down Expand Up @@ -147,12 +147,11 @@ protected boolean implementsBasicNormalization() {
*
* @return - a boolean.
*/
protected boolean implementsIncremental() {
final OutputAndStatus<StandardGetSpecOutput> output = runSpec();
assertTrue(output.getOutput().isPresent());
final StandardGetSpecOutput spec = output.getOutput().get();
if (spec.getSpecification().getSupportsIncremental() != null) {
return spec.getSpecification().getSupportsIncremental();
protected boolean implementsIncremental() throws WorkerException {
final ConnectorSpecification spec = runSpec();
assertNotNull(spec);
if (spec.getSupportsIncremental() != null) {
return spec.getSupportsIncremental();
} else {
return false;
}
Expand Down Expand Up @@ -223,9 +222,8 @@ void tearDownInternal() throws Exception {
* Verify that when the integrations returns a valid spec.
*/
@Test
public void testGetSpec() {
final OutputAndStatus<StandardGetSpecOutput> output = runSpec();
assertTrue(output.getOutput().isPresent());
public void testGetSpec() throws WorkerException {
assertNotNull(runSpec());
}

/**
Expand All @@ -234,9 +232,7 @@ public void testGetSpec() {
*/
@Test
public void testCheckConnection() throws Exception {
final OutputAndStatus<StandardCheckConnectionOutput> output = runCheck(getConfig());
assertTrue(output.getOutput().isPresent());
assertEquals(Status.SUCCEEDED, output.getOutput().get().getStatus());
assertEquals(Status.SUCCEEDED, runCheck(getConfig()).getStatus());
}

/**
Expand All @@ -245,9 +241,7 @@ public void testCheckConnection() throws Exception {
*/
@Test
public void testCheckConnectionInvalidCredentials() throws Exception {
final OutputAndStatus<StandardCheckConnectionOutput> output = runCheck(getFailCheckConfig());
assertTrue(output.getOutput().isPresent());
assertEquals(Status.FAILED, output.getOutput().get().getStatus());
assertEquals(Status.FAILED, runCheck(getFailCheckConfig()).getStatus());
}

private static class DataArgumentsProvider implements ArgumentsProvider {
Expand Down Expand Up @@ -365,12 +359,12 @@ public void testSecondSync() throws Exception {
assertSameMessages(secondSyncMessages, retrieveRecordsForCatalog(catalog));
}

private OutputAndStatus<StandardGetSpecOutput> runSpec() {
private ConnectorSpecification runSpec() throws WorkerException {
return new DefaultGetSpecWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf))
.run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot);
}

private OutputAndStatus<StandardCheckConnectionOutput> runCheck(JsonNode config) {
private StandardCheckConnectionOutput runCheck(JsonNode config) throws WorkerException {
return new DefaultCheckConnectionWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf))
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.StandardDiscoverCatalogOutput;
import io.airbyte.config.StandardGetSpecOutput;
import io.airbyte.config.StandardTapConfig;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand All @@ -53,7 +52,7 @@
import io.airbyte.workers.DefaultCheckConnectionWorker;
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
import io.airbyte.workers.DefaultGetSpecWorker;
import io.airbyte.workers.OutputAndStatus;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.DockerProcessBuilderFactory;
import io.airbyte.workers.process.ProcessBuilderFactory;
Expand Down Expand Up @@ -205,10 +204,7 @@ public void tearDownInternal() throws Exception {
*/
@Test
public void testGetSpec() throws Exception {
final OutputAndStatus<StandardGetSpecOutput> output = runSpec();
assertTrue(output.getOutput().isPresent(), "Expected spec not to be empty");
assertEquals(getSpec(), output.getOutput().get().getSpecification(),
"Expected spec output by integration to be equal to spec provided by test runner");
assertEquals(getSpec(), runSpec(), "Expected spec output by integration to be equal to spec provided by test runner");
}

/**
Expand All @@ -217,9 +213,7 @@ public void testGetSpec() throws Exception {
*/
@Test
public void testCheckConnection() throws Exception {
final OutputAndStatus<StandardCheckConnectionOutput> output = runCheck();
assertTrue(output.getOutput().isPresent(), "Expected check connection to succeed when using provided credentials.");
assertEquals(Status.SUCCEEDED, output.getOutput().get().getStatus(), "Expected check connection operation to succeed");
assertEquals(Status.SUCCEEDED, runCheck().getStatus(), "Expected check connection operation to succeed");
}

// /**
Expand All @@ -239,11 +233,9 @@ public void testCheckConnection() throws Exception {
*/
@Test
public void testDiscover() throws Exception {
final OutputAndStatus<StandardDiscoverCatalogOutput> output = runDiscover();
assertTrue(output.getOutput().isPresent(), "Expected discover to produce a catalog");
// the worker validates that it is a valid catalog, so we do not need to validate again (as long as
// we use the worker, which we will not want to do long term).
assertNotNull(output.getOutput().get().getCatalog(), "Expected discover to produce a catalog");
assertNotNull(runDiscover(), "Expected discover to produce a catalog");
}

/**
Expand Down Expand Up @@ -401,17 +393,17 @@ private boolean sourceSupportsIncremental() throws Exception {
return false;
}

private OutputAndStatus<StandardGetSpecOutput> runSpec() {
private ConnectorSpecification runSpec() throws WorkerException {
return new DefaultGetSpecWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf))
.run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot);
}

private OutputAndStatus<StandardCheckConnectionOutput> runCheck() throws Exception {
private StandardCheckConnectionOutput runCheck() throws Exception {
return new DefaultCheckConnectionWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf))
.run(new StandardCheckConnectionInput().withConnectionConfiguration(getConfig()), jobRoot);
}

private OutputAndStatus<StandardDiscoverCatalogOutput> runDiscover() throws Exception {
private AirbyteCatalog runDiscover() throws Exception {
return new DefaultDiscoverCatalogWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf))
.run(new StandardDiscoverCatalogInput().withConnectionConfiguration(getConfig()), jobRoot);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@

package io.airbyte.workers;

import static io.airbyte.workers.JobStatus.FAILED;
import static io.airbyte.workers.JobStatus.SUCCEEDED;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.IOs;
Expand Down Expand Up @@ -67,7 +64,7 @@ public DefaultCheckConnectionWorker(final IntegrationLauncher integrationLaunche
}

@Override
public OutputAndStatus<StandardCheckConnectionOutput> run(StandardCheckConnectionInput input, Path jobRoot) {
public StandardCheckConnectionOutput run(StandardCheckConnectionInput input, Path jobRoot) throws WorkerException {

final JsonNode configDotJson = input.getConnectionConfiguration();
IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(configDotJson));
Expand Down Expand Up @@ -95,14 +92,13 @@ public OutputAndStatus<StandardCheckConnectionOutput> run(StandardCheckConnectio

LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode);
LOGGER.debug("Check connection job received output: {}", output);
return new OutputAndStatus<>(SUCCEEDED, output);
return output;
} else {
return new OutputAndStatus<>(FAILED);
throw new WorkerException("Error while getting checking connection.");
}

} catch (Exception e) {
LOGGER.error("Error while getting checking connection.");
return new OutputAndStatus<>(JobStatus.FAILED);
throw new WorkerException("Error while getting checking connection.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.StandardDiscoverCatalogOutput;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand Down Expand Up @@ -62,8 +61,7 @@ public DefaultDiscoverCatalogWorker(final IntegrationLauncher integrationLaunche
}

@Override
public OutputAndStatus<StandardDiscoverCatalogOutput> run(final StandardDiscoverCatalogInput discoverSchemaInput,
final Path jobRoot) {
public AirbyteCatalog run(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) throws WorkerException {

final JsonNode configDotJson = discoverSchemaInput.getConnectionConfiguration();
IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(configDotJson));
Expand All @@ -87,21 +85,17 @@ public OutputAndStatus<StandardDiscoverCatalogOutput> run(final StandardDiscover
int exitCode = process.exitValue();
if (exitCode == 0) {
if (catalog.isEmpty()) {
LOGGER.error("Integration failed to output a catalog struct.");
return new OutputAndStatus<>(JobStatus.FAILED);
throw new WorkerException("Integration failed to output a catalog struct.");
}

return new OutputAndStatus<>(
JobStatus.SUCCEEDED,
new StandardDiscoverCatalogOutput()
.withCatalog(catalog.get()));
return catalog.get();
} else {
LOGGER.debug("Discover job subprocess finished with exit code {}", exitCode);
return new OutputAndStatus<>(JobStatus.FAILED);
throw new WorkerException(String.format("Discover job subprocess finished with exit code %s", exitCode));
}
} catch (final WorkerException e) {
throw e;
} catch (final Exception e) {
LOGGER.error("Error while discovering schema", e);
return new OutputAndStatus<>(JobStatus.FAILED);
throw new WorkerException("Error while discovering schema", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.config.StandardGetSpecOutput;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConnectorSpecification;
Expand Down Expand Up @@ -60,7 +59,7 @@ public DefaultGetSpecWorker(final IntegrationLauncher integrationLauncher) {
}

@Override
public OutputAndStatus<StandardGetSpecOutput> run(JobGetSpecConfig config, Path jobRoot) {
public ConnectorSpecification run(JobGetSpecConfig config, Path jobRoot) throws WorkerException {
try {
process = integrationLauncher.spec(jobRoot).start();

Expand All @@ -83,19 +82,16 @@ public OutputAndStatus<StandardGetSpecOutput> run(JobGetSpecConfig config, Path
int exitCode = process.exitValue();
if (exitCode == 0) {
if (spec.isEmpty()) {
LOGGER.error("integration failed to output a spec struct.");
return new OutputAndStatus<>(JobStatus.FAILED);
throw new WorkerException("integration failed to output a spec struct.");
}

return new OutputAndStatus<>(JobStatus.SUCCEEDED, new StandardGetSpecOutput().withSpecification(spec.get()));
return spec.get();

} else {
LOGGER.error("Spec job subprocess finished with exit code {}", exitCode);
return new OutputAndStatus<>(JobStatus.FAILED);
throw new WorkerException(String.format("Spec job subprocess finished with exit code %s", exitCode));
}
} catch (Exception e) {
LOGGER.error("Error while getting spec from image {}: {}", config.getDockerImage(), e);
return new OutputAndStatus<>(JobStatus.FAILED);
throw new WorkerException(String.format("Error while getting spec from image %s: %s", config.getDockerImage(), e));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public DefaultSyncWorker(
}

@Override
public OutputAndStatus<StandardSyncOutput> run(StandardSyncInput syncInput, Path jobRoot) {
public StandardSyncOutput run(StandardSyncInput syncInput, Path jobRoot) throws WorkerException {
long startTime = System.currentTimeMillis();

LOGGER.info("configured sync modes: {}", syncInput.getCatalog().getStreams()
Expand All @@ -106,9 +106,7 @@ public OutputAndStatus<StandardSyncOutput> run(StandardSyncInput syncInput, Path
}

} catch (Exception e) {
LOGGER.error("Sync worker failed.", e);

return new OutputAndStatus<>(JobStatus.FAILED, null);
throw new WorkerException("Sync worker failed.", e);
}

try (normalizationRunner) {
Expand All @@ -119,8 +117,7 @@ public OutputAndStatus<StandardSyncOutput> run(StandardSyncInput syncInput, Path
throw new WorkerException("Normalization Failed.");
}
} catch (Exception e) {
LOGGER.error("Normalization Failed.", e);
return new OutputAndStatus<>(JobStatus.FAILED, null);
throw new WorkerException("Normalization Failed.", e);
}

final StandardSyncSummary summary = new StandardSyncSummary()
Expand All @@ -132,14 +129,18 @@ public OutputAndStatus<StandardSyncOutput> run(StandardSyncInput syncInput, Path

LOGGER.info("sync summary: {}", summary);

if (cancelled.get()) {
throw new WorkerException("Sync was cancelled.");
}

final StandardSyncOutput output = new StandardSyncOutput().withStandardSyncSummary(summary);
messageTracker.getOutputState().ifPresent(capturedState -> {
final State state = new State()
.withState(capturedState);
output.withState(state);
});

return new OutputAndStatus<>(cancelled.get() ? JobStatus.FAILED : JobStatus.SUCCEEDED, output);
return output;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@
package io.airbyte.workers;

import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.StandardDiscoverCatalogOutput;
import io.airbyte.protocol.models.AirbyteCatalog;

public interface DiscoverCatalogWorker extends Worker<StandardDiscoverCatalogInput, StandardDiscoverCatalogOutput> {}
public interface DiscoverCatalogWorker extends Worker<StandardDiscoverCatalogInput, AirbyteCatalog> {}
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public class EchoWorker implements Worker<String, String> {
public EchoWorker() {}

@Override
public OutputAndStatus<String> run(String string, Path jobRoot) {
public String run(String string, Path jobRoot) {
LOGGER.info("Hello World. input: {}, workspace root: {}", string, jobRoot);
return new OutputAndStatus<>(JobStatus.SUCCEEDED, "echoed");
return "echoed";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@
package io.airbyte.workers;

import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.config.StandardGetSpecOutput;
import io.airbyte.protocol.models.ConnectorSpecification;

public interface GetSpecWorker extends Worker<JobGetSpecConfig, StandardGetSpecOutput> {}
public interface GetSpecWorker extends Worker<JobGetSpecConfig, ConnectorSpecification> {}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public interface Worker<InputType, OutputType> {
* Blocking call to run the worker's workflow. Once this is complete, getStatus should return either
* COMPLETE, FAILED, or CANCELLED.
*/
OutputAndStatus<OutputType> run(InputType inputType, Path jobRoot);
OutputType run(InputType inputType, Path jobRoot) throws WorkerException;

void cancel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ public WorkerException(String message) {
super(message);
}

public WorkerException(String message, Throwable cause) {
super(message, cause);
}

}
Loading

0 comments on commit 79f0935

Please sign in to comment.