Skip to content

Commit

Permalink
issue #757 Initialization failure handling control
Browse files Browse the repository at this point in the history
  • Loading branch information
wajda committed Oct 23, 2023
1 parent 42ec158 commit 5ddc472
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2023 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.harvester.conf;

public enum InitFailureHandlingMode {

// Log errors and continue without lineage tracking
LOG,

// Propagate errors to the Spark process
BREAK,
}
5 changes: 5 additions & 0 deletions core/src/main/resources/spline.default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ spline:
# (DON'T MODIFY UNLESS YOU UNDERSTAND THE IMPLICATIONS)
internal.execPlan.uuid.version: 5

# How the agent should respond to initialization errors:
# - LOG (log the error and disable the agent. Spark job continues unaffected without lineage tracking.)
# - BREAK (propagate the error to the Spark process.)
onInitFailure: LOG

# Should the agent capture failed executions:
# - NONE (only capture successful executions)
# - NON_FATAL (capture successful executions, and failed executions, but only when the error is non-fatal)
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/scala/za/co/absa/spline/agent/AgentBOM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.SparkSession
import za.co.absa.spline.HierarchicalObjectFactory
import za.co.absa.spline.agent.AgentConfig.ConfProperty
import za.co.absa.spline.harvester.IdGenerator.UUIDVersion
import za.co.absa.spline.harvester.conf.{SQLFailureCaptureMode, SplineMode}
import za.co.absa.spline.harvester.conf.{InitFailureHandlingMode, SQLFailureCaptureMode, SplineMode}
import za.co.absa.spline.harvester.dispatcher.{CompositeLineageDispatcher, LineageDispatcher}
import za.co.absa.spline.harvester.iwd.IgnoredWriteDetectionStrategy
import za.co.absa.spline.harvester.postprocessing.{CompositePostProcessingFilter, PostProcessingFilter}
Expand All @@ -32,6 +32,7 @@ import scala.reflect.ClassTag

private[spline] trait AgentBOM {
def splineMode: SplineMode
def initFailureHandlingMode: InitFailureHandlingMode
def sqlFailureCaptureMode: SQLFailureCaptureMode
def postProcessingFilter: Option[PostProcessingFilter]
def lineageDispatcher: LineageDispatcher
Expand All @@ -58,6 +59,10 @@ object AgentBOM {
mergedConfig.getRequiredEnum[SQLFailureCaptureMode](ConfProperty.SQLFailureCaptureMode)
}

override def initFailureHandlingMode: InitFailureHandlingMode = {
mergedConfig.getRequiredEnum[InitFailureHandlingMode](ConfProperty.InitFailureHandlingMode)
}

override def execPlanUUIDVersion: UUIDVersion = {
mergedConfig.getRequiredInt(ConfProperty.ExecPlanUUIDVersion)
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/main/scala/za/co/absa/spline/agent/AgentConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object AgentConfig {

def from(configuration: Configuration): AgentConfig = from(
configuration
.getKeys.asScala.toSeq.asInstanceOf[Seq[String]]
.getKeys.asScala.toSeq
.map(k => k -> configuration.getProperty(k)))

def from(options: Iterable[(String, Any)]): AgentConfig =
Expand Down Expand Up @@ -95,10 +95,17 @@ object AgentConfig {
*/
val Mode = "spline.mode"

/**
* How Spline should handle initialization errors.
*
* @see [[za.co.absa.spline.harvester.conf.InitFailureHandlingMode]]
*/
val InitFailureHandlingMode = "spline.onInitFailure"

/**
* How Spline should handle failed SQL executions.
*
* @see [[SQLFailureCaptureMode]]
* @see [[za.co.absa.spline.harvester.conf.SQLFailureCaptureMode]]
*/
val SQLFailureCaptureMode = "spline.sql.failure.capture"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private[spline] class SparkLineageInitializer(sparkSession: SparkSession) extend
logInfo("initialization aborted")
None
}
else withErrorHandling {
else withErrorHandling(bom.initFailureHandlingMode) {
if (isCodelessInit)
Some(createListener(bom))
else
Expand Down Expand Up @@ -171,11 +171,11 @@ private[spline] class SparkLineageInitializer(sparkSession: SparkSession) extend
}
}

private def withErrorHandling(body: => Option[QueryExecutionListener]) = {
private def withErrorHandling(initFailureMode: InitFailureHandlingMode)(body: => Option[QueryExecutionListener]) = {
try {
body
} catch {
case NonFatal(e) =>
case NonFatal(e) if initFailureMode == InitFailureHandlingMode.LOG =>
logError(s"Spline initialization failed! Spark Lineage tracking is DISABLED.", e)
None
}
Expand Down
1 change: 1 addition & 0 deletions examples/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ RUN chmod a+x /entrypoint.sh
# Bind environment variables
ENV SPLINE_PRODUCER_URL=
ENV SPLINE_MODE=ENABLED
ENV ON_INIT_FAILURE=BREAK
ENV DISABLE_SSL_VALIDATION=false

ENV HTTP_PROXY_HOST=
Expand Down
1 change: 1 addition & 0 deletions examples/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ exec ./run.sh \
-Dspline.lineageDispatcher=http \
-Dspline.lineageDispatcher.http.producer.url="$SPLINE_PRODUCER_URL" \
-Dspline.lineageDispatcher.http.disableSslValidation="$DISABLE_SSL_VALIDATION" \
-Dspline.onInitFailure="$ON_INIT_FAILURE" \
-Dspline.mode="$SPLINE_MODE" \
-Dhttp.proxyHost="$HTTP_PROXY_HOST" \
-Dhttp.proxyPort="$HTTP_PROXY_PORT" \
Expand Down

0 comments on commit 5ddc472

Please sign in to comment.