Skip to content

Commit

Permalink
chore: move main entry point command script logic generation to Conta…
Browse files Browse the repository at this point in the history
…inerCom… (#13138)
  • Loading branch information
tryangul committed Jul 17, 2024
1 parent f9790db commit 7bccaef
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ class ConnectorWatcher(
while (!areNeededFilesPresent()) {
Thread.sleep(100)
if (fileTimeoutReach(stopwatch)) {
logger.warn { "Failed to find output files from connector within timeout $fileTimeoutMinutes. Is the connector still running?" }
logger.warn { "Failed to find output files from connector within timeout $fileTimeoutMinutes minute(s). Is the connector still running?" }
val failureReason =
FailureReason()
.withFailureOrigin(FailureReason.FailureOrigin.UNKNOWN)
.withExternalMessage("Failed to find output files from connector within timeout $fileTimeoutMinutes.")
.withExternalMessage("Failed to find output files from connector within timeout $fileTimeoutMinutes minute(s).")

failWorkload(workloadId, failureReason)
exitFileNotFound()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import io.airbyte.featureflag.UseCustomK8sScheduler
import io.airbyte.workers.process.KubeContainerInfo
import io.airbyte.workers.process.KubePodInfo
import io.airbyte.workers.process.KubePodProcess
import io.airbyte.workers.sync.OrchestratorConstants
import io.fabric8.kubernetes.api.model.CapabilitiesBuilder
import io.fabric8.kubernetes.api.model.Container
import io.fabric8.kubernetes.api.model.ContainerBuilder
Expand Down Expand Up @@ -106,25 +105,13 @@ class ConnectorPodFactory(
containerInfo: KubeContainerInfo,
extraEnvVars: List<EnvVar>,
): Container {
val configArg =
val configArgs =
connectorArgs.map {
(k, v) ->
"--$k $v"
}.joinToString(prefix = " ", separator = " ")

val mainCommand =
"""
pwd
eval "${'$'}AIRBYTE_ENTRYPOINT $operationCommand $configArg" > ${KubePodProcess.CONFIG_DIR}/${OrchestratorConstants.JOB_OUTPUT_FILENAME}
exit_code=${'$'}?
echo ${'$'}exit_code > ${KubePodProcess.CONFIG_DIR}/${OrchestratorConstants.EXIT_CODE_FILE}
cat ${KubePodProcess.CONFIG_DIR}/${OrchestratorConstants.JOB_OUTPUT_FILENAME}
exit ${'$'}exit_code
""".trimIndent()
val mainCommand = ContainerCommandFactory.connectorOperation(operationCommand, configArgs)

return ContainerBuilder()
.withName(KubePodProcess.MAIN_CONTAINER_NAME)
Expand All @@ -140,10 +127,13 @@ class ConnectorPodFactory(
}

private fun buildSidecarContainer(volumeMounts: List<VolumeMount>): Container {
val mainCommand = ContainerCommandFactory.sidecar()

return ContainerBuilder()
.withName(KubePodProcess.SIDECAR_CONTAINER_NAME)
.withImage(sidecarContainerInfo.image)
.withImagePullPolicy(sidecarContainerInfo.pullPolicy)
.withCommand("sh", "-c", mainCommand)
.withWorkingDir(KubePodProcess.CONFIG_DIR)
.withEnv(sideCarEnvVars)
.withVolumeMounts(volumeMounts)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.airbyte.workload.launcher.pods.factories

import io.airbyte.workers.process.KubePodProcess
import io.airbyte.workers.sync.OrchestratorConstants

/**
* Factory for generating/templating the main shell scripts we use as the entry points in our containers.
* Factor out into Singleton as necessary.
*/
object ContainerCommandFactory {
private const val TERMINATION_MARKER_FILE = "TERMINATED"

// WARNING: Fragile. Coupled to our conventions on building, unpacking and naming our executables.
private const val SIDE_CAR_APPLICATION_EXECUTABLE = "/app/airbyte-app/bin/airbyte-connector-sidecar"
private const val TERMINATION_CHECK_INTERVAL_SECONDS = 10

/**
* Runs the sidecar container and creates the TERMINATION_MARKER_FILE file on exit for both success and failure.
*/
fun sidecar() =
"""
trap "touch $TERMINATION_MARKER_FILE" EXIT
$SIDE_CAR_APPLICATION_EXECUTABLE
""".trimIndent()

/**
* Starts the connector operation and a monitor process that conditionally kills the connector and exits if
* a TERMINATION_MARKER_FILE is present. This ensures the connector exits when the sidecar exits.
*/
fun connectorOperation(
operationCommand: String,
configArgs: String,
) = """
# fail loudly if entry point not set
if [ -z "${'$'}AIRBYTE_ENTRYPOINT" ]; then
echo "Entrypoint was not set! AIRBYTE_ENTRYPOINT must be set in the container to run on Kubernetes."
exit 127
else
echo "Using existing AIRBYTE_ENTRYPOINT: ${'$'}AIRBYTE_ENTRYPOINT"
fi
# run connector operation in background and store PID
(eval "${'$'}AIRBYTE_ENTRYPOINT $operationCommand $configArgs" > ${KubePodProcess.CONFIG_DIR}/${OrchestratorConstants.JOB_OUTPUT_FILENAME}) &
CHILD_PID=${'$'}!
# run busy loop in background that checks for termination file and if present kills the connector operation and exits
(while true; do if [ -f $TERMINATION_MARKER_FILE ]; then kill ${'$'}CHILD_PID; exit 0; fi; sleep $TERMINATION_CHECK_INTERVAL_SECONDS; done) &
# wait on connector operation
wait ${'$'}CHILD_PID
EXIT_CODE=$?
# write its exit code to a file for the sidecar
echo ${'$'}EXIT_CODE > ${KubePodProcess.CONFIG_DIR}/${OrchestratorConstants.EXIT_CODE_FILE}
# print result for debugging
cat ${KubePodProcess.CONFIG_DIR}/${OrchestratorConstants.JOB_OUTPUT_FILENAME}
# propagate connector exit code by assuming it
exit ${'$'}EXIT_CODE
""".trimIndent()
}

0 comments on commit 7bccaef

Please sign in to comment.