-
Notifications
You must be signed in to change notification settings - Fork 4.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Bulk Load CDK: AirbyteType & AirbyteValue, marshaling from json (#45430)
- Loading branch information
1 parent
ace4e82
commit b17df3f
Showing
16 changed files
with
1,195 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
39 changes: 39 additions & 0 deletions
39
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteType.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.data | ||
|
||
sealed interface AirbyteType | ||
|
||
data object NullType : AirbyteType | ||
|
||
data object StringType : AirbyteType | ||
|
||
data object BooleanType : AirbyteType | ||
|
||
data object IntegerType : AirbyteType | ||
|
||
data object NumberType : AirbyteType | ||
|
||
data object DateType : AirbyteType | ||
|
||
data class TimestampType(val hasTimezone: Boolean) : AirbyteType | ||
|
||
data class TimeType(val hasTimezone: Boolean) : AirbyteType | ||
|
||
data class ArrayType(val items: FieldType) : AirbyteType | ||
|
||
data object ArrayTypeWithoutSchema : AirbyteType | ||
|
||
data class ObjectType(val properties: LinkedHashMap<String, FieldType>) : AirbyteType | ||
|
||
data object ObjectTypeWithEmptySchema : AirbyteType | ||
|
||
data object ObjectTypeWithoutSchema : AirbyteType | ||
|
||
data class UnionType(val options: List<AirbyteType>) : AirbyteType | ||
|
||
data class UnknownType(val what: String) : AirbyteType | ||
|
||
data class FieldType(val type: AirbyteType, val nullable: Boolean) |
79 changes: 79 additions & 0 deletions
79
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteTypeToJsonSchema.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.data | ||
|
||
import com.fasterxml.jackson.databind.JsonNode | ||
import com.fasterxml.jackson.databind.node.JsonNodeFactory | ||
import com.fasterxml.jackson.databind.node.ObjectNode | ||
|
||
class AirbyteTypeToJsonSchema { | ||
private fun ofType(typeName: String): ObjectNode { | ||
return JsonNodeFactory.instance.objectNode().put("type", typeName) | ||
} | ||
|
||
fun convert(airbyteType: AirbyteType): JsonNode { | ||
return when (airbyteType) { | ||
is NullType -> ofType("null") | ||
is StringType -> ofType("string") | ||
is BooleanType -> ofType("boolean") | ||
is IntegerType -> ofType("integer") | ||
is NumberType -> ofType("number") | ||
is ArrayType -> | ||
JsonNodeFactory.instance | ||
.objectNode() | ||
.put("type", "array") | ||
.set("items", fromFieldType(airbyteType.items)) | ||
is ArrayTypeWithoutSchema -> ofType("array") | ||
is ObjectType -> { | ||
val objNode = ofType("object") | ||
val properties = objNode.putObject("properties") | ||
airbyteType.properties.forEach { (name, field) -> | ||
properties.replace(name, fromFieldType(field)) | ||
} | ||
objNode | ||
} | ||
is ObjectTypeWithoutSchema -> ofType("object") | ||
is ObjectTypeWithEmptySchema -> { | ||
val objectNode = ofType("object") | ||
objectNode.putObject("properties") | ||
objectNode | ||
} | ||
is UnionType -> { | ||
val unionNode = JsonNodeFactory.instance.objectNode() | ||
val unionOptions = unionNode.putArray("oneOf") | ||
airbyteType.options.forEach { unionOptions.add(convert(it)) } | ||
unionNode | ||
} | ||
is DateType -> ofType("string").put("format", "date") | ||
is TimeType -> { | ||
val timeNode = ofType("string").put("format", "time") | ||
if (airbyteType.hasTimezone) { | ||
timeNode.put("airbyte_type", "time_with_timezone") | ||
} else { | ||
timeNode.put("airbyte_type", "time_without_timezone") | ||
} | ||
} | ||
is TimestampType -> { | ||
val timestampNode = ofType("string").put("format", "date-time") | ||
if (airbyteType.hasTimezone) { | ||
timestampNode.put("airbyte_type", "timestamp_with_timezone") | ||
} else { | ||
timestampNode.put("airbyte_type", "timestamp_without_timezone") | ||
} | ||
} | ||
else -> throw IllegalArgumentException("Unknown type: $airbyteType") | ||
} | ||
} | ||
|
||
private fun fromFieldType(field: FieldType): JsonNode { | ||
if (field.nullable) { | ||
if (field.type is UnionType) { | ||
return convert(UnionType(options = field.type.options + NullType)) | ||
} | ||
return convert(UnionType(options = listOf(field.type, NullType))) | ||
} | ||
return convert(field.type) | ||
} | ||
} |
31 changes: 31 additions & 0 deletions
31
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValue.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.data | ||
|
||
import java.math.BigDecimal | ||
|
||
sealed interface AirbyteValue | ||
|
||
data object NullValue : AirbyteValue | ||
|
||
@JvmInline value class StringValue(val value: String) : AirbyteValue | ||
|
||
@JvmInline value class BooleanValue(val value: Boolean) : AirbyteValue | ||
|
||
@JvmInline value class IntegerValue(val value: Long) : AirbyteValue | ||
|
||
@JvmInline value class NumberValue(val value: BigDecimal) : AirbyteValue | ||
|
||
@JvmInline value class DateValue(val value: String) : AirbyteValue | ||
|
||
@JvmInline value class TimestampValue(val value: String) : AirbyteValue | ||
|
||
@JvmInline value class TimeValue(val value: String) : AirbyteValue | ||
|
||
@JvmInline value class ArrayValue(val values: List<AirbyteValue>) : AirbyteValue | ||
|
||
@JvmInline value class ObjectValue(val values: LinkedHashMap<String, AirbyteValue>) : AirbyteValue | ||
|
||
@JvmInline value class UnknownValue(val what: String) : AirbyteValue |
31 changes: 31 additions & 0 deletions
31
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/AirbyteValueToJson.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.data | ||
|
||
import com.fasterxml.jackson.databind.JsonNode | ||
import com.fasterxml.jackson.databind.node.JsonNodeFactory | ||
|
||
class AirbyteValueToJson { | ||
fun convert(value: AirbyteValue): JsonNode { | ||
return when (value) { | ||
is ArrayValue -> | ||
JsonNodeFactory.instance.arrayNode().addAll(value.values.map { convert(it) }) | ||
is BooleanValue -> JsonNodeFactory.instance.booleanNode(value.value) | ||
is DateValue -> JsonNodeFactory.instance.textNode(value.value) | ||
is IntegerValue -> JsonNodeFactory.instance.numberNode(value.value) | ||
is NullValue -> JsonNodeFactory.instance.nullNode() | ||
is NumberValue -> JsonNodeFactory.instance.numberNode(value.value) | ||
is ObjectValue -> { | ||
val objNode = JsonNodeFactory.instance.objectNode() | ||
value.values.forEach { (name, field) -> objNode.replace(name, convert(field)) } | ||
objNode | ||
} | ||
is StringValue -> JsonNodeFactory.instance.textNode(value.value) | ||
is TimeValue -> JsonNodeFactory.instance.textNode(value.value) | ||
is TimestampValue -> JsonNodeFactory.instance.textNode(value.value) | ||
is UnknownValue -> throw IllegalArgumentException("Unknown value: $value") | ||
} | ||
} | ||
} |
162 changes: 162 additions & 0 deletions
162
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/data/JsonSchemaToAirbyteType.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.cdk.data | ||
|
||
import com.fasterxml.jackson.databind.JsonNode | ||
import com.fasterxml.jackson.databind.node.JsonNodeFactory | ||
import com.fasterxml.jackson.databind.node.ObjectNode | ||
|
||
class JsonSchemaToAirbyteType { | ||
fun convert(schema: JsonNode): AirbyteType { | ||
// try { | ||
if (schema.isObject && schema.has("type")) { | ||
// Normal json object with {"type": ..., ...} | ||
val schemaType = (schema as ObjectNode).get("type") | ||
return if (schemaType.isTextual) { | ||
// {"type": <string>, ...} | ||
when (schema.get("type").asText()) { | ||
"string" -> fromString(schema) | ||
"boolean" -> BooleanType | ||
"integer" -> IntegerType | ||
"number" -> fromNumber(schema) | ||
"array" -> fromArray(schema) | ||
"object" -> fromObject(schema) | ||
"null" -> NullType | ||
else -> | ||
throw IllegalArgumentException( | ||
"Unknown type: ${ | ||
schema.get("type").asText() | ||
}" | ||
) | ||
} | ||
} else if (schemaType.isArray) { | ||
// {"type": [...], ...} | ||
unionFromCombinedTypes(schemaType.toList(), schema) | ||
} else { | ||
UnknownType("unspported type for 'type' field: $schemaType") | ||
} | ||
} else if (schema.isObject) { | ||
// {"oneOf": [...], ...} or {"anyOf": [...], ...} or {"allOf": [...], ...} | ||
val options = schema.get("oneOf") ?: schema.get("anyOf") ?: schema.get("allOf") | ||
return if (options != null) { | ||
UnionType(options.map { convert(it as ObjectNode) }) | ||
} else { | ||
// Default to object if no type and not a union type | ||
convert((schema as ObjectNode).put("type", "object")) | ||
} | ||
} else if (schema.isTextual) { | ||
// "<typename>" | ||
val typeSchema = JsonNodeFactory.instance.objectNode().put("type", schema.asText()) | ||
return convert(typeSchema) | ||
} else { | ||
return UnknownType("Unknown schema type: $schema") | ||
} | ||
} // catch (t: Throwable) { | ||
// return UnknownType(t.message ?: "Unknown error") | ||
// } | ||
// } | ||
|
||
private fun fromString(schema: ObjectNode): AirbyteType = | ||
when (schema.get("format")?.asText()) { | ||
"date" -> DateType | ||
"time" -> | ||
TimeType( | ||
hasTimezone = schema.get("airbyte_type")?.asText() != "time_without_timezone" | ||
) | ||
"date-time" -> | ||
TimestampType( | ||
hasTimezone = | ||
schema.get("airbyte_type")?.asText() != "timestamp_without_timezone" | ||
) | ||
null -> StringType | ||
else -> | ||
throw IllegalArgumentException( | ||
"Unknown string format: ${ | ||
schema.get("format").asText() | ||
}" | ||
) | ||
} | ||
|
||
private fun fromNumber(schema: ObjectNode): AirbyteType = | ||
if (schema.get("airbyte_type")?.asText() == "integer") { | ||
IntegerType | ||
} else { | ||
NumberType | ||
} | ||
|
||
private fun fromArray(schema: ObjectNode): AirbyteType { | ||
val items = schema.get("items") ?: return ArrayTypeWithoutSchema | ||
if (items.isArray) { | ||
if (items.isEmpty) { | ||
return ArrayTypeWithoutSchema | ||
} | ||
val itemOptions = UnionType(items.map { convert(it) }) | ||
return ArrayType(fieldFromUnion(itemOptions)) | ||
} | ||
return ArrayType(fieldFromSchema(items as ObjectNode)) | ||
} | ||
|
||
private fun fromObject(schema: ObjectNode): AirbyteType { | ||
val properties = schema.get("properties") ?: return ObjectTypeWithoutSchema | ||
if (properties.isEmpty) { | ||
return ObjectTypeWithEmptySchema | ||
} | ||
val requiredFields = schema.get("required")?.map { it.asText() } ?: emptyList() | ||
return objectFromProperties(properties as ObjectNode, requiredFields) | ||
} | ||
|
||
private fun fieldFromSchema( | ||
fieldSchema: ObjectNode, | ||
onRequiredList: Boolean = false | ||
): FieldType { | ||
val markedRequired = fieldSchema.get("required")?.asBoolean() ?: false | ||
val nullable = !(onRequiredList || markedRequired) | ||
val airbyteType = convert(fieldSchema) | ||
if (airbyteType is UnionType) { | ||
return fieldFromUnion(airbyteType, nullable) | ||
} else { | ||
return FieldType(airbyteType, nullable) | ||
} | ||
} | ||
|
||
private fun fieldFromUnion(unionType: UnionType, nullable: Boolean = false): FieldType { | ||
if (unionType.options.contains(NullType)) { | ||
val filtered = unionType.options.filter { it != NullType } | ||
return FieldType(UnionType(filtered), nullable = true) | ||
} | ||
return FieldType(unionType, nullable = nullable) | ||
} | ||
|
||
private fun objectFromProperties(schema: ObjectNode, requiredFields: List<String>): ObjectType { | ||
val properties = | ||
schema | ||
.fields() | ||
.asSequence() | ||
.map { (name, node) -> | ||
name to fieldFromSchema(node as ObjectNode, requiredFields.contains(name)) | ||
} | ||
.toMap(LinkedHashMap()) | ||
return ObjectType(properties) | ||
} | ||
|
||
private fun unionFromCombinedTypes( | ||
options: List<JsonNode>, | ||
parentSchema: ObjectNode | ||
): UnionType { | ||
// Denormalize the properties across each type (the converter only checks what matters | ||
// per type). | ||
val unionOptions = | ||
options.map { | ||
if (it.isTextual) { | ||
val schema = parentSchema.deepCopy() | ||
schema.put("type", it.textValue()) | ||
convert(schema) | ||
} else { | ||
convert(it) | ||
} | ||
} | ||
return UnionType(unionOptions) | ||
} | ||
} |
Oops, something went wrong.