Skip to content

Commit

Permalink
Rename defaultNamespace to prefix and make it optional (#2350)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopheDuong authored Mar 8, 2021
1 parent b03f1f8 commit e3cbb4e
Show file tree
Hide file tree
Showing 30 changed files with 107 additions and 121 deletions.
18 changes: 7 additions & 11 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1595,17 +1595,16 @@ components:
ConnectionCreate:
type: object
required:
- defaultNamespace
- sourceId
- destinationId
- status
properties:
name:
type: string
description: Optional name of the connection
defaultNamespace:
prefix:
type: string
description: Name of the connection that will determine where the source data will be written in the destination
description: Prefix that will be prepended to the name of each stream when it is written to the destination.
sourceId:
$ref: "#/components/schemas/SourceId"
destinationId:
Expand All @@ -1620,15 +1619,14 @@ components:
type: object
required:
- connectionId
- defaultNamespace
- syncCatalog
- status
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
defaultNamespace:
prefix:
type: string
description: Name of the connection that will determine where the source data will be written in the destination
description: Prefix that will be prepended to the name of each stream when it is written to the destination.
syncCatalog:
$ref: "#/components/schemas/AirbyteCatalog"
schedule:
Expand All @@ -1647,7 +1645,6 @@ components:
required:
- connectionId
- name
- defaultNamespace
- sourceId
- destinationId
- syncCatalog
Expand All @@ -1657,9 +1654,9 @@ components:
$ref: "#/components/schemas/ConnectionId"
name:
type: string
defaultNamespace:
prefix:
type: string
description: Name of the connection that will determine where the source data will be written in the destination
description: Prefix that will be prepended to the name of each stream when it is written to the destination.
sourceId:
$ref: "#/components/schemas/SourceId"
destinationId:
Expand Down Expand Up @@ -1973,7 +1970,6 @@ components:
required:
- connectionId
- name
- defaultNamespace
- sourceId
- destinationId
- syncCatalog
Expand All @@ -1986,7 +1982,7 @@ components:
$ref: "#/components/schemas/ConnectionId"
name:
type: string
defaultNamespace:
prefix:
type: string
sourceId:
$ref: "#/components/schemas/SourceId"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ description: job reset connection config
type: object
additionalProperties: false
required:
- defaultNamespace
- destinationConfiguration
- configuredAirbyteCatalog
- destinationDockerImage
properties:
defaultNamespace:
description: Name of the connection that will determine where the source data will be written in the destination
prefix:
description: Prefix that will be prepended to the name of each stream when it is written to the destination.
type: string
destinationConfiguration:
description: Integration specific blob. Must be a valid JSON string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ description: job sync config
type: object
additionalProperties: false
required:
- defaultNamespace
- sourceConfiguration
- destinationConfiguration
- configuredAirbyteCatalog
- sourceDockerImage
- destinationDockerImage
properties:
defaultNamespace:
description: Name of the connection that will determine where the source data will be written in the destination
prefix:
description: Prefix that will be prepended to the name of each stream when it is written to the destination.
type: string
sourceConfiguration:
description: Integration specific blob. Must be a valid JSON string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ title: StandardSync
description: configuration required for sync for ALL taps
type: object
required:
- defaultNamespace
- sourceId
- destinationId
- name
- catalog
additionalProperties: false
properties:
defaultNamespace:
prefix:
description: Prefix that will be prepended to the name of each stream when it is written to the destination.
type: string
sourceId:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ description: job sync config
type: object
additionalProperties: false
required:
- defaultNamespace
- sourceConfiguration
- destinationConfiguration
- configuredAirbyteCatalog
properties:
defaultNamespace:
description: Name of the connection that will determine where the source data will be written in the destination
prefix:
description: Prefix that will be prepended to the name of each stream when it is written to the destination.
type: string
sourceConfiguration:
description: Integration specific blob. Must be a valid JSON string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void testListConfigs() throws JsonValidationException, IOException {
void writeConfigWithJsonSchemaRef() throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardSync standardSync = new StandardSync()
.withName("sync")
.withDefaultNamespace("sync")
.withPrefix("sync")
.withConnectionId(UUID_1)
.withSourceId(UUID.randomUUID())
.withDestinationId(UUID.randomUUID())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Optional<Long> createSyncJob(SourceConnection source,
throws IOException {
// reusing this isn't going to quite work.
final JobSyncConfig jobSyncConfig = new JobSyncConfig()
.withDefaultNamespace(standardSync.getDefaultNamespace())
.withPrefix(standardSync.getPrefix())
.withSourceDockerImage(sourceDockerImageName)
.withSourceConfiguration(source.getConfiguration())
.withDestinationDockerImage(destinationDockerImageName)
Expand Down Expand Up @@ -137,7 +137,7 @@ public Optional<Long> createResetConnectionJob(DestinationConnection destination
configuredAirbyteCatalog.getStreams().forEach(configuredAirbyteStream -> configuredAirbyteStream.setSyncMode(SyncMode.FULL_REFRESH));

final JobResetConnectionConfig resetConnectionConfig = new JobResetConnectionConfig()
.withDefaultNamespace(standardSync.getDefaultNamespace())
.withPrefix(standardSync.getPrefix())
.withDestinationDockerImage(destinationDockerImage)
.withDestinationConfiguration(destination.getConfiguration())
.withConfiguredAirbyteCatalog(configuredAirbyteCatalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public JobOutput run(long jobId, long attemptId, String sourceDockerImage, Strin
jobId,
intAttemptId,
airbyteSource,
new NamespacingMapper(syncInput.getDefaultNamespace()),
new NamespacingMapper(syncInput.getPrefix()),
new DefaultAirbyteDestination(destinationLauncher),
new AirbyteMessageTracker(),
NormalizationRunnerFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int

private static StandardSyncInput createSyncInputFromResetConfig(JobResetConnectionConfig config) {
return new StandardSyncInput()
.withDefaultNamespace(config.getDefaultNamespace())
.withPrefix(config.getPrefix())
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(config.getDestinationConfiguration())
.withCatalog(config.getConfiguredAirbyteCatalog());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public WorkerRun create(Path jobRoot, ProcessBuilderFactory pbf, long jobId, int

public static StandardSyncInput createSyncInputSyncConfig(JobSyncConfig config) {
return new StandardSyncInput()
.withDefaultNamespace(config.getDefaultNamespace())
.withPrefix(config.getPrefix())
.withSourceConfiguration(config.getSourceConfiguration())
.withDestinationConfiguration(config.getDestinationConfiguration())
.withCatalog(config.getConfiguredAirbyteCatalog())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static WorkerRun createSyncWorker(long jobId,
jobId,
attempt,
airbyteSource,
new NamespacingMapper(syncInput.getDefaultNamespace()),
new NamespacingMapper(syncInput.getPrefix()),
new DefaultAirbyteDestination(destinationLauncher),
new AirbyteMessageTracker(),
NormalizationRunnerFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class JobSchedulerTest {
STANDARD_SYNC = new StandardSync()
.withConnectionId(connectionId)
.withName("presto to hudi")
.withDefaultNamespace("presto_to_hudi")
.withPrefix("presto_to_hudi")
.withStatus(StandardSync.Status.ACTIVE)
.withCatalog(catalog)
.withSourceId(sourceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class DefaultJobCreatorTest {
STANDARD_SYNC = new StandardSync()
.withConnectionId(connectionId)
.withName("presto to hudi")
.withDefaultNamespace("presto_to_hudi")
.withPrefix("presto_to_hudi")
.withStatus(StandardSync.Status.ACTIVE)
.withCatalog(catalog)
.withSourceId(sourceId)
Expand Down Expand Up @@ -186,7 +186,7 @@ void testCreateGetSpecJob() throws IOException {
@Test
void testCreateSyncJob() throws IOException {
final JobSyncConfig jobSyncConfig = new JobSyncConfig()
.withDefaultNamespace(STANDARD_SYNC.getDefaultNamespace())
.withPrefix(STANDARD_SYNC.getPrefix())
.withSourceConfiguration(SOURCE_CONNECTION.getConfiguration())
.withSourceDockerImage(SOURCE_IMAGE_NAME)
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
Expand All @@ -212,7 +212,7 @@ void testCreateSyncJob() throws IOException {
@Test
void testCreateSyncJobEnsureNoQueuing() throws IOException {
final JobSyncConfig jobSyncConfig = new JobSyncConfig()
.withDefaultNamespace(STANDARD_SYNC.getDefaultNamespace())
.withPrefix(STANDARD_SYNC.getPrefix())
.withSourceConfiguration(SOURCE_CONNECTION.getConfiguration())
.withSourceDockerImage(SOURCE_IMAGE_NAME)
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
Expand Down Expand Up @@ -241,7 +241,7 @@ void testCreateResetConnectionJob() throws IOException {
.forEach(configuredAirbyteStream -> configuredAirbyteStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH));

final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig()
.withDefaultNamespace(STANDARD_SYNC.getDefaultNamespace())
.withPrefix(STANDARD_SYNC.getPrefix())
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
.withConfiguredAirbyteCatalog(expectedCatalog);
Expand All @@ -267,7 +267,7 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
.forEach(configuredAirbyteStream -> configuredAirbyteStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH));

final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig()
.withDefaultNamespace(STANDARD_SYNC.getDefaultNamespace())
.withPrefix(STANDARD_SYNC.getPrefix())
.withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration())
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
.withConfiguredAirbyteCatalog(expectedCatalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void testSync() {
.withConfiguredAirbyteCatalog(CONFIGURED_CATALOG);

final StandardSyncInput expectedInput = new StandardSyncInput()
.withDefaultNamespace(config.getDefaultNamespace())
.withPrefix(config.getPrefix())
.withSourceConfiguration(config.getSourceConfiguration())
.withDestinationConfiguration(config.getDestinationConfiguration())
.withCatalog(config.getConfiguredAirbyteCatalog())
Expand All @@ -160,13 +160,13 @@ void testSync() {
@Test
void testResetConnection() {
final JobResetConnectionConfig config = new JobResetConnectionConfig()
.withDefaultNamespace("test")
.withPrefix("test")
.withDestinationDockerImage("airbyte/destination-moon:0.1.0")
.withDestinationConfiguration(CONFIG2)
.withConfiguredAirbyteCatalog(CONFIGURED_CATALOG);

final StandardSyncInput expectedInput = new StandardSyncInput()
.withDefaultNamespace(config.getDefaultNamespace())
.withPrefix(config.getPrefix())
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(config.getDestinationConfiguration())
.withCatalog(config.getConfiguredAirbyteCatalog());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public ConnectionRead createConnection(ConnectionCreate connectionCreate) throws
final StandardSync standardSync = new StandardSync()
.withConnectionId(connectionId)
.withName(connectionCreate.getName() != null ? connectionCreate.getName() : "default")
.withDefaultNamespace(connectionCreate.getDefaultNamespace())
.withPrefix(connectionCreate.getPrefix())
.withSourceId(connectionCreate.getSourceId())
.withDestinationId(connectionCreate.getDestinationId())
.withStatus(toPersistenceStatus(connectionCreate.getStatus()));
Expand Down Expand Up @@ -155,7 +155,7 @@ private Builder<String, Object> generateMetadata(final StandardSync standardSync
public ConnectionRead updateConnection(ConnectionUpdate connectionUpdate) throws ConfigNotFoundException, IOException, JsonValidationException {
// retrieve sync
final StandardSync persistedSync = configRepository.getStandardSync(connectionUpdate.getConnectionId())
.withDefaultNamespace(connectionUpdate.getDefaultNamespace())
.withPrefix(connectionUpdate.getPrefix())
.withCatalog(CatalogConverter.toProtocol(connectionUpdate.getSyncCatalog()))
.withStatus(toPersistenceStatus(connectionUpdate.getStatus()));

Expand Down Expand Up @@ -218,7 +218,7 @@ public void deleteConnection(ConnectionIdRequestBody connectionIdRequestBody) th

public void deleteConnection(ConnectionRead connectionRead) throws ConfigNotFoundException, IOException, JsonValidationException {
final ConnectionUpdate connectionUpdate = new ConnectionUpdate()
.defaultNamespace(connectionRead.getDefaultNamespace())
.prefix(connectionRead.getPrefix())
.connectionId(connectionRead.getConnectionId())
.syncCatalog(connectionRead.getSyncCatalog())
.schedule(connectionRead.getSchedule())
Expand Down Expand Up @@ -260,7 +260,7 @@ private ConnectionRead buildConnectionRead(final StandardSync standardSync,
.status(toApiStatus(standardSync.getStatus()))
.schedule(apiSchedule)
.name(standardSync.getName())
.defaultNamespace(standardSync.getDefaultNamespace())
.prefix(standardSync.getPrefix())
.syncCatalog(CatalogConverter.toApi(standardSync.getCatalog()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private WbConnectionRead buildWbConnectionRead(ConnectionRead connectionRead) th
.sourceId(connectionRead.getSourceId())
.destinationId(connectionRead.getDestinationId())
.name(connectionRead.getName())
.defaultNamespace(connectionRead.getDefaultNamespace())
.prefix(connectionRead.getPrefix())
.syncCatalog(connectionRead.getSyncCatalog())
.status(connectionRead.getStatus())
.schedule(connectionRead.getSchedule())
Expand Down Expand Up @@ -212,7 +212,7 @@ public ConnectionRead webBackendUpdateConnection(WebBackendConnectionUpdate webB
protected static ConnectionUpdate toConnectionUpdate(WebBackendConnectionUpdate webBackendConnectionUpdate) {
ConnectionUpdate connectionUpdate = new ConnectionUpdate();

connectionUpdate.setDefaultNamespace(webBackendConnectionUpdate.getDefaultNamespace());
connectionUpdate.setPrefix(webBackendConnectionUpdate.getPrefix());
connectionUpdate.setConnectionId(webBackendConnectionUpdate.getConnectionId());
connectionUpdate.setSchedule(webBackendConnectionUpdate.getSchedule());
connectionUpdate.setStatus(webBackendConnectionUpdate.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void testCreateConnection() throws JsonValidationException, ConfigNotFoundExcept
.sourceId(standardSync.getSourceId())
.destinationId(standardSync.getDestinationId())
.name("presto to hudi")
.defaultNamespace("presto_to_hudi")
.prefix("presto_to_hudi")
.status(ConnectionStatus.ACTIVE)
.schedule(ConnectionHelpers.generateBasicSchedule())
.syncCatalog(catalog);
Expand All @@ -123,7 +123,7 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept
catalog.getStreams().get(0).getConfig().setAliasName("azkaban_users");

final ConnectionUpdate connectionUpdate = new ConnectionUpdate()
.defaultNamespace(standardSync.getDefaultNamespace())
.prefix(standardSync.getPrefix())
.connectionId(standardSync.getConnectionId())
.status(ConnectionStatus.INACTIVE)
.schedule(null)
Expand All @@ -135,7 +135,7 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept
final StandardSync updatedStandardSync = new StandardSync()
.withConnectionId(standardSync.getConnectionId())
.withName("presto to hudi")
.withDefaultNamespace("presto_to_hudi")
.withPrefix("presto_to_hudi")
.withSourceId(standardSync.getSourceId())
.withDestinationId(standardSync.getDestinationId())
.withStatus(StandardSync.Status.INACTIVE)
Expand Down Expand Up @@ -212,7 +212,7 @@ void testDeleteConnection() throws JsonValidationException, IOException, ConfigN
standardSync.getDestinationId());

final ConnectionUpdate expectedConnectionUpdate = new ConnectionUpdate()
.defaultNamespace(connectionRead.getDefaultNamespace())
.prefix(connectionRead.getPrefix())
.connectionId(connectionRead.getConnectionId())
.status(ConnectionStatus.DEPRECATED)
.syncCatalog(connectionRead.getSyncCatalog())
Expand Down
Loading

0 comments on commit e3cbb4e

Please sign in to comment.