Skip to content

Commit

Permalink
Merge branch 'master' into orchestrahq/add-orchestra-to-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
willdavies0 authored Sep 6, 2024
2 parents 03aaab4 + f23ae56 commit 222e391
Show file tree
Hide file tree
Showing 341 changed files with 10,393 additions and 8,203 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.64.1
current_version = 0.64.3
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
54 changes: 0 additions & 54 deletions .github/workflows/community_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,60 +76,6 @@ jobs:
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
subcommand: "format check all"
is_fork: "true"

check-review-requirements:
name: Check if a review is required from Connector teams on fork
if: github.event.pull_request.head.repo.fork == true
environment: community-ci-auto
runs-on: community-tooling-test-small
needs: fail_on_protected_path_changes
timeout-minutes: 10
env:
MAIN_BRANCH_NAME: "master"
permissions:
pull-requests: write
steps:
# This checkouts a fork which can contain untrusted code
# It's deemed safe as the review required check is not executing any checked out code
- name: Checkout fork
uses: actions/checkout@v4
with:
repository: ${{ github.event.pull_request.head.repo.full_name }}
ref: ${{ github.event.pull_request.head.sha }}
fetch-depth: 1
# This will sync the .github folder of the main repo with the fork
# This allows us to use up to date actions and CI logic from the main repo
- name: Pull .github folder from main repository
id: pull_github_folder
run: |
git remote add main https://github.com/airbytehq/airbyte.git
git fetch main ${MAIN_BRANCH_NAME}
git checkout main/${MAIN_BRANCH_NAME} -- .github
git checkout main/${MAIN_BRANCH_NAME} -- airbyte-ci
- name: Install Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Install ci-connector-ops package
run: |
pip install pipx
pipx ensurepath
pipx install airbyte-ci/connectors/connector_ops
- name: Write review requirements file
id: write-review-requirements-file
run: write-review-requirements-file >> $GITHUB_OUTPUT
- name: Get mandatory reviewers
id: get-mandatory-reviewers
run: print-mandatory-reviewers >> $GITHUB_OUTPUT
- name: Check if the review requirements are met
if: steps.write-review-requirements-file.outputs.CREATED_REQUIREMENTS_FILE == 'true'
uses: Automattic/action-required-review@v3
with:
status: ${{ steps.get-mandatory-reviewers.outputs.MANDATORY_REVIEWERS }}
token: ${{ secrets.OCTAVIA_4_ROOT_ACCESS }}
request-reviews: true
requirements-file: .github/connector_org_review_requirements.yaml

connectors_early_ci:
name: Run connectors early CI on fork
if: github.event.pull_request.head.repo.fork == true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,12 @@ class DefaultExceptionClassifier(
) : ExceptionClassifier {

override fun classify(e: Throwable): ConnectorError? {
return when (val connectorErrorException: ConnectorErrorException? = unwind(e)) {
is ConfigErrorException -> ConfigError(connectorErrorException.message!!)
is TransientErrorException -> TransientError(connectorErrorException.message!!)
is SystemErrorException -> SystemError(connectorErrorException.message)
null -> null
val unwound: Throwable? = ExceptionClassifier.unwind(e) { it is ConnectorErrorException }
return when (unwound) {
is ConfigErrorException -> ConfigError(unwound.message!!)
is TransientErrorException -> TransientError(unwound.message!!)
is SystemErrorException -> SystemError(unwound.message)
else -> null
}
}

/** Recursively walks the causes of [e] and returns the last [ConnectorErrorException]. */
fun unwind(e: Throwable): ConnectorErrorException? {
var connectorErrorException: ConnectorErrorException? = null
var unwound: Throwable? = e
while (unwound != null) {
if (unwound is ConnectorErrorException) {
connectorErrorException = unwound
}
unwound = unwound.cause
}
return connectorErrorException
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ interface ExceptionClassifier : Ordered {
val orderValue: Int

override fun getOrder(): Int = orderValue

companion object {
fun unwind(e: Throwable, stopUnwind: (Throwable) -> Boolean): Throwable? {
var unwound = e
while (!stopUnwind(unwound)) {
unwound = unwound.cause ?: return null
}
return unwound
}
}
}

/** Each [ConnectorError] subtype corresponds to a [AirbyteErrorTraceMessage.FailureType]. */
Expand Down Expand Up @@ -54,10 +64,8 @@ interface RuleBasedExceptionClassifier<T : RuleBasedExceptionClassifier.Rule> :

override fun classify(e: Throwable): ConnectorError? {
for (rule in rules) {
if (!rule.matches(e)) {
continue
}
val message: String = rule.output ?: e.message ?: e.toString()
val match: Throwable = ExceptionClassifier.unwind(e, rule::matches) ?: continue
val message: String = rule.output ?: match.message ?: match.toString()
val firstLine: String = if (rule.group == null) message else "${rule.group}: $message"
val lines: List<String> = listOf(firstLine) + rule.referenceLinks
val displayMessage: String = lines.joinToString(separator = "\n")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,16 @@ class RegexExceptionClassifierTest {
classifier.classify(RuntimeException("barbaz")),
)
}

@Test
fun testRecursiveRuleOrdering() {
Assertions.assertEquals(
ConfigError("grouped: has foo\nhttps://www.youtube.com/watch?v=xvFZjo5PgG0"),
classifier.classify(RuntimeException("quux", RuntimeException("foobarbaz"))),
)
Assertions.assertEquals(
TransientError("barbaz"),
classifier.classify(RuntimeException("quux", RuntimeException("barbaz"))),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ data class DestinationCatalog(
}
}

interface DestinationCatalogFactory {
fun make(): DestinationCatalog
}

@Factory
class DestinationCatalogFactory(
class DefaultDestinationCatalogFactory(
private val catalog: ConfiguredAirbyteCatalog,
private val streamFactory: DestinationStreamFactory
) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.message

import io.airbyte.protocol.models.v0.AirbyteGlobalState
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.AirbyteStreamState
import io.airbyte.protocol.models.v0.StreamDescriptor
import jakarta.inject.Singleton

/**
* Converts the internal @[DestinationStateMessage] case class to the Protocol state messages
* required by @[io.airbyte.cdk.output.OutputConsumer]
*/
interface MessageConverter<T, U> {
fun from(message: T): U
}

@Singleton
class DefaultMessageConverter : MessageConverter<DestinationStateMessage, AirbyteMessage> {
override fun from(message: DestinationStateMessage): AirbyteMessage {
val state =
when (message) {
is DestinationStreamState ->
AirbyteStateMessage()
.withSourceStats(
AirbyteStateStats()
.withRecordCount(message.sourceStats.recordCount.toDouble())
)
.withDestinationStats(
message.destinationStats?.let {
AirbyteStateStats().withRecordCount(it.recordCount.toDouble())
}
?: throw IllegalStateException(
"Destination stats must be provided for DestinationStreamState"
)
)
.withType(AirbyteStateMessage.AirbyteStateType.STREAM)
.withStream(fromStreamState(message.streamState))
is DestinationGlobalState ->
AirbyteStateMessage()
.withSourceStats(
AirbyteStateStats()
.withRecordCount(message.sourceStats.recordCount.toDouble())
)
.withDestinationStats(
message.destinationStats?.let {
AirbyteStateStats().withRecordCount(it.recordCount.toDouble())
}
)
.withType(AirbyteStateMessage.AirbyteStateType.GLOBAL)
.withGlobal(
AirbyteGlobalState()
.withSharedState(message.state)
.withStreamStates(message.streamStates.map { fromStreamState(it) })
)
}
return AirbyteMessage().withState(state)
}

private fun fromStreamState(
streamState: DestinationStateMessage.StreamState
): AirbyteStreamState {
return AirbyteStreamState()
.withStreamDescriptor(
StreamDescriptor()
.withNamespace(streamState.stream.descriptor.namespace)
.withName(streamState.stream.descriptor.name)
)
.withStreamState(streamState.state)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class DestinationMessageQueueWriter(
private val catalog: DestinationCatalog,
private val messageQueue: MessageQueue<DestinationStream, DestinationRecordWrapped>,
private val streamsManager: StreamsManager,
private val stateManager: StateManager
private val stateManager: StateManager<DestinationStream, DestinationStateMessage>
) : MessageQueueWriter<DestinationMessage> {
/**
* Deserialize and route the message to the appropriate channel.
Expand All @@ -43,13 +43,12 @@ class DestinationMessageQueueWriter(
/* If the input message represents a record. */
is DestinationRecordMessage -> {
val manager = streamsManager.getManager(message.stream)
val index = manager.countRecordIn(sizeBytes)
when (message) {
/* If a data record */
is DestinationRecord -> {
val wrapped =
StreamRecordWrapped(
index = index,
index = manager.countRecordIn(),
sizeBytes = sizeBytes,
record = message
)
Expand All @@ -58,7 +57,7 @@ class DestinationMessageQueueWriter(

/* If an end-of-stream marker. */
is DestinationStreamComplete -> {
val wrapped = StreamCompleteWrapped(index)
val wrapped = StreamCompleteWrapped(index = manager.countEndOfStream())
messageQueue.getChannel(message.stream).send(wrapped)
}
}
Expand Down
Loading

0 comments on commit 222e391

Please sign in to comment.