Skip to content

Commit

Permalink
Bulk Load CDK: Simply Interface & Add Check
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Sep 10, 2024
1 parent 5761f46 commit 8630887
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.check

import io.airbyte.cdk.Operation
import io.airbyte.cdk.output.ExceptionHandler
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

@Singleton
@Requires(property = Operation.PROPERTY, value = "check")
@Requires(env = ["destination"])
class CheckOperation(
private val destination: DestinationCheck,
private val exceptionHandler: ExceptionHandler,
) : Operation {
override fun execute() {
try {
destination.check()
} catch (e: Exception) {
exceptionHandler.handle(e)
} finally {
destination.cleanup()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.check

interface DestinationCheck {
fun check()
fun cleanup() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlinx.coroutines.sync.withLock
* TODO: Some degree of logging/monitoring around how accurate we're actually being?
*/
@Singleton
class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) {
class MemoryManager(private val availableMemoryProvider: AvailableMemoryProvider) {
private val totalMemoryBytes: Long = availableMemoryProvider.availableMemoryBytes
private var usedMemoryBytes = AtomicLong(0L)
private val mutex = Mutex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
package io.airbyte.cdk.task

import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.write.Destination
import io.airbyte.cdk.write.DestinationWrite
import io.airbyte.cdk.write.StreamLoader
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

/**
* Wraps @[StreamLoader.open] and starts the spill-to-disk tasks.
* Wraps @[StreamLoader.start] and starts the spill-to-disk tasks.
*
* TODO: There's no reason to wait on initialization to start spilling to disk.
*/
Expand All @@ -20,15 +20,15 @@ class OpenStreamTask(
private val taskLauncher: DestinationTaskLauncher
) : Task {
override suspend fun execute() {
streamLoader.open()
streamLoader.start()
taskLauncher.startSpillToDiskTasks(streamLoader)
}
}

@Singleton
@Secondary
class OpenStreamTaskFactory(
private val destination: Destination,
private val destination: DestinationWrite,
) {
fun make(taskLauncher: DestinationTaskLauncher, stream: DestinationStream): OpenStreamTask {
return OpenStreamTask(destination.getStreamLoader(stream), taskLauncher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@

package io.airbyte.cdk.task

import io.airbyte.cdk.write.Destination
import io.airbyte.cdk.write.DestinationWrite
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

/**
* Wraps @[Destination.setup] and starts the open stream tasks.
* Wraps @[DestinationWrite.setup] and starts the open stream tasks.
*
* TODO: This should call something like "TaskLauncher.setupComplete" and let it decide what to do
* next.
*/
class SetupTask(
private val destination: Destination,
private val destination: DestinationWrite,
private val taskLauncher: DestinationTaskLauncher
) : Task {
override suspend fun execute() {
Expand All @@ -27,7 +27,7 @@ class SetupTask(
@Singleton
@Secondary
class SetupTaskFactory(
private val destination: Destination,
private val destination: DestinationWrite,
) {
fun make(taskLauncher: DestinationTaskLauncher): SetupTask {
return SetupTask(destination, taskLauncher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@
package io.airbyte.cdk.task

import io.airbyte.cdk.state.StreamsManager
import io.airbyte.cdk.write.Destination
import io.airbyte.cdk.write.DestinationWrite
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.concurrent.atomic.AtomicBoolean

/**
* Wraps @[Destination.teardown] and stops the task launcher.
* Wraps @[DestinationWrite.teardown] and stops the task launcher.
*
* TODO: Report teardown-complete and let the task launcher decide what to do next.
*/
class TeardownTask(
private val destination: Destination,
private val destination: DestinationWrite,
private val streamsManager: StreamsManager,
private val taskLauncher: DestinationTaskLauncher
) : Task {
Expand All @@ -44,7 +44,7 @@ class TeardownTask(
@Singleton
@Secondary
class TeardownTaskFactory(
private val destination: Destination,
private val destination: DestinationWrite,
private val streamsManager: StreamsManager,
) {
fun make(taskLauncher: DestinationTaskLauncher): TeardownTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,30 @@ import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

/**
* Implementor interface. Extended this only if you need to perform initialization and teardown
* *across all streams*, or if your per-stream operations need shared global state.
*
* If initialization can be done on a per-stream basis, implement @[StreamLoaderFactory] instead.
* Implementor interface. Every Destination must extend this and at least provide an implementation
* of [getStreamLoader].
*/
interface Destination {
interface DestinationWrite {
// Called once before anything else
suspend fun setup() {}

// Return a StreamLoader for the given stream
fun getStreamLoader(stream: DestinationStream): StreamLoader

// Called once at the end of the job
// Called once at the end of the job, unconditionally.
suspend fun teardown(succeeded: Boolean = true) {}
}

@Singleton
@Secondary
class DefaultDestination(private val streamLoaderFactory: StreamLoaderFactory) : Destination {
class DefaultDestinationWrite : DestinationWrite {
init {
throw NotImplementedError(
"DestinationWrite not implemented. Please create a custom @Singleton implementation."
)
}

override fun getStreamLoader(stream: DestinationStream): StreamLoader {
return streamLoaderFactory.make(stream)
throw NotImplementedError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import io.airbyte.cdk.message.DestinationMessage
import io.airbyte.cdk.message.MessageQueueWriter
import io.github.oshai.kotlinlogging.KLogger
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton
import java.io.InputStream
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -63,12 +62,3 @@ class DefaultInputConsumer(
) : DeserializingInputStreamConsumer<DestinationMessage> {
override val log = KotlinLogging.logger {}
}

/** Override to provide a custom input stream. */
@Factory
class InputStreamFactory {
@Singleton
fun make(): InputStream {
return System.`in`
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,61 +8,35 @@ import io.airbyte.cdk.command.DestinationStream
import io.airbyte.cdk.message.Batch
import io.airbyte.cdk.message.DestinationRecord
import io.airbyte.cdk.message.SimpleBatch
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton

/**
* Implementor interface. The framework calls open and close once per stream at the beginning and
* end of processing. The framework calls processRecords once per batch of records as batches of the
* configured size become available. (Specified in @
* [io.airbyte.cdk.command.WriteConfiguration.recordBatchSizeBytes]
* [io.airbyte.cdk.command.WriteConfiguration.recordBatchSizeBytes])
*
* processBatch is called once per incomplete batch returned by either processRecords or
* processBatch itself. See @[io.airbyte.cdk.message.Batch] for more details.
* [start] is called once before any records are processed.
*
* [processRecords] is called whenever a batch of records is available for processing, and only
* after [start] has returned successfully. The return value is a client-defined implementation of @
* [Batch] that the framework may pass to [processBatch] and/or [finalize]. (See @[Batch] for more
* details.)
*
* [processBatch] is called once per incomplete batch returned by either [processRecords] or
* [processBatch] itself.
*
* [finalize] is called once after all records and batches have been processed successfully.
*
* [close] is called once after all records have been processed, regardless of success or failure.
* If there are failed batches, they are passed in as an argument.
*/
interface StreamLoader {
val stream: DestinationStream

suspend fun open() {}
suspend fun start() {}
suspend fun processRecords(records: Iterator<DestinationRecord>, totalSizeBytes: Long): Batch
suspend fun processBatch(batch: Batch): Batch = SimpleBatch(state = Batch.State.COMPLETE)
suspend fun close() {}
}

/**
* Default stream loader (Not yet implemented) will process the records into a locally staged file
* of a format specified in the configuration.
*/
class DefaultStreamLoader(
override val stream: DestinationStream,
) : StreamLoader {
val log = KotlinLogging.logger {}

override suspend fun processRecords(
records: Iterator<DestinationRecord>,
totalSizeBytes: Long
): Batch {
TODO(
"Default implementation adds airbyte metadata, maybe flattens, no-op maps, and converts to destination format"
)
}
}

/**
* If you do not need to perform initialization and teardown across all streams, or if your
* per-stream operations do not need shared global state, implement this interface instead of @
* [Destination]. The framework will call it exactly once per stream to create instances that will
* be used for the life cycle of the stream.
*/
interface StreamLoaderFactory {
fun make(stream: DestinationStream): StreamLoader
}
suspend fun processBatch(batch: Batch): Batch = SimpleBatch(Batch.State.COMPLETE)
suspend fun finalize() {}

@Singleton
@Secondary
class DefaultStreamLoaderFactory() : StreamLoaderFactory {
override fun make(stream: DestinationStream): StreamLoader {
TODO("See above")
}
suspend fun close(failedBatches: List<Batch> = emptyList()) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import io.airbyte.cdk.Operation
import io.airbyte.cdk.message.DestinationMessage
import io.airbyte.cdk.task.TaskLauncher
import io.airbyte.cdk.task.TaskRunner
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Secondary
import java.io.InputStream
import javax.inject.Singleton
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
Expand All @@ -34,3 +37,14 @@ class WriteOperation(
}
}
}

/** Override to provide a custom input stream. */
@Factory
class InputStreamFactory {
@Singleton
@Secondary
@Requires(property = Operation.PROPERTY, value = "write")
fun make(): InputStream {
return System.`in`
}
}

0 comments on commit 8630887

Please sign in to comment.