diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationCatalog.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationCatalog.kt index be21089da20e..0759648e25ce 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationCatalog.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationCatalog.kt @@ -12,9 +12,7 @@ import jakarta.inject.Singleton * Internal representation of destination streams. This is intended to be a case class specialized * for usability. */ -data class DestinationCatalog( - val streams: List = emptyList(), -) { +data class DestinationCatalog(val streams: List = emptyList()) { private val byDescriptor: Map = streams.associateBy { it.descriptor } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationStream.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationStream.kt index 77c143ab6a30..6dcae9fa3f0b 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationStream.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/command/DestinationStream.kt @@ -4,7 +4,9 @@ package io.airbyte.cdk.command -import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.data.AirbyteType +import io.airbyte.cdk.data.AirbyteTypeToJsonSchema +import io.airbyte.cdk.data.JsonSchemaToAirbyteType import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import io.airbyte.protocol.models.v0.DestinationSyncMode @@ -22,7 +24,7 @@ import jakarta.inject.Singleton data class DestinationStream( val descriptor: Descriptor, val importType: ImportType, - val schema: ObjectNode, + val schema: AirbyteType, val generationId: Long, val minimumGenerationId: Long, val syncId: Long, @@ -44,7 +46,7 @@ data class DestinationStream( AirbyteStream() .withNamespace(descriptor.namespace) .withName(descriptor.name) - .withJsonSchema(schema) + .withJsonSchema(AirbyteTypeToJsonSchema().convert(schema)) ) .withGenerationId(generationId) .withMinimumGenerationId(minimumGenerationId) @@ -83,10 +85,10 @@ class DestinationStreamFactory { DestinationSyncMode.APPEND_DEDUP -> Dedupe(primaryKey = stream.primaryKey, cursor = stream.cursorField) }, - schema = stream.stream.jsonSchema as ObjectNode, generationId = stream.generationId, minimumGenerationId = stream.minimumGenerationId, syncId = stream.syncId, + schema = JsonSchemaToAirbyteType().convert(stream.stream.jsonSchema) ) } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteType.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteType.kt new file mode 100644 index 000000000000..8824d1699f03 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteType.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.data + +sealed interface AirbyteType + +data object NullType : AirbyteType + +data object StringType : AirbyteType + +data object BooleanType : AirbyteType + +data object IntegerType : AirbyteType + +data object NumberType : AirbyteType + +data object DateType : AirbyteType + +data class TimestampType(val hasTimezone: Boolean) : AirbyteType + +data class TimeType(val hasTimezone: Boolean) : AirbyteType + +data class ArrayType(val items: FieldType) : AirbyteType + +data object ArrayTypeWithoutSchema : AirbyteType + +data class ObjectType(val properties: LinkedHashMap) : AirbyteType + +data object ObjectTypeWithEmptySchema : AirbyteType + +data object ObjectTypeWithoutSchema : AirbyteType + +data class UnionType(val options: List) : AirbyteType + +data class UnknownType(val what: String) : AirbyteType + +data class FieldType(val type: AirbyteType, val nullable: Boolean) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteTypeToJsonSchema.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteTypeToJsonSchema.kt new file mode 100644 index 000000000000..7fa89feba447 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteTypeToJsonSchema.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.data + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.JsonNodeFactory +import com.fasterxml.jackson.databind.node.ObjectNode + +class AirbyteTypeToJsonSchema { + private fun ofType(typeName: String): ObjectNode { + return JsonNodeFactory.instance.objectNode().put("type", typeName) + } + + fun convert(airbyteType: AirbyteType): JsonNode { + return when (airbyteType) { + is NullType -> ofType("null") + is StringType -> ofType("string") + is BooleanType -> ofType("boolean") + is IntegerType -> ofType("integer") + is NumberType -> ofType("number") + is ArrayType -> + JsonNodeFactory.instance + .objectNode() + .put("type", "array") + .set("items", fromFieldType(airbyteType.items)) + is ArrayTypeWithoutSchema -> ofType("array") + is ObjectType -> { + val objNode = ofType("object") + val properties = objNode.putObject("properties") + airbyteType.properties.forEach { (name, field) -> + properties.replace(name, fromFieldType(field)) + } + objNode + } + is ObjectTypeWithoutSchema -> ofType("object") + is ObjectTypeWithEmptySchema -> { + val objectNode = ofType("object") + objectNode.putObject("properties") + objectNode + } + is UnionType -> { + val unionNode = JsonNodeFactory.instance.objectNode() + val unionOptions = unionNode.putArray("oneOf") + airbyteType.options.forEach { unionOptions.add(convert(it)) } + unionNode + } + is DateType -> ofType("string").put("format", "date") + is TimeType -> { + val timeNode = ofType("string").put("format", "time") + if (airbyteType.hasTimezone) { + timeNode.put("airbyte_type", "time_with_timezone") + } else { + timeNode.put("airbyte_type", "time_without_timezone") + } + } + is TimestampType -> { + val timestampNode = ofType("string").put("format", "date-time") + if (airbyteType.hasTimezone) { + timestampNode.put("airbyte_type", "timestamp_with_timezone") + } else { + timestampNode.put("airbyte_type", "timestamp_without_timezone") + } + } + else -> throw IllegalArgumentException("Unknown type: $airbyteType") + } + } + + private fun fromFieldType(field: FieldType): JsonNode { + if (field.nullable) { + if (field.type is UnionType) { + return convert(UnionType(options = field.type.options + NullType)) + } + return convert(UnionType(options = listOf(field.type, NullType))) + } + return convert(field.type) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValue.kt new file mode 100644 index 000000000000..73acce297030 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValue.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.data + +import java.math.BigDecimal + +sealed interface AirbyteValue + +data object NullValue : AirbyteValue + +@JvmInline value class StringValue(val value: String) : AirbyteValue + +@JvmInline value class BooleanValue(val value: Boolean) : AirbyteValue + +@JvmInline value class IntegerValue(val value: Long) : AirbyteValue + +@JvmInline value class NumberValue(val value: BigDecimal) : AirbyteValue + +@JvmInline value class DateValue(val value: String) : AirbyteValue + +@JvmInline value class TimestampValue(val value: String) : AirbyteValue + +@JvmInline value class TimeValue(val value: String) : AirbyteValue + +@JvmInline value class ArrayValue(val values: List) : AirbyteValue + +@JvmInline value class ObjectValue(val values: LinkedHashMap) : AirbyteValue + +@JvmInline value class UnknownValue(val what: String) : AirbyteValue diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValueToJson.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValueToJson.kt new file mode 100644 index 000000000000..2d79157c54a4 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValueToJson.kt @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.data + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.JsonNodeFactory + +class AirbyteValueToJson { + fun convert(value: AirbyteValue): JsonNode { + return when (value) { + is ArrayValue -> + JsonNodeFactory.instance.arrayNode().addAll(value.values.map { convert(it) }) + is BooleanValue -> JsonNodeFactory.instance.booleanNode(value.value) + is DateValue -> JsonNodeFactory.instance.textNode(value.value) + is IntegerValue -> JsonNodeFactory.instance.numberNode(value.value) + is NullValue -> JsonNodeFactory.instance.nullNode() + is NumberValue -> JsonNodeFactory.instance.numberNode(value.value) + is ObjectValue -> { + val objNode = JsonNodeFactory.instance.objectNode() + value.values.forEach { (name, field) -> objNode.replace(name, convert(field)) } + objNode + } + is StringValue -> JsonNodeFactory.instance.textNode(value.value) + is TimeValue -> JsonNodeFactory.instance.textNode(value.value) + is TimestampValue -> JsonNodeFactory.instance.textNode(value.value) + is UnknownValue -> throw IllegalArgumentException("Unknown value: $value") + } + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/JsonSchemaToAirbyteType.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/JsonSchemaToAirbyteType.kt new file mode 100644 index 000000000000..14ae1967e1df --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/JsonSchemaToAirbyteType.kt @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.data + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.JsonNodeFactory +import com.fasterxml.jackson.databind.node.ObjectNode + +class JsonSchemaToAirbyteType { + fun convert(schema: JsonNode): AirbyteType { + // try { + if (schema.isObject && schema.has("type")) { + // Normal json object with {"type": ..., ...} + val schemaType = (schema as ObjectNode).get("type") + return if (schemaType.isTextual) { + // {"type": , ...} + when (schema.get("type").asText()) { + "string" -> fromString(schema) + "boolean" -> BooleanType + "integer" -> IntegerType + "number" -> fromNumber(schema) + "array" -> fromArray(schema) + "object" -> fromObject(schema) + "null" -> NullType + else -> + throw IllegalArgumentException( + "Unknown type: ${ + schema.get("type").asText() + }" + ) + } + } else if (schemaType.isArray) { + // {"type": [...], ...} + unionFromCombinedTypes(schemaType.toList(), schema) + } else { + UnknownType("unspported type for 'type' field: $schemaType") + } + } else if (schema.isObject) { + // {"oneOf": [...], ...} or {"anyOf": [...], ...} or {"allOf": [...], ...} + val options = schema.get("oneOf") ?: schema.get("anyOf") ?: schema.get("allOf") + return if (options != null) { + UnionType(options.map { convert(it as ObjectNode) }) + } else { + // Default to object if no type and not a union type + convert((schema as ObjectNode).put("type", "object")) + } + } else if (schema.isTextual) { + // "" + val typeSchema = JsonNodeFactory.instance.objectNode().put("type", schema.asText()) + return convert(typeSchema) + } else { + return UnknownType("Unknown schema type: $schema") + } + } // catch (t: Throwable) { + // return UnknownType(t.message ?: "Unknown error") + // } + // } + + private fun fromString(schema: ObjectNode): AirbyteType = + when (schema.get("format")?.asText()) { + "date" -> DateType + "time" -> + TimeType( + hasTimezone = schema.get("airbyte_type")?.asText() != "time_without_timezone" + ) + "date-time" -> + TimestampType( + hasTimezone = + schema.get("airbyte_type")?.asText() != "timestamp_without_timezone" + ) + null -> StringType + else -> + throw IllegalArgumentException( + "Unknown string format: ${ + schema.get("format").asText() + }" + ) + } + + private fun fromNumber(schema: ObjectNode): AirbyteType = + if (schema.get("airbyte_type")?.asText() == "integer") { + IntegerType + } else { + NumberType + } + + private fun fromArray(schema: ObjectNode): AirbyteType { + val items = schema.get("items") ?: return ArrayTypeWithoutSchema + if (items.isArray) { + if (items.isEmpty) { + return ArrayTypeWithoutSchema + } + val itemOptions = UnionType(items.map { convert(it) }) + return ArrayType(fieldFromUnion(itemOptions)) + } + return ArrayType(fieldFromSchema(items as ObjectNode)) + } + + private fun fromObject(schema: ObjectNode): AirbyteType { + val properties = schema.get("properties") ?: return ObjectTypeWithoutSchema + if (properties.isEmpty) { + return ObjectTypeWithEmptySchema + } + val requiredFields = schema.get("required")?.map { it.asText() } ?: emptyList() + return objectFromProperties(properties as ObjectNode, requiredFields) + } + + private fun fieldFromSchema( + fieldSchema: ObjectNode, + onRequiredList: Boolean = false + ): FieldType { + val markedRequired = fieldSchema.get("required")?.asBoolean() ?: false + val nullable = !(onRequiredList || markedRequired) + val airbyteType = convert(fieldSchema) + if (airbyteType is UnionType) { + return fieldFromUnion(airbyteType, nullable) + } else { + return FieldType(airbyteType, nullable) + } + } + + private fun fieldFromUnion(unionType: UnionType, nullable: Boolean = false): FieldType { + if (unionType.options.contains(NullType)) { + val filtered = unionType.options.filter { it != NullType } + return FieldType(UnionType(filtered), nullable = true) + } + return FieldType(unionType, nullable = nullable) + } + + private fun objectFromProperties(schema: ObjectNode, requiredFields: List): ObjectType { + val properties = + schema + .fields() + .asSequence() + .map { (name, node) -> + name to fieldFromSchema(node as ObjectNode, requiredFields.contains(name)) + } + .toMap(LinkedHashMap()) + return ObjectType(properties) + } + + private fun unionFromCombinedTypes( + options: List, + parentSchema: ObjectNode + ): UnionType { + // Denormalize the properties across each type (the converter only checks what matters + // per type). + val unionOptions = + options.map { + if (it.isTextual) { + val schema = parentSchema.deepCopy() + schema.put("type", it.textValue()) + convert(schema) + } else { + convert(it) + } + } + return UnionType(unionOptions) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/JsonToAirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/JsonToAirbyteValue.kt new file mode 100644 index 000000000000..bcce2ea1e1cc --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/JsonToAirbyteValue.kt @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.data + +import com.fasterxml.jackson.databind.JsonNode +import java.math.BigDecimal + +/** + * Converts from json to airbyte value, performing the minimum validation necessary to marshal to a + * native type. For example, we enforce that an integer is either integral or something that can be + * reasonably converted to an integer, but we do not parse dates or timestamps, which can be + * reasonably left as strings. + * + * TODO: In the future, should we be more or less aggressive here? Which keeps parity with existing + * behavior? Which existing behavior should be preserved? + */ +class JsonToAirbyteValue { + fun convert(json: JsonNode, schema: AirbyteType): AirbyteValue { + try { + return when (schema) { + is ArrayType -> toArray(json, schema.items.type) + is ArrayTypeWithoutSchema -> toArrayWithoutSchema(json) + is BooleanType -> toBoolean(json) + is DateType -> DateValue(json.asText()) + is IntegerType -> toInteger(json) + is NullType -> toNull(json) + is NumberType -> toNumber(json) + is ObjectType -> toObject(json, schema) + is ObjectTypeWithoutSchema, + is ObjectTypeWithEmptySchema -> toObjectWithoutSchema(json) + is StringType -> StringValue(json.asText()) + is TimeType -> TimeValue(json.asText()) + is TimestampType -> TimestampValue(json.asText()) + is UnionType -> toUnion(json, schema.options) + is UnknownType -> UnknownValue("From $schema: $json") + } + } catch (t: Throwable) { + return UnknownValue(t.message ?: "Unknown error") + } + } + + private fun toArray(json: JsonNode, schema: AirbyteType): ArrayValue { + if (!json.isArray) { + throw IllegalArgumentException("Could not convert $json to Array") + } + + return ArrayValue(json.map { convert(it, schema) }) + } + + private fun toArrayWithoutSchema(json: JsonNode): ArrayValue { + if (!json.isArray) { + throw IllegalArgumentException("Could not convert $json to Array") + } + + return ArrayValue(json.map { fromJson(it) }) + } + + private fun toBoolean(json: JsonNode): BooleanValue { + val boolVal = + when { + json.isBoolean -> json.asBoolean() + json.isIntegralNumber -> json.asLong() != 0L + json.isFloatingPointNumber -> json.asDouble() != 0.0 + json.isTextual -> json.asText().toBooleanStrict() + else -> throw IllegalArgumentException("Could not convert $json to Boolean") + } + return BooleanValue(boolVal) + } + + private fun toInteger(json: JsonNode): IntegerValue { + val longVal = + when { + json.isBoolean -> if (json.asBoolean()) 1L else 0L + json.isIntegralNumber -> json.asLong() + json.isFloatingPointNumber -> json.asDouble().toLong() + json.isTextual -> json.asText().toLong() + else -> throw IllegalArgumentException("Could not convert $json to Integer") + } + return IntegerValue(longVal) + } + + private fun toNumber(json: JsonNode): NumberValue { + val numVal = + when { + json.isBoolean -> BigDecimal(if (json.asBoolean()) 1.0 else 0.0) + json.isIntegralNumber -> json.asLong().toBigDecimal() + json.isFloatingPointNumber -> json.asDouble().toBigDecimal() + json.isTextual -> json.asText().toBigDecimal() + else -> throw IllegalArgumentException("Could not convert $json to Number") + } + return NumberValue(numVal) + } + + private fun toObject(json: JsonNode, schema: ObjectType): ObjectValue { + if (!json.isObject) { + throw IllegalArgumentException("Could not convert $json to Object") + } + + return ObjectValue( + values = + schema.properties + .mapValues { (name, field) -> convert(json.get(name), field.type) } + .toMap(LinkedHashMap()) + ) + } + + private fun toObjectWithoutSchema(json: JsonNode): ObjectValue { + if (!json.isObject) { + throw IllegalArgumentException("Could not convert $json to Object") + } + + return ObjectValue( + values = + json + .fields() + .asSequence() + .map { (name, value) -> name to fromJson(value) } + .toMap(LinkedHashMap()) + ) + } + + private fun toNull(json: JsonNode): NullValue { + if (!json.isNull) { + throw IllegalArgumentException("Null types must be null (not $json)") + } + + return NullValue + } + + private fun toUnion(json: JsonNode, options: List): AirbyteValue { + val option = + options.find { matchesStrictly(it, json) } + ?: options.find { matchesPermissively(it, json) } + ?: throw IllegalArgumentException( + "No matching union option in $options for $json" + ) + return convert(json, option) + } + + private fun fromJson(json: JsonNode): AirbyteValue { + return when { + json.isBoolean -> toBoolean(json) + json.isIntegralNumber -> toInteger(json) + json.isFloatingPointNumber -> toNumber(json) + json.isTextual -> StringValue(json.asText()) + json.isArray -> ArrayValue(json.map { fromJson(it) }) + json.isObject -> + ObjectValue( + json + .fields() + .asSequence() + .map { (name, value) -> name to fromJson(value) } + .toMap(LinkedHashMap()) + ) + json.isNull -> NullValue + else -> UnknownValue("From unrecognized json: $json") + } + } + + private fun matchesStrictly(schema: AirbyteType, json: JsonNode): Boolean { + return when (schema) { + is ArrayType, + is ArrayTypeWithoutSchema -> json.isArray + is BooleanType -> json.isBoolean + is DateType -> json.isTextual + is IntegerType -> json.isIntegralNumber + is NullType -> json.isNull + is NumberType -> json.isNumber + is ObjectType, + is ObjectTypeWithoutSchema, + is ObjectTypeWithEmptySchema -> json.isObject + is StringType -> json.isTextual + is TimeType -> json.isTextual + is TimestampType -> json.isTextual + is UnionType -> schema.options.any { matchesStrictly(it, json) } + is UnknownType -> false + } + } + + private fun matchesPermissively(schema: AirbyteType, json: JsonNode): Boolean { + return try { + convert(json, schema) !is UnknownValue + } catch (t: Throwable) { + false + } + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessage.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessage.kt index d6c37cfafce0..6d9c1b6bbf59 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessage.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/message/DestinationMessage.kt @@ -7,6 +7,9 @@ package io.airbyte.cdk.message import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.command.DestinationStream +import io.airbyte.cdk.data.AirbyteValue +import io.airbyte.cdk.data.AirbyteValueToJson +import io.airbyte.cdk.data.JsonToAirbyteValue import io.airbyte.cdk.message.CheckpointMessage.Checkpoint import io.airbyte.cdk.message.CheckpointMessage.Stats import io.airbyte.protocol.models.v0.AirbyteGlobalState @@ -37,7 +40,7 @@ sealed interface DestinationStreamAffinedMessage : DestinationMessage { data class DestinationRecord( override val stream: DestinationStream, - val data: JsonNode? = null, + val data: AirbyteValue, val emittedAtMs: Long, val meta: Meta?, val serialized: String, @@ -70,7 +73,7 @@ data class DestinationRecord( .withStream(stream.descriptor.name) .withNamespace(stream.descriptor.namespace) .withEmittedAt(emittedAtMs) - .withData(data) + .withData(AirbyteValueToJson().convert(data)) .also { if (meta != null) { it.meta = meta.asProtocolObject() @@ -206,14 +209,15 @@ data object Undefined : DestinationMessage { class DestinationMessageFactory(private val catalog: DestinationCatalog) { fun fromAirbyteMessage(message: AirbyteMessage, serialized: String): DestinationMessage { return when (message.type) { - AirbyteMessage.Type.RECORD -> + AirbyteMessage.Type.RECORD -> { + val stream = + catalog.getStream( + namespace = message.record.namespace, + name = message.record.stream, + ) DestinationRecord( - stream = - catalog.getStream( - namespace = message.record.namespace, - name = message.record.stream, - ), - data = message.record.data, + stream = stream, + data = JsonToAirbyteValue().convert(message.record.data, stream.schema), emittedAtMs = message.record.emittedAt, meta = message.record.meta?.let { meta -> @@ -229,6 +233,7 @@ class DestinationMessageFactory(private val catalog: DestinationCatalog) { }, serialized = serialized ) + } AirbyteMessage.Type.TRACE -> { val status = message.trace.streamStatus val stream = diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/command/MockCatalogFactory.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/command/MockCatalogFactory.kt index 93df547487a6..e0383e50293d 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/command/MockCatalogFactory.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/command/MockCatalogFactory.kt @@ -4,8 +4,10 @@ package io.airbyte.cdk.command -import com.fasterxml.jackson.databind.node.ObjectNode -import io.airbyte.protocol.models.Jsons +import io.airbyte.cdk.data.FieldType +import io.airbyte.cdk.data.IntegerType +import io.airbyte.cdk.data.ObjectType +import io.airbyte.cdk.data.StringType import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Replaces import io.micronaut.context.annotation.Requires @@ -20,10 +22,15 @@ class MockCatalogFactory : DestinationCatalogFactory { val stream1 = DestinationStream( DestinationStream.Descriptor("test", "stream1"), - Append, - Jsons.deserialize( - """{"type": "object", "properties": {"id": {"type": "integer"}}}""" - ) as ObjectNode, + importType = Append, + schema = + ObjectType( + properties = + linkedMapOf( + "id" to FieldType(type = IntegerType, nullable = true), + "name" to FieldType(type = StringType, nullable = true), + ), + ), generationId = 42, minimumGenerationId = 0, syncId = 42, @@ -31,10 +38,15 @@ class MockCatalogFactory : DestinationCatalogFactory { val stream2 = DestinationStream( DestinationStream.Descriptor("test", "stream2"), - Append, - Jsons.deserialize( - """{"type": "object", "properties": {"id": {"type": "integer"}}}""" - ) as ObjectNode, + importType = Append, + schema = + ObjectType( + properties = + linkedMapOf( + "id" to FieldType(type = IntegerType, nullable = true), + "name" to FieldType(type = StringType, nullable = true), + ), + ), generationId = 42, minimumGenerationId = 0, syncId = 42, diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/AirbyteTypeToJsonSchemaTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/AirbyteTypeToJsonSchemaTest.kt new file mode 100644 index 000000000000..5005755fcda7 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/AirbyteTypeToJsonSchemaTest.kt @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.data + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.JsonNodeFactory +import com.fasterxml.jackson.databind.node.ObjectNode +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class AirbyteTypeToJsonSchemaTest { + @Test + fun testRoundTrip() { + val schema = JsonNodeFactory.instance.objectNode() + val props = schema.putObject("properties") + props.putObject("name").put("type", "string").put("required", true) + props.putObject("age").put("type", "integer") + props.putObject("is_cool").put("type", "boolean") + props.putObject("height").put("type", "number") + props.putObject("friends").put("type", "array").putObject("items").put("type", "string") + val subProps = props.putObject("address").put("type", "object").putObject("properties") + subProps.putObject("street").put("type", "string") + subProps.putObject("city").put("type", "string") + props.putObject("null_field").put("type", "null") + val union = props.putObject("nullable_union").putArray("oneOf") + union.add(JsonNodeFactory.instance.objectNode().put("type", "string")) + union.add(JsonNodeFactory.instance.objectNode().put("type", "integer")) + union.add(JsonNodeFactory.instance.objectNode().put("type", "null")) + + val union2 = props.putObject("nonnullable_union") + val union2opts = union2.putArray("oneOf") + union2opts.add(JsonNodeFactory.instance.objectNode().put("type", "string")) + union2opts.add(JsonNodeFactory.instance.objectNode().put("type", "integer")) + union2.put("required", true) + + props.putObject("combined_null").putArray("type").add("string").add("null") + + val combinedDenormalized = props.putObject("combined_denormalized") + combinedDenormalized.putArray("type").add("string").add("object") + combinedDenormalized.putObject("properties").putObject("name").put("type", "string") + + props + .putObject("union_array") + .put("type", "array") + .putArray("items") + .add("string") + .add("integer") + + props.putObject("date").put("type", "string").put("format", "date") + props.putObject("time").put("type", "string").put("format", "time") + props.putObject("timestamp").put("type", "string").put("format", "date-time") + props + .putObject("time_without_timezone") + .put("type", "string") + .put("format", "time") + .put("airbyte_type", "time_without_timezone") + props + .putObject("timestamp_without_timezone") + .put("type", "string") + .put("format", "date-time") + .put("airbyte_type", "timestamp_without_timezone") + + val converted = JsonSchemaToAirbyteType().convert(schema) + val unconverted = AirbyteTypeToJsonSchema().convert(converted) + + val propsOut = unconverted.get("properties") + Assertions.assertEquals(ofType("string", false), propsOut.get("name")) + Assertions.assertEquals(ofType("integer", true), propsOut.get("age")) + Assertions.assertEquals(ofType("boolean", true), propsOut.get("is_cool")) + Assertions.assertEquals(ofType("number", true), propsOut.get("height")) + + val friends = JsonNodeFactory.instance.objectNode() + friends.put("type", "array").replace("items", ofType("string", true)) + Assertions.assertEquals(ofNullable(friends), propsOut.get("friends")) + + val address = JsonNodeFactory.instance.objectNode() + val addressProps = address.put("type", "object").putObject("properties") + addressProps.replace("street", ofType("string", true)) + addressProps.replace("city", ofType("string", true)) + Assertions.assertEquals(ofNullable(address), propsOut.get("address")) + + Assertions.assertEquals(ofType("null", true), propsOut.get("null_field")) + + val nullableUnion = JsonNodeFactory.instance.objectNode() + nullableUnion + .putArray("oneOf") + .add(ofType("string", false)) + .add(ofType("integer", false)) + .add(ofType("null", false)) + Assertions.assertEquals(nullableUnion, propsOut.get("nullable_union")) + + val nonnullableUnion = JsonNodeFactory.instance.objectNode() + nonnullableUnion + .putArray("oneOf") + .add(ofType("string", false)) + .add(ofType("integer", false)) + Assertions.assertEquals(nonnullableUnion, propsOut.get("nonnullable_union")) + + Assertions.assertEquals(ofType("string", true), propsOut.get("combined_null")) + + val combinedDenormed = JsonNodeFactory.instance.objectNode() + val cdObj = ofType("object", false) + cdObj.putObject("properties").replace("name", ofType("string", true)) + combinedDenormed + .putArray("oneOf") + .add(ofType("string", false)) + .add(cdObj) + .add(ofType("null", false)) + Assertions.assertEquals(combinedDenormed, propsOut.get("combined_denormalized")) + + val unionArrayOut = JsonNodeFactory.instance.objectNode() + unionArrayOut + .put("type", "array") + .putObject("items") + .putArray("oneOf") + .add(ofType("string", false)) + .add(ofType("integer", false)) + Assertions.assertEquals(ofNullable(unionArrayOut), propsOut.get("union_array")) + + val timeTypeFieldNames = + listOf("time", "timestamp", "time_without_timezone", "timestamp_without_timezone") + timeTypeFieldNames.forEach { fieldName -> + val expected = props.get(fieldName) as ObjectNode + if (listOf("date-time", "time").contains(expected.get("format").asText())) { + val formatName = expected.get("format").asText().replace("date-time", "timestamp") + if (!expected.has("airbyte_type")) { + expected.put("airbyte_type", "${formatName}_with_timezone") + } + } + Assertions.assertEquals(ofNullable(expected), propsOut.get(fieldName)) + } + } + + private fun ofType(type: String, nullable: Boolean = true): ObjectNode = + if (nullable) { + ofNullable(ofType(type, false)) + } else { + JsonNodeFactory.instance.objectNode().put("type", type) + } + + private fun ofNullable(typeNode: JsonNode): ObjectNode { + val schema = JsonNodeFactory.instance.objectNode() + schema.putArray("oneOf").add(typeNode).add(ofType("null", false)) + return schema + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/AirbyteValueToJsonTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/AirbyteValueToJsonTest.kt new file mode 100644 index 000000000000..dedc0997a8d5 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/AirbyteValueToJsonTest.kt @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.data + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class AirbyteValueToJsonTest { + @Test + fun testRoundTrip() { + val airbyteValue = + ObjectValue( + linkedMapOf( + "name" to StringValue("hello"), + "age" to IntegerValue(42), + "is_cool" to BooleanValue(true), + "height" to NumberValue("42.0".toBigDecimal()), + "friends" to ArrayValue(listOf(StringValue("hello"), StringValue("world"))), + "address" to + ObjectValue( + linkedMapOf( + "street" to StringValue("123 Main St"), + "city" to StringValue("San Francisco") + ) + ), + "null_field" to NullValue, + "nullable_union" to IntegerValue(42), + "nonnullable_union" to StringValue("hello"), + "combined_null" to StringValue("hello"), + "combined_denormalized" to + ObjectValue(linkedMapOf("name" to StringValue("hello"))), + "union_array" to ArrayValue(listOf(StringValue("hello"), IntegerValue(42))), + "date" to DateValue("2021-01-01"), + "time" to TimeValue("12:00:00"), + "timestamp" to TimestampValue("2021-01-01T12:00:00Z"), + "time_without_timezone" to TimeValue("12:00:00"), + "timestamp_without_timezone" to TimestampValue("2021-01-01T12:00:00") + ) + ) + val schema = + ObjectType( + linkedMapOf( + "name" to FieldType(StringType, true), + "age" to FieldType(IntegerType, false), + "is_cool" to FieldType(BooleanType, false), + "height" to FieldType(NumberType, false), + "friends" to FieldType(ArrayType(FieldType(StringType, true)), false), + "address" to + FieldType( + ObjectType( + linkedMapOf( + "street" to FieldType(StringType, true), + "city" to FieldType(StringType, true) + ) + ), + false + ), + "null_field" to FieldType(NullType, false), + "nullable_union" to + FieldType(UnionType(listOf(StringType, IntegerType, NullType)), false), + "nonnullable_union" to + FieldType(UnionType(listOf(StringType, IntegerType)), true), + "combined_null" to FieldType(UnionType(listOf(StringType, NullType)), false), + "combined_denormalized" to + FieldType( + ObjectType(linkedMapOf("name" to FieldType(StringType, true))), + false + ), + "union_array" to + FieldType( + ArrayType(FieldType(UnionType(listOf(StringType, IntegerType)), true)), + true + ), + "date" to FieldType(DateType, false), + "time" to FieldType(TimeType(false), false), + "timestamp" to FieldType(TimestampType(false), false), + "time_without_timezone" to FieldType(TimeType(true), false), + "timestamp_without_timezone" to FieldType(TimestampType(true), false) + ) + ) + val jsonValue = AirbyteValueToJson().convert(airbyteValue) + val roundTripValue = JsonToAirbyteValue().convert(jsonValue, schema) + + Assertions.assertEquals(airbyteValue, roundTripValue) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/JsonSchemaToAirbyteTypeTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/JsonSchemaToAirbyteTypeTest.kt new file mode 100644 index 000000000000..6afb6a89f549 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/JsonSchemaToAirbyteTypeTest.kt @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.data + +import com.fasterxml.jackson.databind.node.JsonNodeFactory +import com.fasterxml.jackson.databind.node.ObjectNode +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class JsonSchemaToAirbyteTypeTest { + private fun ofType(type: String): ObjectNode { + return JsonNodeFactory.instance.objectNode().put("type", type) + } + + @Test + fun testNull() { + val nullType = ofType("null") + val airbyteType = JsonSchemaToAirbyteType().convert(nullType) + Assertions.assertTrue(airbyteType is NullType) + } + + @Test + fun testString() { + val stringType = ofType("string") + val airbyteType = JsonSchemaToAirbyteType().convert(stringType) + Assertions.assertTrue(airbyteType is StringType) + } + + @Test + fun testBoolean() { + val booleanType = ofType("boolean") + val airbyteType = JsonSchemaToAirbyteType().convert(booleanType) + Assertions.assertTrue(airbyteType is BooleanType) + } + + @Test + fun testInteger() { + val integerType = ofType("integer") + val airbyteType = JsonSchemaToAirbyteType().convert(integerType) + Assertions.assertTrue(airbyteType is IntegerType) + } + + @Test + fun testNumber() { + val numberType = ofType("number") + val airbyteType = JsonSchemaToAirbyteType().convert(numberType) + Assertions.assertTrue(airbyteType is NumberType) + numberType.put("airbyte_type", "integer") + val airbyteType2 = JsonSchemaToAirbyteType().convert(numberType) + Assertions.assertTrue(airbyteType2 is IntegerType) + } + + @Test + fun testStringDate() { + val stringType = ofType("string").put("format", "date") + val airbyteType = JsonSchemaToAirbyteType().convert(stringType) + Assertions.assertTrue(airbyteType is DateType) + } + + @Test + fun testStringTime() { + val stringType = ofType("string").put("format", "time") + val airbyteType = JsonSchemaToAirbyteType().convert(stringType) + Assertions.assertEquals(airbyteType, TimeType(hasTimezone = true)) + stringType.put("airbyte_type", "time_without_timezone") + val airbyteType2 = JsonSchemaToAirbyteType().convert(stringType) + Assertions.assertEquals(airbyteType2, TimeType(hasTimezone = false)) + stringType.put("airbyte_type", "time_with_timezone") + val airbyteType3 = JsonSchemaToAirbyteType().convert(stringType) + Assertions.assertEquals(airbyteType3, TimeType(hasTimezone = true)) + } + + @Test + fun testStringTimestamp() { + val stringType = ofType("string").put("format", "date-time") + val airbyteType = JsonSchemaToAirbyteType().convert(stringType) + Assertions.assertEquals(airbyteType, TimestampType(hasTimezone = true)) + stringType.put("airbyte_type", "timestamp_without_timezone") + val airbyteType2 = JsonSchemaToAirbyteType().convert(stringType) + Assertions.assertEquals(airbyteType2, TimestampType(hasTimezone = false)) + stringType.put("airbyte_type", "timestamp_with_timezone") + val airbyteType3 = JsonSchemaToAirbyteType().convert(stringType) + Assertions.assertEquals(airbyteType3, TimestampType(hasTimezone = true)) + } + + @Test + fun testObjectWithoutSchema() { + val objectType = ofType("object") + val airbyteType = JsonSchemaToAirbyteType().convert(objectType) + Assertions.assertTrue(airbyteType is ObjectTypeWithoutSchema) + } + + @Test + fun testObjectWithEmptySchema() { + val objectType = ofType("object") + objectType.replace("properties", JsonNodeFactory.instance.objectNode()) + val airbyteType = JsonSchemaToAirbyteType().convert(objectType) + Assertions.assertTrue(airbyteType is ObjectTypeWithEmptySchema) + } + + @Test + fun testArrayWithoutSchema() { + val arrayType = ofType("array") + val airbyteType = JsonSchemaToAirbyteType().convert(arrayType) + Assertions.assertTrue(airbyteType is ArrayTypeWithoutSchema) + } + + @Test + fun testObjectWithSchema() { + val schemaNode = ofType("object") + val properties = schemaNode.putObject("properties") + properties.replace("field1", ofType("string")) + properties.replace("field2", ofType("integer")) + val nestedProperties = + properties.putObject("nested").put("type", "object").putObject("properties") + nestedProperties.replace("field1", ofType("string")) + nestedProperties.replace("field2", ofType("integer")) + val airbyteType = JsonSchemaToAirbyteType().convert(schemaNode) + Assertions.assertTrue(airbyteType is ObjectType) + val objectType = airbyteType as ObjectType + Assertions.assertEquals(FieldType(StringType, true), objectType.properties["field1"]) + Assertions.assertEquals(FieldType(IntegerType, true), objectType.properties["field2"]) + + Assertions.assertTrue(objectType.properties.containsKey("nested")) + val nestedField = objectType.properties["nested"]!! + Assertions.assertTrue(nestedField.type is ObjectType) + val nestedObjectType = nestedField.type as ObjectType + Assertions.assertEquals(FieldType(StringType, true), nestedObjectType.properties["field1"]) + Assertions.assertEquals(FieldType(IntegerType, true), nestedObjectType.properties["field2"]) + } + + @Test + fun testArrayWithSingleSchema() { + val schemaNode = JsonNodeFactory.instance.objectNode().put("type", "array") + val itemsNode = schemaNode.putObject("items").put("type", "string") as ObjectNode + val airbyteType = JsonSchemaToAirbyteType().convert(schemaNode) + Assertions.assertTrue(airbyteType is ArrayType) + val arrayType = airbyteType as ArrayType + Assertions.assertEquals(FieldType(StringType, true), arrayType.items) + + itemsNode.put("type", "integer") + val airbyteType2 = JsonSchemaToAirbyteType().convert(schemaNode) + Assertions.assertTrue(airbyteType2 is ArrayType) + val arrayType2 = airbyteType2 as ArrayType + Assertions.assertEquals(FieldType(IntegerType, true), arrayType2.items) + } + + @Test + fun testUnionFromArrayOfTypes() { + listOf("oneOf", "anyOf", "allOf").forEach { + val schemaNode = JsonNodeFactory.instance.objectNode() + schemaNode.putArray(it).add(ofType("string")).add(ofType("integer")) + + val airbyteType = JsonSchemaToAirbyteType().convert(schemaNode) + Assertions.assertTrue(airbyteType is UnionType) + val unionType = airbyteType as UnionType + Assertions.assertEquals(2, unionType.options.size) + Assertions.assertTrue(unionType.options.contains(StringType)) + Assertions.assertTrue(unionType.options.contains(IntegerType)) + } + } + + @Test + fun testUnionFromArrayOfTypeNames() { + val schemaNode = JsonNodeFactory.instance.objectNode() + schemaNode.putArray("type").add("string").add("integer").add("object") + val airbyteType = JsonSchemaToAirbyteType().convert(schemaNode) + Assertions.assertTrue(airbyteType is UnionType) + val unionType = airbyteType as UnionType + Assertions.assertEquals(3, unionType.options.size) + Assertions.assertTrue(unionType.options.contains(StringType)) + } + + @Test + fun testObjectWithUnionProperties() { + val schemaNode = ofType("object") + val properties = schemaNode.putObject("properties") + val typesNode = JsonNodeFactory.instance.objectNode() + typesNode.putArray("type").add("string").add("integer") + properties.replace("field1", typesNode) + properties.replace("field2", ofType("integer")) + val airbyteType = JsonSchemaToAirbyteType().convert(schemaNode) + Assertions.assertTrue(airbyteType is ObjectType) + val objectType = airbyteType as ObjectType + Assertions.assertTrue(objectType.properties.containsKey("field1")) + val field1 = objectType.properties["field1"]!! + Assertions.assertTrue(field1.type is UnionType) + val unionType = field1.type as UnionType + Assertions.assertEquals(2, unionType.options.size) + Assertions.assertTrue(unionType.options.contains(StringType)) + Assertions.assertTrue(unionType.options.contains(IntegerType)) + Assertions.assertEquals(FieldType(IntegerType, true), objectType.properties["field2"]) + } + + @Test + fun testDenormalizeUnionProperties() { + val schemaNode = JsonNodeFactory.instance.objectNode() + schemaNode.putArray("type").add("object").add("array") + schemaNode.putObject("properties").replace("field1", ofType("string")) + schemaNode.putObject("items").put("type", "integer") + val airbyteType = JsonSchemaToAirbyteType().convert(schemaNode) + Assertions.assertTrue(airbyteType is UnionType) + val unionType = airbyteType as UnionType + Assertions.assertEquals(2, unionType.options.size) + val objectOption = unionType.options.find { it is ObjectType }!! + val arrayOption = unionType.options.find { it is ArrayType }!! + Assertions.assertTrue(objectOption is ObjectType) + val objectProperties = (objectOption as ObjectType).properties + Assertions.assertEquals(1, objectProperties.size) + Assertions.assertEquals(FieldType(StringType, true), objectProperties["field1"]) + Assertions.assertTrue(arrayOption is ArrayType) + val arrayItems = (arrayOption as ArrayType).items + Assertions.assertEquals(FieldType(IntegerType, true), arrayItems) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/JsonToAirbyteValueTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/JsonToAirbyteValueTest.kt new file mode 100644 index 000000000000..38b3259652b6 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/data/JsonToAirbyteValueTest.kt @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.data + +import com.fasterxml.jackson.databind.node.JsonNodeFactory +import java.math.BigDecimal +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class JsonToAirbyteValueTest { + @Test + fun testNull() { + val value = JsonToAirbyteValue().convert(JsonNodeFactory.instance.nullNode(), NullType) + Assertions.assertTrue(value is NullValue) + } + + @Test + fun testString() { + val value = + JsonToAirbyteValue().convert(JsonNodeFactory.instance.textNode("hello"), StringType) + Assertions.assertTrue(value is StringValue) + Assertions.assertEquals("hello", (value as StringValue).value) + } + + @Test + fun testBoolean() { + val value = + JsonToAirbyteValue().convert(JsonNodeFactory.instance.booleanNode(true), BooleanType) + Assertions.assertTrue(value is BooleanValue) + Assertions.assertEquals(true, (value as BooleanValue).value) + } + + @Test + fun testInteger() { + val value = + JsonToAirbyteValue().convert(JsonNodeFactory.instance.numberNode(42), IntegerType) + Assertions.assertTrue(value is IntegerValue) + Assertions.assertEquals(42, (value as IntegerValue).value) + } + + @Test + fun testNumber() { + val value = + JsonToAirbyteValue().convert(JsonNodeFactory.instance.numberNode(42), NumberType) + Assertions.assertTrue(value is NumberValue) + Assertions.assertEquals(BigDecimal(42), (value as NumberValue).value) + } + + @Test + fun testArray() { + val value = + JsonToAirbyteValue() + .convert( + JsonNodeFactory.instance.arrayNode().add("hello").add("world"), + ArrayType(FieldType(StringType, true)) + ) + Assertions.assertTrue(value is ArrayValue) + val arrayValue = value as ArrayValue + Assertions.assertEquals(2, arrayValue.values.size) + Assertions.assertTrue(arrayValue.values[0] is StringValue) + Assertions.assertEquals("hello", (arrayValue.values[0] as StringValue).value) + Assertions.assertTrue(arrayValue.values[1] is StringValue) + Assertions.assertEquals("world", (arrayValue.values[1] as StringValue).value) + } + + @Test + fun testArrayWithoutSchema() { + val value = + JsonToAirbyteValue() + .convert( + JsonNodeFactory.instance.arrayNode().add("hello").add("world"), + ArrayTypeWithoutSchema + ) + Assertions.assertTrue(value is ArrayValue, "Expected ArrayValue, got $value") + val arrayValue = value as ArrayValue + Assertions.assertEquals(2, arrayValue.values.size) + Assertions.assertTrue(arrayValue.values[0] is StringValue) + Assertions.assertEquals("hello", (arrayValue.values[0] as StringValue).value) + Assertions.assertTrue(arrayValue.values[1] is StringValue) + Assertions.assertEquals("world", (arrayValue.values[1] as StringValue).value) + } + + @Test + fun testObject() { + val value = + JsonToAirbyteValue() + .convert( + JsonNodeFactory.instance.objectNode().put("name", "world"), + ObjectType(linkedMapOf("name" to FieldType(StringType, true))) + ) + Assertions.assertTrue(value is ObjectValue) + val objectValue = value as ObjectValue + Assertions.assertEquals(1, objectValue.values.size) + Assertions.assertTrue(objectValue.values["name"] is StringValue) + Assertions.assertEquals("world", (objectValue.values["name"] as StringValue).value) + } + + @Test + fun testObjectWithoutSchema() { + listOf(ObjectTypeWithoutSchema, ObjectTypeWithEmptySchema).forEach { + val value = + JsonToAirbyteValue() + .convert(JsonNodeFactory.instance.objectNode().put("name", "world"), it) + Assertions.assertTrue(value is ObjectValue) + val objectValue = value as ObjectValue + Assertions.assertEquals(1, objectValue.values.size) + Assertions.assertTrue(objectValue.values["name"] is StringValue) + Assertions.assertEquals("world", (objectValue.values["name"] as StringValue).value) + } + } + + @Test + fun testUnion() { + val stringValue = + JsonToAirbyteValue() + .convert( + JsonNodeFactory.instance.textNode("hello"), + UnionType(listOf(StringType, IntegerType)) + ) + Assertions.assertTrue(stringValue is StringValue) + Assertions.assertEquals("hello", (stringValue as StringValue).value) + + val intValue = + JsonToAirbyteValue() + .convert( + JsonNodeFactory.instance.numberNode(42), + UnionType(listOf(StringType, IntegerType)) + ) + Assertions.assertTrue(intValue is IntegerValue) + Assertions.assertEquals(42, (intValue as IntegerValue).value) + } + + @Test + fun testDate() { + val value = + JsonToAirbyteValue().convert(JsonNodeFactory.instance.textNode("2021-01-01"), DateType) + Assertions.assertTrue(value is DateValue) + Assertions.assertEquals("2021-01-01", (value as DateValue).value) + } + + @Test + fun testTimestamp() { + val value = + JsonToAirbyteValue() + .convert( + JsonNodeFactory.instance.textNode("2021-01-01T00:00:00Z"), + TimestampType(true) + ) + Assertions.assertTrue(value is TimestampValue) + Assertions.assertEquals("2021-01-01T00:00:00Z", (value as TimestampValue).value) + } + + @Test + fun testTime() { + val value = + JsonToAirbyteValue() + .convert(JsonNodeFactory.instance.textNode("00:00:00"), TimeType(true)) + Assertions.assertTrue(value is TimeValue) + Assertions.assertEquals("00:00:00", (value as TimeValue).value) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/message/DestinationMessageTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/message/DestinationMessageTest.kt index 16e82efde998..c7ba1edb0009 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/message/DestinationMessageTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/message/DestinationMessageTest.kt @@ -4,10 +4,10 @@ package io.airbyte.cdk.message -import com.fasterxml.jackson.databind.node.ObjectNode import io.airbyte.cdk.command.Append import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.command.DestinationStream +import io.airbyte.cdk.data.ObjectTypeWithEmptySchema import io.airbyte.protocol.models.Jsons import io.airbyte.protocol.models.v0.AirbyteGlobalState import io.airbyte.protocol.models.v0.AirbyteMessage @@ -33,7 +33,7 @@ class DestinationMessageTest { DestinationStream( descriptor, Append, - Jsons.deserialize("{}") as ObjectNode, + ObjectTypeWithEmptySchema, generationId = 42, minimumGenerationId = 0, syncId = 42, diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/StreamsManagerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/StreamsManagerTest.kt index 857a7b24bc5e..558d660affa3 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/StreamsManagerTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/state/StreamsManagerTest.kt @@ -4,17 +4,16 @@ package io.airbyte.cdk.state -import com.fasterxml.jackson.databind.node.ObjectNode import com.google.common.collect.Range import io.airbyte.cdk.command.Append import io.airbyte.cdk.command.DestinationCatalog import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.command.MockCatalogFactory.Companion.stream1 import io.airbyte.cdk.command.MockCatalogFactory.Companion.stream2 +import io.airbyte.cdk.data.NullType import io.airbyte.cdk.message.Batch import io.airbyte.cdk.message.BatchEnvelope import io.airbyte.cdk.message.SimpleBatch -import io.airbyte.protocol.models.Jsons import io.micronaut.test.extensions.junit5.annotation.MicronautTest import jakarta.inject.Inject import jakarta.inject.Named @@ -75,10 +74,8 @@ class StreamsManagerTest { streamsManager.getManager( DestinationStream( DestinationStream.Descriptor("test", "non-existent"), - Append, - Jsons.deserialize( - """{"type": "object", "properties": {"id": {"type": "integer"}}}""" - ) as ObjectNode, + importType = Append, + schema = NullType, generationId = 42, minimumGenerationId = 0, syncId = 42,