diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 7e2423a22d08..db9ca4d8eae9 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -174,6 +174,7 @@ corresponds to that version. | Version | Date | Pull Request | Subject | |:-----------|:-----------| :----------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.45.0 | 2024-09-16 | [\#45469](https://github.com/airbytehq/airbyte/pull/45469) | Fix some race conditions, improve thread filtering, improve test logging | | 0.44.22 | 2024-09-10 | [\#45368](https://github.com/airbytehq/airbyte/pull/45368) | Remove excessive debezium logging | | 0.44.21 | 2024-09-04 | [\#45143](https://github.com/airbytehq/airbyte/pull/45143) | S3-destination: don't overwrite existing files, skip those file indexes instead | | 0.44.20 | 2024-08-30 | [\#44933](https://github.com/airbytehq/airbyte/pull/44933) | Avro/Parquet destinations: handle `{}` schemas inside objects/arrays | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt index 407f399d646c..ec006b75d885 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt @@ -116,19 +116,28 @@ constructor( recordTransform: CheckedFunction ): Stream { val connection = dataSource.connection - return JdbcDatabase.Companion.toUnsafeStream( - statementCreator.apply(connection).executeQuery(), - recordTransform - ) - .onClose( - Runnable { - try { - LOGGER.info { "closing connection" } - connection.close() - } catch (e: SQLException) { - throw 
RuntimeException(e) + try { + return JdbcDatabase.Companion.toUnsafeStream( + statementCreator.apply(connection).executeQuery(), + recordTransform + ) + .onClose( + Runnable { + try { + LOGGER.info { "closing connection" } + connection.close() + } catch (e: SQLException) { + throw RuntimeException(e) + } } - } - ) + ) + } catch (e: Throwable) { + // this is ugly because we usually don't close the connection here. + // We expect the caller to close the returned stream, which will call the onClose + // but if the executeQuery threw an exception, we still need to close the connection + LOGGER.warn(e) { "closing connection because of an Exception" } + connection.close() + throw e + } } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt index 1ce78e39d2b3..d9be7e843e90 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt @@ -316,6 +316,45 @@ internal constructor( } } + data class OrphanedThreadInfo + private constructor( + val thread: Thread, + val threadCreationInfo: ThreadCreationInfo, + val lastStackTrace: List + ) { + fun getLogString(): String { + return String.format( + "%s (%s)\n Thread stacktrace: %s", + thread.name, + thread.state, + lastStackTrace.joinToString("\n at ") + ) + } + + companion object { + fun getAll(): List { + return ThreadUtils.getAllThreads().mapNotNull { getForThread(it) } + } + + fun getForThread(thread: Thread): OrphanedThreadInfo? { + val threadCreationInfo = + getMethod.invoke(threadCreationInfo, thread) as ThreadCreationInfo? 
+ val stack = thread.stackTrace.asList() + if (threadCreationInfo == null) { + return null + } + return OrphanedThreadInfo(thread, threadCreationInfo, stack) + } + + // ThreadLocal.get(Thread) is private. So we open it and keep a reference to the + // opened method + private val getMethod: Method = + ThreadLocal::class.java.getDeclaredMethod("get", Thread::class.java).also { + it.isAccessible = true + } + } + } + class ThreadCreationInfo { val stack: List = Thread.currentThread().stackTrace.asList() val time: Instant = Instant.now() @@ -327,25 +366,13 @@ internal constructor( companion object { private val threadCreationInfo: InheritableThreadLocal = object : InheritableThreadLocal() { - override fun childValue(parentValue: ThreadCreationInfo): ThreadCreationInfo { + override fun childValue(parentValue: ThreadCreationInfo?): ThreadCreationInfo { return ThreadCreationInfo() } } const val TYPE_AND_DEDUPE_THREAD_NAME: String = "type-and-dedupe" - // ThreadLocal.get(Thread) is private. So we open it and keep a reference to the - // opened method - private val getMethod: Method = - ThreadLocal::class.java.getDeclaredMethod("get", Thread::class.java).also { - it.isAccessible = true - } - - @JvmStatic - fun getThreadCreationInfo(thread: Thread): ThreadCreationInfo? { - return getMethod.invoke(threadCreationInfo, thread) as ThreadCreationInfo? - } - /** * Filters threads that should not be considered when looking for orphaned threads at * shutdown of the integration runner. @@ -355,11 +382,11 @@ internal constructor( * active so long as the database connection pool is open. 
*/ @VisibleForTesting - private val orphanedThreadPredicates: MutableList<(Thread) -> Boolean> = - mutableListOf({ runningThread: Thread -> - (runningThread.name != Thread.currentThread().name && - !runningThread.isDaemon && - TYPE_AND_DEDUPE_THREAD_NAME != runningThread.name) + private val orphanedThreadPredicates: MutableList<(OrphanedThreadInfo) -> Boolean> = + mutableListOf({ runningThreadInfo: OrphanedThreadInfo -> + (runningThreadInfo.thread.name != Thread.currentThread().name && + !runningThreadInfo.thread.isDaemon && + TYPE_AND_DEDUPE_THREAD_NAME != runningThreadInfo.thread.name) }) const val INTERRUPT_THREAD_DELAY_MINUTES: Int = 1 @@ -402,12 +429,12 @@ internal constructor( } @JvmStatic - fun addOrphanedThreadFilter(predicate: (Thread) -> (Boolean)) { + fun addOrphanedThreadFilter(predicate: (OrphanedThreadInfo) -> (Boolean)) { orphanedThreadPredicates.add(predicate) } - fun filterOrphanedThread(thread: Thread): Boolean { - return orphanedThreadPredicates.all { it(thread) } + fun filterOrphanedThread(threadInfo: OrphanedThreadInfo): Boolean { + return orphanedThreadPredicates.all { it(threadInfo) } } /** @@ -437,8 +464,8 @@ internal constructor( ) { val currentThread = Thread.currentThread() - val runningThreads = ThreadUtils.getAllThreads().filter(::filterOrphanedThread) - if (runningThreads.isNotEmpty()) { + val runningThreadInfos = OrphanedThreadInfo.getAll().filter(::filterOrphanedThread) + if (runningThreadInfos.isNotEmpty()) { LOGGER.warn { """ The main thread is exiting while children non-daemon threads from a connector are still active. 
@@ -457,18 +484,15 @@ internal constructor( .daemon(true) .build() ) - for (runningThread in runningThreads) { - val str = - "Active non-daemon thread: " + - dumpThread(runningThread) + - "\ncreationStack=${getThreadCreationInfo(runningThread)}" + for (runningThreadInfo in runningThreadInfos) { + val str = "Active non-daemon thread info: ${runningThreadInfo.getLogString()}" LOGGER.warn { str } // even though the main thread is already shutting down, we still leave some // chances to the children // threads to close properly on their own. // So, we schedule an interrupt hook after a fixed time delay instead... scheduledExecutorService.schedule( - { runningThread.interrupt() }, + { runningThreadInfo.thread.interrupt() }, interruptTimeDelay.toLong(), interruptTimeUnit ) @@ -493,6 +517,7 @@ internal constructor( } private fun dumpThread(thread: Thread): String { + OrphanedThreadInfo.getForThread(thread) return String.format( "%s (%s)\n Thread stacktrace: %s", thread.name, diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 61353e8975b2..7da2702998b3 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.44.23 +version=0.45.0 diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.kt index 3fd9480cffe0..98afff065026 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtilityTest.kt @@ -48,8 +48,18 @@ class AirbyteTraceMessageUtilityTest { 
Mockito.mock(RuntimeException::class.java), "this is a config error" ) - val outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8)) - assertJsonNodeIsTraceMessage(outJson) + val outCt = outContent.toString(StandardCharsets.UTF_8) + var outJson: JsonNode? = null + // because we are running tests in parallel, it's possible that another test is writing to + // stdout while we run this test, in which case we'd see their messages. + // we filter through the messages to find an error (hopefully ours) + for (line in outCt.split('\n')) { + if (line.contains("\"error\"")) { + outJson = Jsons.deserialize(line) + break + } + } + assertJsonNodeIsTraceMessage(outJson!!) Assertions.assertEquals("config_error", outJson["trace"]["error"]["failure_type"].asText()) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt index 7503bd2f0b7e..6e562dea20b1 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunnerTest.kt @@ -442,11 +442,12 @@ ${Jsons.serialize(message2)}""".toByteArray( } catch (e: Exception) { throw RuntimeException(e) } - val runningThreads = - ThreadUtils.getAllThreads().filter(IntegrationRunner::filterOrphanedThread) + val runningThreadInfos = + IntegrationRunner.OrphanedThreadInfo.getAll() + .filter(IntegrationRunner::filterOrphanedThread) // all threads should be interrupted - Assertions.assertEquals(listOf(), runningThreads) + Assertions.assertEquals(listOf(), runningThreadInfos) Assertions.assertEquals(1, caughtExceptions.size) } @@ -468,11 +469,12 @@ ${Jsons.serialize(message2)}""".toByteArray( throw RuntimeException(e) } - val runningThreads = - 
ThreadUtils.getAllThreads().filter(IntegrationRunner::filterOrphanedThread) + val runningThreadInfos = + IntegrationRunner.OrphanedThreadInfo.getAll() + .filter(IntegrationRunner::filterOrphanedThread) // a thread that refuses to be interrupted should remain - Assertions.assertEquals(1, runningThreads.size) + Assertions.assertEquals(1, runningThreadInfos.size) Assertions.assertEquals(1, caughtExceptions.size) Assertions.assertTrue(exitCalled.get()) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt index f15596af805d..c40503722f75 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt @@ -11,6 +11,7 @@ import java.time.format.DateTimeParseException import java.util.* import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicLong import java.util.regex.Pattern import kotlin.concurrent.Volatile import org.apache.commons.lang3.StringUtils @@ -88,7 +89,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor { logLineSuffix = "execution of unknown intercepted call $methodName" } val currentThread = Thread.currentThread() - val timeoutTask = TimeoutInteruptor(currentThread) + val timeoutTask = TimeoutInteruptor(currentThread, logLineSuffix) val start = Instant.now() try { val timeout = reflectiveInvocationContext?.let(::getTimeout) @@ -116,6 +117,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor { val elapsedMs = Duration.between(start, Instant.now()).toMillis() val t1: Throwable if (timeoutTask.wasTriggered) { + LOGGER.info { "timeoutTask ${timeoutTask.id} was triggered." 
} val timeoutAsString = DurationFormatUtils.formatDurationWords(elapsedMs, true, true) t1 = @@ -126,6 +128,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor { ) t1.initCause(throwable) } else { + LOGGER.info { "timeoutTask ${timeoutTask.id} was not triggered." } t1 = throwable } var belowCurrentCall = false @@ -157,25 +160,36 @@ class LoggingInvocationInterceptor : InvocationInterceptor { throw t1 } finally { timeoutTask.cancel() - TestContext.CURRENT_TEST_NAME.set(null) + TestContext.CURRENT_TEST_NAME.set(TestContext.NO_RUNNING_TEST) } } - private class TimeoutInteruptor(private val parentThread: Thread) : TimerTask() { + private class TimeoutInteruptor( + private val parentThread: Thread, + private val context: String + ) : TimerTask() { @Volatile var wasTriggered: Boolean = false + val id = timerIdentifier.incrementAndGet() override fun run() { LOGGER.info( - "interrupting running task on ${parentThread.name}. Current Stacktrace is ${parentThread.stackTrace.asList()}" + "interrupting running task on ${parentThread.name}. " + + "Current Stacktrace is ${parentThread.stackTrace.asList()}" + + "TimeoutIterruptor $id interrupting running task on ${parentThread.name}: $context. 
" + + "Current Stacktrace is ${parentThread.stackTrace.asList()}" ) wasTriggered = true parentThread.interrupt() } override fun cancel(): Boolean { - LOGGER.info("cancelling timer task on ${parentThread.name}") + LOGGER.info("cancelling TimeoutIterruptor $id on ${parentThread.name}") return super.cancel() } + + companion object { + private val timerIdentifier = AtomicLong(1) + } } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt index 6608ec0696f6..88ddea448802 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/TestContext.kt @@ -5,5 +5,11 @@ package io.airbyte.cdk.extensions object TestContext { - val CURRENT_TEST_NAME: ThreadLocal = ThreadLocal() + const val NO_RUNNING_TEST = "NONE" + val CURRENT_TEST_NAME: ThreadLocal = + object : ThreadLocal() { + override fun initialValue(): String { + return NO_RUNNING_TEST + } + } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt index cb1e10ec944c..83a3377c7f50 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt @@ -5,6 +5,7 @@ package io.airbyte.workers.internal import com.google.common.base.Charsets import com.google.common.base.Preconditions +import io.airbyte.cdk.extensions.TestContext import io.airbyte.commons.io.IOs import io.airbyte.commons.io.LineGobbler import 
io.airbyte.commons.json.Jsons @@ -182,7 +183,7 @@ constructor( fun createContainerLogMdcBuilder(): MdcScope.Builder = MdcScope.Builder() - .setLogPrefix("destination") + .setLogPrefix("destination-${TestContext.CURRENT_TEST_NAME.get()}") .setPrefixColor(LoggingHelper.Color.YELLOW_BACKGROUND) val IGNORED_EXIT_CODES: Set = setOf(