Destination Databricks: Create namespace if missing in CHECK #45208

Merged: 1 commit, Sep 9, 2024
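
What changed: the connector's check() previously assumed that the raw schema (rawSchemaOverride in the connector config) already existed, so CHECK could fail when that schema was missing. The reworked check() in the diff below first calls destinationHandler.createNamespaces(...), then runs a dummy append-only sync through the regular stream-operation path with typing/deduping disabled, and finally drops the airbyte_check_test_table it created. At the SQL level the new step effectively amounts to creating the schema if it does not exist before the check table is touched. The snippet below is a minimal, self-contained sketch of that pattern over plain JDBC, not the connector's actual code; the hostname, HTTP path, catalog, schema, and token are placeholders, and it assumes the Databricks JDBC driver is on the classpath.

import java.sql.DriverManager

// Minimal sketch of the namespace bootstrap this PR adds to CHECK.
// Not the connector's code: the connector goes through
// DatabricksDestinationHandler.createNamespaces(). All connection values
// below are placeholders.
fun main() {
    val jdbcUrl =
        "jdbc:databricks://example.cloud.databricks.com:443/default;" +
            "transportMode=http;ssl=1;httpPath=/sql/1.0/warehouses/abc123;" +
            "AuthMech=3;UID=token;PWD=<personal-access-token>"
    val catalog = "main"                  // Unity Catalog name from the connector config (placeholder)
    val rawNamespace = "airbyte_internal" // rawSchemaOverride (placeholder)
    val checkTable = "airbyte_check_test_table"

    DriverManager.getConnection(jdbcUrl).use { conn ->
        conn.createStatement().use { stmt ->
            // The fix: make sure the namespace exists before the check table is touched.
            stmt.execute("CREATE SCHEMA IF NOT EXISTS `$catalog`.`$rawNamespace`")
            // Dummy round trip, loosely mirroring what check() exercises.
            stmt.execute(
                "CREATE TABLE IF NOT EXISTS `$catalog`.`$rawNamespace`.`$checkTable` (airbyte_check STRING)"
            )
            stmt.execute("INSERT INTO `$catalog`.`$rawNamespace`.`$checkTable` VALUES ('passed')")
            stmt.execute("DROP TABLE IF EXISTS `$catalog`.`$rawNamespace`.`$checkTable`")
        }
    }
}
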
@@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 072d5540-f236-4294-ba7c-ade8fd918496
dockerImageTag: 3.2.3
dockerImageTag: 3.2.4
dockerRepository: airbyte/destination-databricks
githubIssueLabel: destination-databricks
icon: databricks.svg
@@ -11,26 +11,33 @@ import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer
import io.airbyte.cdk.integrations.base.Destination
import io.airbyte.cdk.integrations.base.IntegrationRunner
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager
import io.airbyte.cdk.integrations.destination.async.deser.AirbyteMessageDeserializer
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.integrations.base.destination.operation.DefaultFlush
import io.airbyte.integrations.base.destination.operation.DefaultSyncOperation
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.ImportType
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
import io.airbyte.integrations.base.destination.typing_deduping.Sql
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.airbyte.integrations.destination.databricks.jdbc.DatabricksDestinationHandler
import io.airbyte.integrations.destination.databricks.jdbc.DatabricksNamingTransformer
import io.airbyte.integrations.destination.databricks.jdbc.DatabricksSqlGenerator
import io.airbyte.integrations.destination.databricks.model.DatabricksConnectorConfig
import io.airbyte.integrations.destination.databricks.operation.DatabricksStorageOperation
import io.airbyte.integrations.destination.databricks.operation.DatabricksStreamOperation
import io.airbyte.integrations.destination.databricks.operation.DatabricksStreamOperationFactory
import io.airbyte.integrations.destination.databricks.staging.DatabricksFileBufferFactory
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.*
@@ -50,124 +57,146 @@ class DatabricksDestination : BaseConnector(), Destination {
}

override fun check(config: JsonNode): AirbyteConnectionStatus? {
// TODO: Add proper checks for
// Check schema permissions, or if raw_override and default already exists
// Check catalog permissions to USE catalog
// Check CREATE volume, COPY INTO, File upload permissions
// Check Table creation, Table drop permissions
val connectorConfig = DatabricksConnectorConfig.deserialize(config)
val sqlGenerator =
DatabricksSqlGenerator(DatabricksNamingTransformer(), connectorConfig.database)
val datasource = DatabricksConnectorClientsFactory.createDataSource(connectorConfig)
val jdbcDatabase = DefaultJdbcDatabase(datasource)
val destinationHandler =
DatabricksDestinationHandler(sqlGenerator, connectorConfig.database, jdbcDatabase)
val workspaceClient =
DatabricksConnectorClientsFactory.createWorkspaceClient(
connectorConfig.hostname,
connectorConfig.authentication
)
val storageOperations =
DatabricksStorageOperation(
sqlGenerator,
destinationHandler,
workspaceClient,
connectorConfig.database,
connectorConfig.purgeStagingData
)
val dummyNamespace = connectorConfig.rawSchemaOverride
val dummyName = "airbyte_check_test_table"
val streamId =
StreamId(
dummyNamespace,
dummyName,
dummyNamespace,
dummyName,
dummyNamespace,
dummyName
)
val streamConfig =
StreamConfig(
id = streamId,
postImportAction = ImportType.APPEND,
primaryKey = listOf(),
cursor = Optional.empty(),
columns = linkedMapOf(),
generationId = 1,
minimumGenerationId = 1,
syncId = 0
)

// quick utility method to drop the airbyte_check_test_table table
// returns a connection status if there was an error, or null on success
fun dropCheckTable(): AirbyteConnectionStatus? {
val dropCheckTableStatement =
"DROP TABLE IF EXISTS `${connectorConfig.database}`.`${streamId.rawNamespace}`.`${streamId.rawName}`;"
try {
destinationHandler.execute(
Sql.of(
dropCheckTableStatement,
),
try {
val connectorConfig = DatabricksConnectorConfig.deserialize(config)
val datasource = DatabricksConnectorClientsFactory.createDataSource(connectorConfig)
val sqlGenerator =
DatabricksSqlGenerator(DatabricksNamingTransformer(), connectorConfig.database)
val jdbcDatabase = DefaultJdbcDatabase(datasource)
val destinationHandler =
DatabricksDestinationHandler(sqlGenerator, connectorConfig.database, jdbcDatabase)
val workspaceClient =
DatabricksConnectorClientsFactory.createWorkspaceClient(
connectorConfig.hostname,
connectorConfig.authentication
)
} catch (e: Exception) {
log.error(e) { "Failed to execute query $dropCheckTableStatement" }
return AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Failed to execute $dropCheckTableStatement: ${e.message}")
}
return null
}
val storageOperation =
DatabricksStorageOperation(
sqlGenerator,
destinationHandler,
workspaceClient,
connectorConfig.database,
connectorConfig.purgeStagingData
)
val rawTableNamespace = connectorConfig.rawSchemaOverride
val finalTableName = "airbyte_check_test_table"

// Before we start, clean up any preexisting check table from a previous attempt.
dropCheckTable()?.let {
return it
}
// Both raw & final Namespaces are same for dummy sync since we don't do any final table
// operations
// in check
val streamId =
sqlGenerator.buildStreamId(rawTableNamespace, finalTableName, rawTableNamespace)
val streamConfig =
StreamConfig(
id = streamId,
postImportAction = ImportType.APPEND,
primaryKey = listOf(),
cursor = Optional.empty(),
columns = linkedMapOf(),
generationId = 1,
minimumGenerationId = 1,
syncId = 0
)

try {
storageOperations.prepareStage(streamId, suffix = "")
} catch (e: Exception) {
log.error(e) { "Failed to prepare stage as part of CHECK" }
return AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Failed to prepare stage")
}
// quick utility method to drop the airbyte_check_test_table table
// returns a connection status if there was an error, or null on success
fun dropCheckTable(): AirbyteConnectionStatus? {
val dropCheckTableStatement =
"DROP TABLE IF EXISTS `${connectorConfig.database}`.`${streamId.rawNamespace}`.`${streamId.rawName}`;"
try {
destinationHandler.execute(
Sql.of(
dropCheckTableStatement,
),
)
} catch (e: Exception) {
log.error(e) { "Failed to execute query $dropCheckTableStatement" }
return AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Failed to execute $dropCheckTableStatement: ${e.message}")
}
return null
}

try {
val writeBuffer = DatabricksFileBufferFactory.createBuffer(FileUploadFormat.CSV)
writeBuffer.use {
it.accept(
"{\"airbyte_check\":\"passed\"}",
"{}",
generationId = 0,
System.currentTimeMillis()
// None of the fields in destination initial status matter
// for a dummy sync with type-dedupe disabled. We only look at these
// when we perform final table related setup operations.
// We just need the streamId to perform the calls in streamOperation.
val initialStatus =
DestinationInitialStatus(
streamConfig = streamConfig,
isFinalTablePresent = false,
initialRawTableStatus =
InitialRawTableStatus(
rawTableExists = false,
hasUnprocessedRecords = true,
maxProcessedTimestamp = Optional.empty()
),
initialTempRawTableStatus =
InitialRawTableStatus(
rawTableExists = false,
hasUnprocessedRecords = true,
maxProcessedTimestamp = Optional.empty()
),
isSchemaMismatch = true,
isFinalTableEmpty = true,
destinationState = MinimumDestinationState.Impl(needsSoftReset = false),
finalTableGenerationId = null,
finalTempTableGenerationId = null,
)
it.flush()
storageOperations.writeToStage(streamConfig, suffix = "", writeBuffer)

// We simulate a mini-sync so that the raw table code path is exercised, with T+D disabled.
// This code is similar to Snowflake's check.
destinationHandler.createNamespaces(setOf(rawTableNamespace))
// Before we start, clean up any preexisting check table from a previous attempt.
// Even though we also clean up at the end, some versions of the old connector didn't
// clean up properly, so to let them pass the check we drop the table both before and after.
dropCheckTable()?.let {
return it
}
} catch (e: Exception) {
log.error(e) { "Failed to write to stage as part of CHECK" }
return AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Failed to write to stage")
}
val streamOperation =
DatabricksStreamOperation(
storageOperation,
initialStatus,
FileUploadFormat.CSV,
disableTypeDedupe = true
)

try {
storageOperations.cleanupStage(streamId)
val data =
"""
{"airbyte_check": "passed"}
""".trimIndent()
val message =
PartialAirbyteMessage()
.withSerialized(data)
.withRecord(
PartialAirbyteRecordMessage()
.withEmittedAt(System.currentTimeMillis())
.withMeta(
AirbyteRecordMessageMeta(),
),
)

streamOperation.writeRecords(streamConfig, listOf(message).stream())
streamOperation.finalizeTable(
streamConfig,
StreamSyncSummary(1, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)
)
// Clean up after ourselves.
// Not _strictly_ necessary since we do this at the start of `check`,
// but it's slightly nicer.
dropCheckTable()?.let {
return it
}
return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED)
} catch (e: Exception) {
log.error(e) { "Failed to cleanup stage" }
log.error(e) { "Failed to execute check" }
return AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Failed to cleanup stage")
.withMessage("${e.message}")
}

// Clean up after ourselves.
// Not _strictly_ necessary since we do this at the start of `check`,
// but it's slightly nicer.
dropCheckTable()?.let {
return it
}

return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED)
}

override fun getSerializedMessageConsumer(
@@ -176,7 +205,6 @@ class DatabricksDestination : BaseConnector(), Destination {
outputRecordCollector: Consumer<AirbyteMessage>
): SerializedAirbyteMessageConsumer {

// TODO: Deserialization should be taken care by connector runner framework later
val connectorConfig = DatabricksConnectorConfig.deserialize(config)

val sqlGenerator =
3 changes: 2 additions & 1 deletion docs/integrations/destinations/databricks.md
@@ -90,7 +90,8 @@ with the raw tables, and their format is subject to change without notice.
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|:--------|:-----------|:--------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.2.4 | 2024-09-09 | [#45208](https://github.com/airbytehq/airbyte/pull/45208) | Fix CHECK to create missing namespace if not exists. |
| 3.2.3 | 2024-09-03 | [#45115](https://github.com/airbytehq/airbyte/pull/45115) | Clarify Unity Catalog Name option. |
| 3.2.2 | 2024-08-22 | [#44941](https://github.com/airbytehq/airbyte/pull/44941) | Clarify Unity Catalog Path option. |
| 3.2.1 | 2024-08-22 | [#44506](https://github.com/airbytehq/airbyte/pull/44506) | Handle uppercase/mixed-case stream name/namespaces |