Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

destination-s3: don't reuse names of existing objects #45143

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
| :--------- | :--------- | :----------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.44.21 | 2024-09-04 | [\#45143](https://github.com/airbytehq/airbyte/pull/45143) | S3-destination: don't overwrite existing files, skip those file indexes instead |
| 0.44.20 | 2024-08-30 | [\#44933](https://github.com/airbytehq/airbyte/pull/44933) | Avro/Parquet destinations: handle `{}` schemas inside objects/arrays |
| 0.44.19 | 2024-08-20 | [\#44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase Jackson message length limit to 100mb |
| 0.44.18 | 2024-08-22 | [\#44505](https://github.com/airbytehq/airbyte/pull/44505) | Improve handling of incoming debezium change events |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.44.20
version=0.44.21
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ open class S3StorageOperations(
private val s3FilenameTemplateManager: S3FilenameTemplateManager = S3FilenameTemplateManager()

private val partCounts: ConcurrentMap<String, AtomicInteger> = ConcurrentHashMap()
private val objectNameByPrefix: ConcurrentMap<String, Set<String>> = ConcurrentHashMap()

override fun getBucketObjectPath(
namespace: String?,
Expand Down Expand Up @@ -167,6 +168,32 @@ open class S3StorageOperations(
* @return the uploaded filename, which is different from the serialized buffer filename
* </extension></partId>
*/
/**
 * Computes the next object key to upload under [objectPath], skipping any part
 * indexes whose keys are already present in [objectNameByPrefix] so that
 * existing objects in the bucket are never overwritten.
 *
 * @param objectPath prefix (directory-like path) the object will be written under
 * @param recordsData serialized buffer whose local filename supplies the file extension
 * @return a fully qualified object key that does not collide with a known existing object
 */
@VisibleForTesting
fun getFileName(
    objectPath: String,
    recordsData: SerializableBuffer,
): String {
    var fullObjectKey: String
    do {
        // getPartId() increments the per-prefix counter; it is also expected to
        // register the prefix in objectNameByPrefix on first use — TODO confirm.
        val partId: String = getPartId(objectPath)
        val fileExtension: String = getExtension(recordsData.filename)
        fullObjectKey =
            if (!s3Config.fileNamePattern.isNullOrBlank()) {
                s3FilenameTemplateManager.applyPatternToFilename(
                    S3FilenameTemplateParameterObject.builder()
                        .partId(partId)
                        .recordsData(recordsData)
                        .objectPath(objectPath)
                        .fileExtension(fileExtension)
                        .fileNamePattern(s3Config.fileNamePattern)
                        .build(),
                )
            } else {
                objectPath + partId + fileExtension
            }
        // Null-safe lookup: getValue() would throw NoSuchElementException for an
        // unregistered prefix; an unknown prefix simply means "no existing objects".
    } while (objectNameByPrefix[objectPath]?.contains(fullObjectKey) == true)
    return fullObjectKey
}
@Throws(IOException::class)
private fun loadDataIntoBucket(
objectPath: String,
Expand All @@ -175,22 +202,7 @@ open class S3StorageOperations(
): String {
val partSize: Long = DEFAULT_PART_SIZE.toLong()
val bucket: String? = s3Config.bucketName
val partId: String = getPartId(objectPath)
val fileExtension: String = getExtension(recordsData.filename)
val fullObjectKey: String =
if (!s3Config.fileNamePattern.isNullOrBlank()) {
s3FilenameTemplateManager.applyPatternToFilename(
S3FilenameTemplateParameterObject.builder()
.partId(partId)
.recordsData(recordsData)
.objectPath(objectPath)
.fileExtension(fileExtension)
.fileNamePattern(s3Config.fileNamePattern)
.build(),
)
} else {
objectPath + partId + fileExtension
}
val fullObjectKey: String = getFileName(objectPath, recordsData)
val metadata: MutableMap<String, String> = HashMap()
for (blobDecorator: BlobDecorator in blobDecorators) {
blobDecorator.updateMetadata(metadata, getMetadataMapping())
Expand Down Expand Up @@ -263,31 +275,14 @@ open class S3StorageOperations(
) {
AtomicInteger(0)
}

if (partCount.get() == 0) {
var objects: ObjectListing?
var objectCount = 0

val bucket: String? = s3Config.bucketName
objects = s3Client.listObjects(bucket, objectPath)

if (objects != null) {
objectCount += objects.objectSummaries.size
while (objects != null && objects.nextMarker != null) {
objects =
s3Client.listObjects(
ListObjectsRequest()
.withBucketName(bucket)
.withPrefix(objectPath)
.withMarker(objects.nextMarker),
)
if (objects != null) {
objectCount += objects.objectSummaries.size
}
}
objectNameByPrefix.computeIfAbsent(
objectPath,
) {
var objectList: Set<String> = setOf()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can probably use this method here, right?

forObjectsByPage(objectPath) { objectSummaries ->
objectList = objectList + objectSummaries.map { it.key }
}

partCount.set(objectCount)
objectList
}

return partCount.getAndIncrement().toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import io.airbyte.cdk.integrations.destination.s3.jsonschema.JsonRecordIdentityM
import io.airbyte.commons.jackson.MoreMappers

class JsonRecordParquetPreprocessor : JsonRecordIdentityMapper() {
private fun mapCommon(record: JsonNode?, matchingOption: ObjectNode): JsonNode? {
private fun mapCommon(record: JsonNode?, matchingOption: ObjectNode): ObjectNode {
val newObj = MoreMappers.initMapper().createObjectNode()

val propertyName = JsonSchemaParquetPreprocessor.typeFieldName(matchingOption)
Expand All @@ -24,7 +24,7 @@ class JsonRecordParquetPreprocessor : JsonRecordIdentityMapper() {
return newObj
}

override fun mapUnion(record: JsonNode?, schema: ObjectNode): JsonNode? {
override fun mapUnion(record: JsonNode?, schema: ObjectNode): ObjectNode? {
if (record == null || record.isNull) {
return null
}
Expand All @@ -35,7 +35,7 @@ class JsonRecordParquetPreprocessor : JsonRecordIdentityMapper() {
return mapCommon(record, matchingOption)
}

override fun mapCombined(record: JsonNode?, schema: ObjectNode): JsonNode? {
override fun mapCombined(record: JsonNode?, schema: ObjectNode): ObjectNode? {
if (record == null || record.isNull) {
return null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.amazonaws.services.s3.model.ListObjectsRequest
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.cdk.integrations.destination.s3.util.S3NameTransformer
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
Expand All @@ -23,6 +24,7 @@ import org.junit.jupiter.api.Test
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers
import org.mockito.Mockito
import org.mockito.kotlin.eq

class S3StorageOperationsTest {

Expand All @@ -31,7 +33,9 @@ class S3StorageOperationsTest {
private const val FAKE_BUCKET_PATH = "fake-bucketPath"
private const val NAMESPACE = "namespace"
private const val STREAM_NAME = "stream_name1"
private const val OBJECT_TO_DELETE = "$NAMESPACE/$STREAM_NAME/2022_04_04_123456789_0.csv.gz"
private const val OBJECT_PREFIX = "$NAMESPACE/$STREAM_NAME/2022_04_04_123456789_"
private const val OBJECT_EXTENSION = ".csv.gz"
private const val OBJECT_TO_DELETE = "${OBJECT_PREFIX}1$OBJECT_EXTENSION"
}

private lateinit var s3Client: AmazonS3
Expand Down Expand Up @@ -74,6 +78,15 @@ class S3StorageOperationsTest {
),
)
.thenReturn(results)
Mockito.`when`(
s3Client.listObjects(
eq(BUCKET_NAME),
ArgumentMatchers.any(
String::class.java,
),
),
)
.thenReturn(results)

val s3Config =
S3DestinationConfig.create(BUCKET_NAME, FAKE_BUCKET_PATH, "fake-region")
Expand Down Expand Up @@ -210,4 +223,22 @@ class S3StorageOperationsTest {
assertEquals("1", s3StorageOperations.getPartId(FAKE_BUCKET_PATH))
assertEquals("0", s3StorageOperations.getPartId("other_path"))
}

/**
 * Verifies that getFileName() skips part indexes whose object keys already
 * exist in the bucket instead of reusing (and thus overwriting) them.
 */
@Test
fun testGetFileName() {
    // Mocked buffer: its local filename only contributes the file extension.
    val recordsData =
        Mockito.mock(
            SerializableBuffer::class.java,
        )
    Mockito.`when`(recordsData.filename).thenReturn(".csv.gz")
    // First call: part 0 is free, so it is chosen.
    assertEquals(
        OBJECT_PREFIX + 0 + OBJECT_EXTENSION,
        s3StorageOperations.getFileName(OBJECT_PREFIX, recordsData)
    )
    // 1 is skipped because it's already existing (OBJECT_TO_DELETE is
    // "${OBJECT_PREFIX}1$OBJECT_EXTENSION", returned by the mocked listing),
    // so the next call jumps straight to part 2.
    assertEquals(
        OBJECT_PREFIX + 2 + OBJECT_EXTENSION,
        s3StorageOperations.getFileName(OBJECT_PREFIX, recordsData)
    )
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.20'
cdkVersionRequired = '0.44.21'
features = ['db-destinations', 's3-destinations']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.0.4
dockerImageTag: 1.0.5
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,8 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------- |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.5 | 2024-09-05 | [45143](https://github.com/airbytehq/airbyte/pull/45143) | Don't overwrite (and delete) existing files; skip those file indexes instead |
| 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists |
| 1.0.3 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb |
| 1.0.2 | 2024-08-19 | [44401](https://github.com/airbytehq/airbyte/pull/44401) | Fix: S3 Avro/Parquet: handle nullable top-level schema |
Expand Down
Loading