Skip to content

Commit

Permalink
SAMZA-1489: TaskInstance should commit offset before it closes() if auto commit is enabled
Browse files Browse the repository at this point in the history

Author: Dong Lin <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes #417 from lindong28/SAMZA-1489
  • Loading branch information
lindong28 authored and jagadish-v0 committed Feb 1, 2018
1 parent 03e5026 commit c51693b
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.runtime.AbstractFunction1;

import java.util.concurrent.ExecutorService;

import static org.apache.samza.util.Util.asScalaClock;
import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;


/**
* Factory class to create runloop for a Samza task, based on the type
Expand All @@ -41,10 +40,6 @@
public class RunLoopFactory {
private static final Logger log = LoggerFactory.getLogger(RunLoopFactory.class);

private static final long DEFAULT_WINDOW_MS = -1L;
private static final long DEFAULT_COMMIT_MS = 60000L;
private static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;

public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance> taskInstances,
SystemConsumers consumerMultiplexer,
ExecutorService threadPool,
Expand All @@ -53,11 +48,11 @@ public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, Ta
TaskConfig config,
HighResolutionClock clock) {

long taskWindowMs = config.getWindowMs().getOrElse(defaultValue(DEFAULT_WINDOW_MS));
long taskWindowMs = config.getWindowMs();

log.info("Got window milliseconds: {}.", taskWindowMs);

long taskCommitMs = config.getCommitMs().getOrElse(defaultValue(DEFAULT_COMMIT_MS));
long taskCommitMs = config.getCommitMs();

log.info("Got commit milliseconds: {}.", taskCommitMs);

Expand Down Expand Up @@ -85,15 +80,15 @@ public Boolean apply(TaskInstance t) {
taskCommitMs,
asScalaClock(() -> System.nanoTime()));
} else {
Integer taskMaxConcurrency = config.getMaxConcurrency().getOrElse(defaultValue(1));
Integer taskMaxConcurrency = config.getMaxConcurrency();

log.info("Got taskMaxConcurrency: {}.", taskMaxConcurrency);

boolean isAsyncCommitEnabled = config.getAsyncCommit().getOrElse(defaultValue(false));
boolean isAsyncCommitEnabled = config.getAsyncCommit();

log.info("Got asyncCommitEnabled: {}.", isAsyncCommitEnabled);

Long callbackTimeout = config.getCallbackTimeoutMs().getOrElse(defaultValue(DEFAULT_CALLBACK_TIMEOUT_MS));
Long callbackTimeout = config.getCallbackTimeoutMs();

log.info("Got callbackTimeout: {}.", callbackTimeout);

Expand Down
40 changes: 23 additions & 17 deletions samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.samza.config

import org.apache.samza.container.RunLoopFactory
import org.apache.samza.system.SystemStream
import org.apache.samza.util.{Logging, Util}

Expand All @@ -43,6 +44,11 @@ object TaskConfig {
val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms" // timeout period for triggering a callback
val ASYNC_COMMIT = "task.async.commit" // to enable async commit in an AsyncStreamTask

val DEFAULT_WINDOW_MS: Long = -1L
val DEFAULT_COMMIT_MS = 60000L
val DEFAULT_CALLBACK_TIMEOUT_MS: Long = -1L
val DEFAULT_MAX_CONCURRENCY: Int = 1

/**
* Samza's container polls for more messages under two conditions. The first
* condition arises when there are simply no remaining buffered messages to
Expand Down Expand Up @@ -75,14 +81,14 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
case _ => Set[SystemStream]()
}

def getWindowMs: Option[Long] = getOption(TaskConfig.WINDOW_MS) match {
case Some(ms) => Some(ms.toLong)
case _ => None
def getWindowMs: Long = getOption(TaskConfig.WINDOW_MS) match {
case Some(ms) => ms.toLong
case _ => TaskConfig.DEFAULT_WINDOW_MS
}

def getCommitMs: Option[Long] = getOption(TaskConfig.COMMIT_MS) match {
case Some(ms) => Some(ms.toLong)
case _ => None
def getCommitMs: Long = getOption(TaskConfig.COMMIT_MS) match {
case Some(ms) => ms.toLong
case _ => TaskConfig.DEFAULT_COMMIT_MS
}

def getShutdownMs: Option[Long] = getOption(TaskConfig.SHUTDOWN_MS) match {
Expand Down Expand Up @@ -123,23 +129,23 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
}
}

def getMaxConcurrency: Option[Int] = getOption(TaskConfig.MAX_CONCURRENCY) match {
case Some(count) => Some(count.toInt)
case _ => None
def getMaxConcurrency: Int = getOption(TaskConfig.MAX_CONCURRENCY) match {
case Some(count) => count.toInt
case _ => TaskConfig.DEFAULT_MAX_CONCURRENCY
}

def getCallbackTimeoutMs: Option[Long] = getOption(TaskConfig.CALLBACK_TIMEOUT_MS) match {
case Some(ms) => Some(ms.toLong)
case _ => None
def getCallbackTimeoutMs: Long = getOption(TaskConfig.CALLBACK_TIMEOUT_MS) match {
case Some(ms) => ms.toLong
case _ => TaskConfig.DEFAULT_CALLBACK_TIMEOUT_MS
}

def getAsyncCommit: Option[Boolean] = getOption(TaskConfig.ASYNC_COMMIT) match {
case Some(asyncCommit) => Some(asyncCommit.toBoolean)
case _ => None
def getAsyncCommit: Boolean = getOption(TaskConfig.ASYNC_COMMIT) match {
case Some(asyncCommit) => asyncCommit.toBoolean
case _ => false
}

def isAutoCommitEnabled() = getOption(TaskConfig.COMMIT_MS) match {
def isAutoCommitEnabled: Boolean = getOption(TaskConfig.COMMIT_MS) match {
case Some(commitMs) => commitMs.toInt > 0
case _ => true
case _ => TaskConfig.DEFAULT_COMMIT_MS > 0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ class SamzaContainer(
val shutdownMs = containerContext.config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
var shutdownHookThread: Thread = null
var jmxServer: JmxServer = null
val isAutoCommitEnabled = containerContext.config.isAutoCommitEnabled

@volatile private var status = SamzaContainerStatus.NOT_STARTED
private var exceptionSeen: Throwable = null
Expand Down Expand Up @@ -991,6 +992,11 @@ class SamzaContainer(
}
}

if (isAutoCommitEnabled) {
info("Committing offsets for all task instances")
taskInstances.values.foreach(_.commit)
}

taskInstances.values.foreach(_.shutdownTask)
}

Expand Down

0 comments on commit c51693b

Please sign in to comment.