Skip to content

Commit

Permalink
Add new configuration allowing to keep processing when there are fata…
Browse files Browse the repository at this point in the history
…l exceptions or timeout (#1708)
  • Loading branch information
yehaolan authored Nov 25, 2024
1 parent 0cf9f9a commit 63c86b5
Show file tree
Hide file tree
Showing 4 changed files with 313 additions and 26 deletions.
27 changes: 27 additions & 0 deletions samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ public class TaskConfig extends MapConfig {
public static final String COMMIT_TIMEOUT_MS = "task.commit.timeout.ms";
static final long DEFAULT_COMMIT_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();

// Flag to indicate whether to skip commit during failures (exceptions or timeouts)
// The number of allowed successive commit exceptions and timeouts are controlled by the following two configs.
public static final String SKIP_COMMIT_DURING_FAILURES_ENABLED = "task.commit.skip.commit.during.failures.enabled";
private static final boolean DEFAULT_SKIP_COMMIT_DURING_FAILURES_ENABLED = false;

// Maximum number of allowed successive commit exceptions.
// If the number of successive commit exceptions exceeds this limit, the task will be shut down.
public static final String SKIP_COMMIT_EXCEPTION_MAX_LIMIT = "task.commit.skip.commit.exception.max.limit";
private static final int DEFAULT_SKIP_COMMIT_EXCEPTION_MAX_LIMIT = 5;

// Maximum number of allowed successive commit timeouts.
// If the number of successive commit timeout exceeds this limit, the task will be shut down.
public static final String SKIP_COMMIT_TIMEOUT_MAX_LIMIT = "task.commit.skip.commit.timeout.max.limit";
private static final int DEFAULT_SKIP_COMMIT_TIMEOUT_MAX_LIMIT = 2;

// how long to wait for a clean shutdown
public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms";
static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L;
Expand Down Expand Up @@ -418,4 +433,16 @@ public long getWatermarkIdleTimeoutMs() {
public double getWatermarkQuorumSizePercentage() {
return getDouble(WATERMARK_QUORUM_SIZE_PERCENTAGE, DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE);
}

public boolean getSkipCommitDuringFailuresEnabled() {
return getBoolean(SKIP_COMMIT_DURING_FAILURES_ENABLED, DEFAULT_SKIP_COMMIT_DURING_FAILURES_ENABLED);
}

public int getSkipCommitExceptionMaxLimit() {
return getInt(SKIP_COMMIT_EXCEPTION_MAX_LIMIT, DEFAULT_SKIP_COMMIT_EXCEPTION_MAX_LIMIT);
}

public int getSkipCommitTimeoutMaxLimit() {
return getInt(SKIP_COMMIT_TIMEOUT_MAX_LIMIT, DEFAULT_SKIP_COMMIT_TIMEOUT_MAX_LIMIT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.samza.util.ScalaJavaUtil.JavaOptionals.toRichOptional
import org.apache.samza.util.{Logging, ReflectionUtil, ScalaJavaUtil}

import java.util
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import java.util.function.BiConsumer
import java.util.function.Function
import scala.collection.JavaConversions._
Expand Down Expand Up @@ -133,8 +133,13 @@ class TaskInstance(
val checkpointWriteVersions = new TaskConfig(config).getCheckpointWriteVersions

@volatile var lastCommitStartTimeMs = System.currentTimeMillis()
val commitExceptionCounter = new AtomicInteger(0)
val commitTimeoutCounter = new AtomicInteger(0)
val commitMaxDelayMs = taskConfig.getCommitMaxDelayMs
val commitTimeoutMs = taskConfig.getCommitTimeoutMs
val skipCommitDuringFailureEnabled = taskConfig.getSkipCommitDuringFailuresEnabled
val skipCommitExceptionMaxLimit = taskConfig.getSkipCommitExceptionMaxLimit
val skipCommitTimeoutMaxLimit = taskConfig.getSkipCommitTimeoutMaxLimit
val commitInProgress = new Semaphore(1)
val commitException = new AtomicReference[Exception]()

Expand Down Expand Up @@ -312,10 +317,22 @@ class TaskInstance(

val commitStartNs = System.nanoTime()
// first check if there were any unrecoverable errors during the async stage of the pending commit
// and if so, shut down the container.
// If there is unrecoverable error, increment the metric and the counter.
// Shutdown the container in the following scenarios:
// 1. skipCommitDuringFailureEnabled is not enabled
// 2. skipCommitDuringFailureEnabled is enabled but the number of exceptions exceeded the max count
// Otherwise, ignore the exception.
if (commitException.get() != null) {
throw new SamzaException("Unrecoverable error during pending commit for taskName: %s." format taskName,
commitException.get())
metrics.commitExceptions.inc()
commitExceptionCounter.incrementAndGet()
if (!skipCommitDuringFailureEnabled || commitExceptionCounter.get() > skipCommitExceptionMaxLimit) {
throw new SamzaException("Unrecoverable error during pending commit for taskName: %s. Exception Counter: %s"
format (taskName, commitExceptionCounter.get()), commitException.get())
} else {
warn("Ignored the commit failure for taskName %s. Exception Counter: %s."
format (taskName, commitExceptionCounter.get()), commitException.get())
commitException.set(null)
}
}

// if no commit is in progress for this task, continue with this commit.
Expand All @@ -328,21 +345,36 @@ class TaskInstance(
if (timeSinceLastCommit < commitMaxDelayMs) {
info("Skipping commit for taskName: %s since another commit is in progress. " +
"%s ms have elapsed since the pending commit started." format (taskName, timeSinceLastCommit))
metrics.commitsSkipped.set(metrics.commitsSkipped.getValue + 1)
metrics.commitsSkipped.inc()
return
} else {
warn("Blocking processing for taskName: %s until in-flight commit is complete. " +
"%s ms have elapsed since the pending commit started, " +
"which is greater than the max allowed commit delay: %s."
format (taskName, timeSinceLastCommit, commitMaxDelayMs))

// Wait for the previous commit to complete within the timeout.
// If it doesn't complete within the timeout, increment metric and the counter.
// Shutdown the container in the following scenarios:
// 1. skipCommitDuringFailureEnabled is not enabled
// 2. skipCommitDuringFailureEnabled is enabled but the number of timeouts exceeded the max count
// Otherwise, ignore the timeout.
if (!commitInProgress.tryAcquire(commitTimeoutMs, TimeUnit.MILLISECONDS)) {
val timeSinceLastCommit = System.currentTimeMillis() - lastCommitStartTimeMs
metrics.commitsTimedOut.set(metrics.commitsTimedOut.getValue + 1)
throw new SamzaException("Timeout waiting for pending commit for taskName: %s to finish. " +
"%s ms have elapsed since the pending commit started. Max allowed commit delay is %s ms " +
"and commit timeout beyond that is %s ms" format (taskName, timeSinceLastCommit,
commitMaxDelayMs, commitTimeoutMs))
metrics.commitsTimedOut.inc()
commitTimeoutCounter.incrementAndGet()
if (!skipCommitDuringFailureEnabled || commitTimeoutCounter.get() > skipCommitTimeoutMaxLimit) {
throw new SamzaException("Timeout waiting for pending commit for taskName: %s to finish. " +
"%s ms have elapsed since the pending commit started. Max allowed commit delay is %s ms " +
"and commit timeout beyond that is %s ms. Timeout Counter: %s" format (taskName, timeSinceLastCommit,
commitMaxDelayMs, commitTimeoutMs, commitTimeoutCounter.get()))
} else {
warn("Ignoring commit timeout for taskName: %s. %s ms have elapsed since another commit started. " +
"Max allowed commit delay is %s ms and commit timeout beyond that is %s ms. Timeout Counter: %s."
format (taskName, timeSinceLastCommit, commitMaxDelayMs, commitTimeoutMs, commitTimeoutCounter.get()))
commitInProgress.release()
return
}
}
}
}
Expand Down Expand Up @@ -426,7 +458,7 @@ class TaskInstance(
}
})

metrics.lastCommitNs.set(System.nanoTime() - commitStartNs)
metrics.lastCommitNs.set(System.nanoTime())
metrics.commitSyncNs.update(System.nanoTime() - commitStartNs)
debug("Finishing sync stage of commit for taskName: %s checkpointId: %s" format (taskName, checkpointId))
}
Expand Down Expand Up @@ -531,8 +563,11 @@ class TaskInstance(
"Saved exception under Caused By.", commitException.get())
}
} else {
commitExceptionCounter.set(0)
commitTimeoutCounter.set(0)
metrics.commitAsyncNs.update(System.nanoTime() - asyncStageStartNs)
metrics.commitNs.update(System.nanoTime() - commitStartNs)
metrics.lastCommitAsyncTimestamp.set(System.nanoTime())
}
} finally {
// release the permit indicating that previous commit is complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ class TaskInstanceMetrics(
val pendingMessages = newGauge("pending-messages", 0)
val messagesInFlight = newGauge("messages-in-flight", 0)
val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
val commitsTimedOut = newGauge("commits-timed-out", 0)
val commitsSkipped = newGauge("commits-skipped", 0)
val commitsTimedOut = newCounter("commits-timed-out")
val commitsSkipped = newCounter("commits-skipped")
val commitExceptions = newCounter("commit-exceptions")
val commitNs = newTimer("commit-ns")
val lastCommitNs = newGauge("last-commit-ns", 0L)
val lastCommitAsyncTimestamp = newGauge("last-async-commit-timestamp", 0L)
val commitSyncNs = newTimer("commit-sync-ns")
val commitAsyncNs = newTimer("commit-async-ns")
val snapshotNs = newTimer("snapshot-ns")
Expand Down
Loading

0 comments on commit 63c86b5

Please sign in to comment.