Skip to content

Commit

Permalink
Destinations CDK: protocol message interop (#45407)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Sep 13, 2024
1 parent 2fa9e2e commit 58d241d
Show file tree
Hide file tree
Showing 10 changed files with 579 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ data class DestinationCatalog(
return byDescriptor[descriptor]
?: throw IllegalArgumentException("Stream not found: namespace=$namespace, name=$name")
}

/**
 * Builds the protocol-level catalog by converting every stream in this catalog to its
 * protocol representation.
 */
fun asProtocolObject(): ConfiguredAirbyteCatalog {
    val protocolCatalog = ConfiguredAirbyteCatalog()
    val protocolStreams = streams.map { stream -> stream.asProtocolObject() }
    return protocolCatalog.withStreams(protocolStreams)
}
}

interface DestinationCatalogFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

package io.airbyte.cdk.command

import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.StreamDescriptor
import jakarta.inject.Singleton

/**
Expand All @@ -15,20 +19,51 @@ import jakarta.inject.Singleton
*
* TODO: Add dedicated schema type, converted from json-schema.
*/
class DestinationStream(val descriptor: Descriptor) {
data class Descriptor(val namespace: String, val name: String)

override fun hashCode(): Int {
return descriptor.hashCode()
}

override fun equals(other: Any?): Boolean {
return other is DestinationStream && descriptor == other.descriptor
data class DestinationStream(
val descriptor: Descriptor,
val importType: ImportType,
val schema: ObjectNode,
val generationId: Long,
val minimumGenerationId: Long,
val syncId: Long,
) {
/** Identifies a stream by its namespace and name. */
data class Descriptor(val namespace: String, val name: String) {
    /** Copies [namespace] and [name] into a new protocol [StreamDescriptor]. */
    fun asProtocolObject(): StreamDescriptor {
        val protocolDescriptor = StreamDescriptor()
        return protocolDescriptor.withNamespace(namespace).withName(name)
    }
}

override fun toString(): String {
return "DestinationStream(descriptor=$descriptor)"
}
/**
 * Converts this stream into a protocol [ConfiguredAirbyteStream].
 *
 * The conversion is lossy: only the fields a destination cares about are populated (namespace,
 * name, JSON schema, generation/sync ids, destination sync mode, and - for dedupe - the actual
 * cursor and primary key). Source-side details such as defaultCursorField or the source sync
 * mode are not carried, so the result cannot be round-tripped back to the original
 * AirbyteStream.
 */
fun asProtocolObject(): ConfiguredAirbyteStream {
    val configuredStream =
        ConfiguredAirbyteStream()
            .withStream(
                AirbyteStream()
                    .withNamespace(descriptor.namespace)
                    .withName(descriptor.name)
                    .withJsonSchema(schema)
            )
            .withGenerationId(generationId)
            .withMinimumGenerationId(minimumGenerationId)
            .withSyncId(syncId)

    // Exhaustive over the sealed ImportType hierarchy, so no else branch is needed.
    when (val mode = importType) {
        is Append -> {
            configuredStream.destinationSyncMode = DestinationSyncMode.APPEND
        }
        is Dedupe -> {
            configuredStream.destinationSyncMode = DestinationSyncMode.APPEND_DEDUP
            configuredStream.cursorField = mode.cursor
            configuredStream.primaryKey = mode.primaryKey
        }
        Overwrite -> {
            configuredStream.destinationSyncMode = DestinationSyncMode.OVERWRITE
        }
    }
    return configuredStream
}
}

@Singleton
Expand All @@ -39,7 +74,46 @@ class DestinationStreamFactory {
DestinationStream.Descriptor(
namespace = stream.stream.namespace,
name = stream.stream.name
)
),
importType =
when (stream.destinationSyncMode) {
null -> throw IllegalArgumentException("Destination sync mode was null")
DestinationSyncMode.APPEND -> Append
DestinationSyncMode.OVERWRITE -> Overwrite
DestinationSyncMode.APPEND_DEDUP ->
Dedupe(primaryKey = stream.primaryKey, cursor = stream.cursorField)
},
schema = stream.stream.jsonSchema as ObjectNode,
generationId = stream.generationId,
minimumGenerationId = stream.minimumGenerationId,
syncId = stream.syncId,
)
}
}

/**
* How a stream's records are to be written by the destination. Implementations in this file are
* [Append], [Dedupe] and [Overwrite], each corresponding to one [DestinationSyncMode] value.
*/
sealed interface ImportType

/** Import type corresponding to [DestinationSyncMode.APPEND]; carries no extra parameters. */
data object Append : ImportType

/**
* Import type corresponding to [DestinationSyncMode.APPEND_DEDUP]. Carries the primary key and
* cursor that are configured on the stream.
*/
data class Dedupe(
/**
* Theoretically, the path to the fields in the PK. In practice, most destinations only support
* PK at the root level, i.e. `listOf(listOf(pkField1), listOf(pkField2), etc)`.
*/
val primaryKey: List<List<String>>,
/**
* Theoretically, the path to the cursor. In practice, most destinations only support cursors at
* the root level, i.e. `listOf(cursorField)`.
*/
val cursor: List<String>,
) : ImportType
/**
* A legacy destination sync mode. Modern destinations depend on the platform to set
* overwrite/record-retaining behavior via the generationId / minimumGenerationId parameters, and
* should treat this as equivalent to Append.
*
* [Overwrite] is approximately equivalent to an [Append] sync, with nonzero generationId equal to
* minimumGenerationId.
*/
// TODO should this even exist?
data object Overwrite : ImportType
Loading

0 comments on commit 58d241d

Please sign in to comment.