Skip to content

Commit

Permalink
destination-s3: fix issue with columnless streams
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Sep 18, 2024
1 parent 14f9e5c commit 1e92493
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.ObjectWriter
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.databind.node.TextNode
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants.Companion.AVRO_EXTRA_PROPS_FIELD
import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants.Companion.JSON_EXTRA_PROPS_FIELDS
Expand Down Expand Up @@ -81,7 +82,13 @@ class AvroRecordFactory(
// Preprocess the client data to add features not supported by the converter
val data = recordMessage.data as ObjectNode
val preprocessed = recordPreprocessor.invoke(data)
jsonRecord.setAll<JsonNode>(preprocessed as ObjectNode)
if (preprocessed is ObjectNode) {
jsonRecord.setAll<JsonNode>(preprocessed)
} else if (preprocessed !is TextNode || preprocessed.asText() != "{}") {
throw (IllegalArgumentException(
"found data field of type ${preprocessed?.javaClass} and length ${preprocessed?.asText()?.length} (value=${preprocessed?.asText()}"
))
}

return converter!!.convertToGenericDataRecord(WRITER.writeValueAsBytes(jsonRecord), schema)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.fasterxml.jackson.databind.node.ObjectNode
import com.google.common.collect.ImmutableMap
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
Expand All @@ -20,17 +21,7 @@ import io.airbyte.commons.io.IOs
import io.airbyte.commons.jackson.MoreMappers
import io.airbyte.commons.json.Jsons
import io.airbyte.commons.resources.MoreResources
import io.airbyte.protocol.models.v0.AirbyteCatalog
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
import io.airbyte.protocol.models.v0.CatalogHelpers
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.airbyte.protocol.models.v0.SyncMode
import io.airbyte.protocol.models.v0.*
import io.airbyte.workers.exception.TestHarnessException
import io.github.oshai.kotlinlogging.KotlinLogging
import java.nio.file.Path
Expand Down Expand Up @@ -296,6 +287,67 @@ protected constructor(
.map { Jsons.deserialize(it, AirbyteMessage::class.java) }
}

@Test
fun testStreamWithNoColumns() {
    // Regression test: a stream whose JSON schema declares an empty "properties"
    // object (i.e. zero columns) must sync successfully end-to-end.
    val testNamespace = "nsp" + RandomStringUtils.randomAlphanumeric(5)
    val testStreamName = "str" + RandomStringUtils.randomAlphanumeric(5)
    val destinationConfig = getConfig()

    // Schema of the form {"properties": {}} — an object type with no columns.
    val emptySchema =
        JsonNodeFactory.instance.objectNode().apply {
            set<JsonNode>("properties", JsonNodeFactory.instance.objectNode())
        }
    val configuredCatalog =
        ConfiguredAirbyteCatalog()
            .withStreams(
                listOf(
                    ConfiguredAirbyteStream()
                        .withSyncMode(SyncMode.INCREMENTAL)
                        .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
                        .withGenerationId(0)
                        .withMinimumGenerationId(0)
                        .withSyncId(0)
                        .withStream(
                            AirbyteStream()
                                .withNamespace(testNamespace)
                                .withName(testStreamName)
                                .withJsonSchema(emptySchema),
                        ),
                ),
            )

    // One record carrying an empty data object, matching the columnless schema.
    val emptyRecord =
        AirbyteMessage()
            .withType(AirbyteMessage.Type.RECORD)
            .withRecord(
                AirbyteRecordMessage()
                    .withStream(configuredCatalog.streams[0].stream.name)
                    .withNamespace(configuredCatalog.streams[0].stream.namespace)
                    .withEmittedAt(Instant.now().toEpochMilli())
                    .withData(JsonNodeFactory.instance.objectNode()),
            )
    // Followed by a COMPLETE stream-status trace so the sync terminates cleanly.
    val completeStatus =
        AirbyteMessage()
            .withType(AirbyteMessage.Type.TRACE)
            .withTrace(
                AirbyteTraceMessage()
                    .withStreamStatus(
                        AirbyteStreamStatusTraceMessage()
                            .withStatus(
                                AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE
                            )
                            .withStreamDescriptor(
                                StreamDescriptor()
                                    .withNamespace(testNamespace)
                                    .withName(testStreamName)
                            )
                    )
            )

    // The sync must complete and the state output must verify despite the
    // stream declaring no columns.
    runSyncAndVerifyStateOutput(
        destinationConfig,
        listOf(emptyRecord, completeStatus),
        configuredCatalog,
        false
    )
}

/**
* Test 2 runs before refreshes support and after refreshes support in OVERWRITE mode. Verifies
* we clean up after ourselves correctly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
// Airbyte connector build configuration for destination-s3.
airbyteJavaConnector {
cdkVersionRequired = '0.46.0'
features = ['db-destinations', 's3-destinations']
// NOTE(review): useLocalCdk = true builds against the in-repo CDK snapshot instead of
// the published 0.46.0 artifact — presumably because the fix this connector depends on
// is not yet in a released CDK. This setting must be flipped back to false (with a
// bumped cdkVersionRequired) before the next publish — confirm before release.
useLocalCdk = true
}

airbyteJavaConnector.addCdkDependencies()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.1.0
dockerImageTag: 1.2.0
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | Fix exception when syncing streams that declare no columns |
| 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies |
| 1.0.5 | 2024-09-05 | [45143](https://github.com/airbytehq/airbyte/pull/45143) | don't overwrite (and delete) existing files, skip indexes instead |
| 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists |
Expand Down

0 comments on commit 1e92493

Please sign in to comment.