From b2eda57eb99274490ad96cff315983b4f654168b Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Wed, 4 Sep 2024 16:46:40 -0400 Subject: [PATCH 1/2] bulk-cdk: make exception classifiers recursive --- .../cdk/output/DefaultExceptionClassifier.kt | 24 +++++-------------- .../airbyte/cdk/output/ExceptionClassifier.kt | 16 +++++++++---- .../output/RegexExceptionClassifierTest.kt | 12 ++++++++++ .../cdk/output/JdbcExceptionClassifier.kt | 11 +++++---- 4 files changed, 36 insertions(+), 27 deletions(-) diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifier.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifier.kt index 71da76a5859f..fc6f09259b1a 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifier.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/DefaultExceptionClassifier.kt @@ -22,24 +22,12 @@ class DefaultExceptionClassifier( ) : ExceptionClassifier { override fun classify(e: Throwable): ConnectorError? { - return when (val connectorErrorException: ConnectorErrorException? = unwind(e)) { - is ConfigErrorException -> ConfigError(connectorErrorException.message!!) - is TransientErrorException -> TransientError(connectorErrorException.message!!) - is SystemErrorException -> SystemError(connectorErrorException.message) - null -> null + val unwound: Throwable? = ExceptionClassifier.unwind(e) { it is ConnectorErrorException } + return when (unwound) { + is ConfigErrorException -> ConfigError(unwound.message!!) + is TransientErrorException -> TransientError(unwound.message!!) + is SystemErrorException -> SystemError(unwound.message) + else -> null } } - - /** Recursively walks the causes of [e] and returns the last [ConnectorErrorException]. */ - fun unwind(e: Throwable): ConnectorErrorException? { - var connectorErrorException: ConnectorErrorException? = null - var unwound: Throwable? = e - while (unwound != null) { - if (unwound is ConnectorErrorException) { - connectorErrorException = unwound - } - unwound = unwound.cause - } - return connectorErrorException - } } diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt index 0b7e19b44aec..21cec49bcbcd 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt @@ -15,6 +15,16 @@ interface ExceptionClassifier : Ordered { val orderValue: Int override fun getOrder(): Int = orderValue + + companion object { + fun unwind(e: Throwable, test: (Throwable) -> Boolean): Throwable? { + var unwound = e + while (!test(unwound)) { + unwound = unwound.cause ?: return null + } + return unwound + } + } } /** Each [ConnectorError] subtype corresponds to a [AirbyteErrorTraceMessage.FailureType]. */ @@ -54,10 +64,8 @@ interface RuleBasedExceptionClassifier : override fun classify(e: Throwable): ConnectorError? { for (rule in rules) { - if (!rule.matches(e)) { - continue - } - val message: String = rule.output ?: e.message ?: e.toString() + val match: Throwable = ExceptionClassifier.unwind(e, rule::matches) ?: continue + val message: String = rule.output ?: match.message ?: match.toString() val firstLine: String = if (rule.group == null) message else "${rule.group}: $message" val lines: List = listOf(firstLine) + rule.referenceLinks val displayMessage: String = lines.joinToString(separator = "\n") diff --git a/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/RegexExceptionClassifierTest.kt b/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/RegexExceptionClassifierTest.kt index e511d46488ec..2fa23ddd680a 100644 --- a/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/RegexExceptionClassifierTest.kt +++ b/airbyte-cdk/bulk/core/base/src/test/kotlin/io/airbyte/cdk/output/RegexExceptionClassifierTest.kt @@ -80,4 +80,16 @@ class RegexExceptionClassifierTest { classifier.classify(RuntimeException("barbaz")), ) } + + @Test + fun testRecursiveRuleOrdering() { + Assertions.assertEquals( + ConfigError("grouped: has foo\nhttps://www.youtube.com/watch?v=xvFZjo5PgG0"), + classifier.classify(RuntimeException("quux", RuntimeException("foobarbaz"))), + ) + Assertions.assertEquals( + TransientError("barbaz"), + classifier.classify(RuntimeException("quux", RuntimeException("barbaz"))), + ) + } } diff --git a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifier.kt b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifier.kt index 806f0c1d10c6..adb8713bde4e 100644 --- a/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifier.kt +++ b/airbyte-cdk/bulk/toolkits/extract-jdbc/src/main/kotlin/io/airbyte/cdk/output/JdbcExceptionClassifier.kt @@ -29,15 +29,16 @@ class JdbcExceptionClassifier( } override fun classify(e: Throwable): ConnectorError? { - if (e !is SQLException) return null + var match: SQLException = + ExceptionClassifier.unwind(e) { it is SQLException } as? SQLException ?: return null val decoratedMessage: String = listOfNotNull( - e.sqlState?.let { "State code: $it" }, - e.errorCode.takeIf { it != 0 }?.let { "Error code: $it" }, - e.message?.let { "Message: $it" }, + match.sqlState?.let { "State code: $it" }, + match.errorCode.takeIf { it != 0 }?.let { "Error code: $it" }, + match.message?.let { "Message: $it" }, ) .joinToString(separator = "; ") - val decoratedException = SQLException(decoratedMessage, e.sqlState, e.errorCode) + val decoratedException = SQLException(decoratedMessage, match.sqlState, match.errorCode) val ruleBasedMatch: ConnectorError? = super.classify(decoratedException) if (ruleBasedMatch != null) { return ruleBasedMatch From 82a908e1ecd3efe2b1f90b4b107aefc6ae9bf174 Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Wed, 4 Sep 2024 17:25:28 -0400 Subject: [PATCH 2/2] s/test/stopUnwind/ --- .../main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt index 21cec49bcbcd..1583a271fb6e 100644 --- a/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt +++ b/airbyte-cdk/bulk/core/base/src/main/kotlin/io/airbyte/cdk/output/ExceptionClassifier.kt @@ -17,9 +17,9 @@ interface ExceptionClassifier : Ordered { override fun getOrder(): Int = orderValue companion object { - fun unwind(e: Throwable, test: (Throwable) -> Boolean): Throwable? { + fun unwind(e: Throwable, stopUnwind: (Throwable) -> Boolean): Throwable? { var unwound = e - while (!test(unwound)) { + while (!stopUnwind(unwound)) { unwound = unwound.cause ?: return null } return unwound