diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index 8b027605d2..0f168be18d 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -151,6 +151,9 @@ public class TaskConfig extends MapConfig { // so that the watermarks will still be generated from other active tasks. public static final String WATERMARK_IDLE_TIMEOUT_MS = "task.watermark.idle.timeout.ms"; public static final long DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS = -1L; + // The quorum size, as a fraction of producer tasks, required to generate watermarks when there are idle tasks. + public static final String WATERMARK_QUORUM_SIZE_PERCENTAGE = "task.watermark.quorum.size.percentage"; + public static final double DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE = 0.5; public TaskConfig(Config config) { super(config); @@ -411,4 +414,8 @@ public boolean getTransactionalStateRetainExistingState() { public long getWatermarkIdleTimeoutMs() { return getLong(WATERMARK_IDLE_TIMEOUT_MS, DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS); } + + public double getWatermarkQuorumSizePercentage() { + return getDouble(WATERMARK_QUORUM_SIZE_PERCENTAGE, DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE); + } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index 93fa0be657..761fd30465 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -123,7 +123,8 @@ public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clo internalTaskContext.registerObject(WatermarkStates.class.getName(), new WatermarkStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts, 
context.getContainerContext().getContainerMetricsRegistry(), - taskConfig.getWatermarkIdleTimeoutMs())); + taskConfig.getWatermarkIdleTimeoutMs(), + taskConfig.getWatermarkQuorumSizePercentage())); // set states for drain; don't include side inputs (see SAMZA-2303) internalTaskContext.registerObject(DrainStates.class.getName(), diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java index 18d8b51380..f5eca17033 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java @@ -54,13 +54,19 @@ private final static class WatermarkState { private final Map timestamps = new HashMap<>(); private final Map lastUpdateTime = new HashMap<>(); private final long watermarkIdleTimeout; + private final int quorumSize; private final long createTime; private final LongSupplier systemTimeFunc; private volatile long watermarkTime = WATERMARK_NOT_EXIST; - WatermarkState(int expectedTotal, long watermarkIdleTimeout, LongSupplier systemTimeFunc) { + WatermarkState( + int expectedTotal, + long watermarkIdleTimeout, + double watermarkQuorumSizePercentage, + LongSupplier systemTimeFunc) { this.expectedTotal = expectedTotal; this.watermarkIdleTimeout = watermarkIdleTimeout; + this.quorumSize = (int) (expectedTotal * watermarkQuorumSizePercentage); this.systemTimeFunc = systemTimeFunc; this.createTime = systemTimeFunc.getAsLong(); } @@ -90,13 +96,17 @@ synchronized void update(long timestamp, String taskName) { // Exclude the tasks that have been idle in watermark emission. 
long min = Long.MAX_VALUE; long watermarkIdleThreshold = currentTime - watermarkIdleTimeout; + int updateCount = 0; for (Map.Entry entry : timestamps.entrySet()) { // Check the update happens before the idle timeout if (lastUpdateTime.get(entry.getKey()) > watermarkIdleThreshold) { min = Math.min(min, entry.getValue()); + updateCount++; } } - minWatermark = min == Long.MAX_VALUE ? WATERMARK_NOT_EXIST : min; + + // The number of active tasks must be at least the quorum size + minWatermark = (updateCount >= quorumSize && min != Long.MAX_VALUE) ? min : WATERMARK_NOT_EXIST; } watermarkTime = Math.max(watermarkTime, minWatermark); } @@ -124,8 +134,10 @@ long getWatermarkTime() { Set ssps, Map producerTaskCounts, MetricsRegistry metricsRegistry, - long watermarkIdleTimeout) { - this(ssps, producerTaskCounts, metricsRegistry, watermarkIdleTimeout, System::currentTimeMillis); + long watermarkIdleTimeout, + double watermarkQuorumSizePercentage) { + this(ssps, producerTaskCounts, metricsRegistry, watermarkIdleTimeout, + watermarkQuorumSizePercentage, System::currentTimeMillis); } //Internal: test-only @@ -134,13 +146,15 @@ long getWatermarkTime() { Map producerTaskCounts, MetricsRegistry metricsRegistry, long watermarkIdleTimeout, + double watermarkQuorumSizePercentage, LongSupplier systemTimeFunc) { final Map states = new HashMap<>(); final List intSsps = new ArrayList<>(); ssps.forEach(ssp -> { final int producerCount = producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0); - states.put(ssp, new WatermarkState(producerCount, watermarkIdleTimeout, systemTimeFunc)); + states.put(ssp, + new WatermarkState(producerCount, watermarkIdleTimeout, watermarkQuorumSizePercentage, systemTimeFunc)); if (producerCount != 0) { intSsps.add(ssp); } diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java index 68b47b92dd..abc4a52df3 100644 --- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java @@ -71,7 +71,8 @@ public void setup() { public void testUpdate() { // advance watermark on input to 5 WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(), - TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS); + TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS, + TaskConfig.DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE); IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope(inputPartition0, 5L); watermarkStates.update((WatermarkMessage) envelope.getMessage(), envelope.getSystemStreamPartition()); @@ -122,7 +123,7 @@ public void testUpdate() { public void testIdle() { MockSystemTime systemTime = new MockSystemTime(); WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(), - TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, systemTime); + TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, TaskConfig.DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE, systemTime); // First watermark WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0"); @@ -183,6 +184,37 @@ public void testIdle() { assertEquals(watermarkStates.getWatermark(intermediate), 6L); } + @Test + public void testQuorum() { + MockSystemTime systemTime = new MockSystemTime(); + WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(), + TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, 1.0, systemTime); + + // First watermark + WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + // Advance currentTime to pass the idle timeout + 
systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS); + + // Watermark is computed based on "task 1" alone since "task 0" passes the idle timeout + // Not meeting quorum + watermarkMessage = new WatermarkMessage(5L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + + systemTime.advance(1); + + // Watermark from task 0, now quorum is met. + watermarkMessage = new WatermarkMessage(3L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 3L); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + } + static class MockSystemTime implements LongSupplier { long currentTime = System.currentTimeMillis();