From 8630887b5d5b2ea7fe66057cffdfbd0afe0aed1d Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Mon, 9 Sep 2024 17:01:16 -0700 Subject: [PATCH] Bulk Load CDK: Simply Interface & Add Check --- .../io/airbyte/cdk/check/CheckOperation.kt | 28 ++++++++ .../io/airbyte/cdk/check/DestinationCheck.kt | 10 +++ .../io/airbyte/cdk/state/MemoryManager.kt | 2 +- .../io/airbyte/cdk/task/OpenStreamTask.kt | 8 +-- .../kotlin/io/airbyte/cdk/task/SetupTask.kt | 8 +-- .../io/airbyte/cdk/task/TeardownTask.kt | 8 +-- .../{Destination.kt => DestinationWrite.kt} | 20 +++--- .../io/airbyte/cdk/write/InputConsumer.kt | 10 --- .../io/airbyte/cdk/write/StreamLoader.kt | 64 ++++++------------- .../io/airbyte/cdk/write/WriteOperation.kt | 14 ++++ 10 files changed, 96 insertions(+), 76 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/check/DestinationCheck.kt rename airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/{Destination.kt => DestinationWrite.kt} (54%) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt new file mode 100644 index 000000000000..5da8da2df22f --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/check/CheckOperation.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.check + +import io.airbyte.cdk.Operation +import io.airbyte.cdk.output.ExceptionHandler +import io.micronaut.context.annotation.Requires +import jakarta.inject.Singleton + +@Singleton +@Requires(property = Operation.PROPERTY, value = "check") +@Requires(env = ["destination"]) +class CheckOperation( + private val destination: DestinationCheck, + private val exceptionHandler: ExceptionHandler, +) : Operation { + override fun execute() { + try { + destination.check() + } catch (e: Exception) { + exceptionHandler.handle(e) + } finally { + destination.cleanup() + } + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/check/DestinationCheck.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/check/DestinationCheck.kt new file mode 100644 index 000000000000..bbf07446fe37 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/check/DestinationCheck.kt @@ -0,0 +1,10 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.check + +interface DestinationCheck { + fun check() + fun cleanup() {} +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt index d191223b08fd..f5511f58fcc1 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/state/MemoryManager.kt @@ -19,7 +19,7 @@ import kotlinx.coroutines.sync.withLock * TODO: Some degree of logging/monitoring around how accurate we're actually being? */ @Singleton -class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) { +class MemoryManager(private val availableMemoryProvider: AvailableMemoryProvider) { private val totalMemoryBytes: Long = availableMemoryProvider.availableMemoryBytes private var usedMemoryBytes = AtomicLong(0L) private val mutex = Mutex() diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/OpenStreamTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/OpenStreamTask.kt index b36f64c14a16..ffd56eb48d27 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/OpenStreamTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/OpenStreamTask.kt @@ -5,13 +5,13 @@ package io.airbyte.cdk.task import io.airbyte.cdk.command.DestinationStream -import io.airbyte.cdk.write.Destination +import io.airbyte.cdk.write.DestinationWrite import io.airbyte.cdk.write.StreamLoader import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton /** - * Wraps @[StreamLoader.open] and starts the spill-to-disk tasks. + * Wraps @[StreamLoader.start] and starts the spill-to-disk tasks. * * TODO: There's no reason to wait on initialization to start spilling to disk. */ @@ -20,7 +20,7 @@ class OpenStreamTask( private val taskLauncher: DestinationTaskLauncher ) : Task { override suspend fun execute() { - streamLoader.open() + streamLoader.start() taskLauncher.startSpillToDiskTasks(streamLoader) } } @@ -28,7 +28,7 @@ class OpenStreamTask( @Singleton @Secondary class OpenStreamTaskFactory( - private val destination: Destination, + private val destination: DestinationWrite, ) { fun make(taskLauncher: DestinationTaskLauncher, stream: DestinationStream): OpenStreamTask { return OpenStreamTask(destination.getStreamLoader(stream), taskLauncher) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SetupTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SetupTask.kt index 5b6f4e6dd6e5..881c311208e1 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SetupTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/SetupTask.kt @@ -4,18 +4,18 @@ package io.airbyte.cdk.task -import io.airbyte.cdk.write.Destination +import io.airbyte.cdk.write.DestinationWrite import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton /** - * Wraps @[Destination.setup] and starts the open stream tasks. + * Wraps @[DestinationWrite.setup] and starts the open stream tasks. * * TODO: This should call something like "TaskLauncher.setupComplete" and let it decide what to do * next. */ class SetupTask( - private val destination: Destination, + private val destination: DestinationWrite, private val taskLauncher: DestinationTaskLauncher ) : Task { override suspend fun execute() { @@ -27,7 +27,7 @@ class SetupTask( @Singleton @Secondary class SetupTaskFactory( - private val destination: Destination, + private val destination: DestinationWrite, ) { fun make(taskLauncher: DestinationTaskLauncher): SetupTask { return SetupTask(destination, taskLauncher) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TeardownTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TeardownTask.kt index 5d76c2b260f9..d715cc5527f8 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TeardownTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/task/TeardownTask.kt @@ -5,19 +5,19 @@ package io.airbyte.cdk.task import io.airbyte.cdk.state.StreamsManager -import io.airbyte.cdk.write.Destination +import io.airbyte.cdk.write.DestinationWrite import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton import java.util.concurrent.atomic.AtomicBoolean /** - * Wraps @[Destination.teardown] and stops the task launcher. + * Wraps @[DestinationWrite.teardown] and stops the task launcher. * * TODO: Report teardown-complete and let the task launcher decide what to do next. */ class TeardownTask( - private val destination: Destination, + private val destination: DestinationWrite, private val streamsManager: StreamsManager, private val taskLauncher: DestinationTaskLauncher ) : Task { @@ -44,7 +44,7 @@ class TeardownTask( @Singleton @Secondary class TeardownTaskFactory( - private val destination: Destination, + private val destination: DestinationWrite, private val streamsManager: StreamsManager, ) { fun make(taskLauncher: DestinationTaskLauncher): TeardownTask { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/Destination.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/DestinationWrite.kt similarity index 54% rename from airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/Destination.kt rename to airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/DestinationWrite.kt index db610ec8716a..a0bab2df82bc 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/Destination.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/DestinationWrite.kt @@ -9,26 +9,30 @@ import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton /** - * Implementor interface. Extended this only if you need to perform initialization and teardown - * *across all streams*, or if your per-stream operations need shared global state. - * - * If initialization can be done on a per-stream basis, implement @[StreamLoaderFactory] instead. + * Implementor interface. Every Destination must extend this and at least provide an implementation + * of [getStreamLoader]. */ -interface Destination { +interface DestinationWrite { // Called once before anything else suspend fun setup() {} // Return a StreamLoader for the given stream fun getStreamLoader(stream: DestinationStream): StreamLoader - // Called once at the end of the job + // Called once at the end of the job, unconditionally. suspend fun teardown(succeeded: Boolean = true) {} } @Singleton @Secondary -class DefaultDestination(private val streamLoaderFactory: StreamLoaderFactory) : Destination { +class DefaultDestinationWrite : DestinationWrite { + init { + throw NotImplementedError( + "DestinationWrite not implemented. Please create a custom @Singleton implementation." + ) + } + override fun getStreamLoader(stream: DestinationStream): StreamLoader { - return streamLoaderFactory.make(stream) + throw NotImplementedError() } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/InputConsumer.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/InputConsumer.kt index 9adb2bc4ffc2..898d9889692f 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/InputConsumer.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/InputConsumer.kt @@ -9,7 +9,6 @@ import io.airbyte.cdk.message.DestinationMessage import io.airbyte.cdk.message.MessageQueueWriter import io.github.oshai.kotlinlogging.KLogger import io.github.oshai.kotlinlogging.KotlinLogging -import io.micronaut.context.annotation.Factory import jakarta.inject.Singleton import java.io.InputStream import java.nio.charset.StandardCharsets @@ -63,12 +62,3 @@ class DefaultInputConsumer( ) : DeserializingInputStreamConsumer { override val log = KotlinLogging.logger {} } - -/** Override to provide a custom input stream. */ -@Factory -class InputStreamFactory { - @Singleton - fun make(): InputStream { - return System.`in` - } -} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/StreamLoader.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/StreamLoader.kt index d038eeea985e..6e35c0cea99f 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/StreamLoader.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/StreamLoader.kt @@ -8,61 +8,35 @@ import io.airbyte.cdk.command.DestinationStream import io.airbyte.cdk.message.Batch import io.airbyte.cdk.message.DestinationRecord import io.airbyte.cdk.message.SimpleBatch -import io.github.oshai.kotlinlogging.KotlinLogging -import io.micronaut.context.annotation.Secondary -import jakarta.inject.Singleton /** * Implementor interface. The framework calls open and close once per stream at the beginning and * end of processing. The framework calls processRecords once per batch of records as batches of the * configured size become available. (Specified in @ - * [io.airbyte.cdk.command.WriteConfiguration.recordBatchSizeBytes] + * [io.airbyte.cdk.command.WriteConfiguration.recordBatchSizeBytes]) * - * processBatch is called once per incomplete batch returned by either processRecords or - * processBatch itself. See @[io.airbyte.cdk.message.Batch] for more details. + * [start] is called once before any records are processed. + * + * [processRecords] is called whenever a batch of records is available for processing, and only + * after [start] has returned successfully. The return value is a client-defined implementation of @ + * [Batch] that the framework may pass to [processBatch] and/or [finalize]. (See @[Batch] for more + * details.) + * + * [processBatch] is called once per incomplete batch returned by either [processRecords] or + * [processBatch] itself. + * + * [finalize] is called once after all records and batches have been processed successfully. + * + * [close] is called once after all records have been processed, regardless of success or failure. + * If there are failed batches, they are passed in as an argument. */ interface StreamLoader { val stream: DestinationStream - suspend fun open() {} + suspend fun start() {} suspend fun processRecords(records: Iterator, totalSizeBytes: Long): Batch - suspend fun processBatch(batch: Batch): Batch = SimpleBatch(state = Batch.State.COMPLETE) - suspend fun close() {} -} - -/** - * Default stream loader (Not yet implemented) will process the records into a locally staged file - * of a format specified in the configuration. - */ -class DefaultStreamLoader( - override val stream: DestinationStream, -) : StreamLoader { - val log = KotlinLogging.logger {} - - override suspend fun processRecords( - records: Iterator, - totalSizeBytes: Long - ): Batch { - TODO( - "Default implementation adds airbyte metadata, maybe flattens, no-op maps, and converts to destination format" - ) - } -} - -/** - * If you do not need to perform initialization and teardown across all streams, or if your - * per-stream operations do not need shared global state, implement this interface instead of @ - * [Destination]. The framework will call it exactly once per stream to create instances that will - * be used for the life cycle of the stream. - */ -interface StreamLoaderFactory { - fun make(stream: DestinationStream): StreamLoader -} + suspend fun processBatch(batch: Batch): Batch = SimpleBatch(Batch.State.COMPLETE) + suspend fun finalize() {} -@Singleton -@Secondary -class DefaultStreamLoaderFactory() : StreamLoaderFactory { - override fun make(stream: DestinationStream): StreamLoader { - TODO("See above") - } + suspend fun close(failedBatches: List = emptyList()) {} } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/WriteOperation.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/WriteOperation.kt index 69fa3dfacc43..13f7a0035777 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/WriteOperation.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/write/WriteOperation.kt @@ -8,7 +8,10 @@ import io.airbyte.cdk.Operation import io.airbyte.cdk.message.DestinationMessage import io.airbyte.cdk.task.TaskLauncher import io.airbyte.cdk.task.TaskRunner +import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Requires +import io.micronaut.context.annotation.Secondary +import java.io.InputStream import javax.inject.Singleton import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -34,3 +37,14 @@ class WriteOperation( } } } + +/** Override to provide a custom input stream. */ +@Factory +class InputStreamFactory { + @Singleton + @Secondary + @Requires(property = Operation.PROPERTY, value = "write") + fun make(): InputStream { + return System.`in` + } +}