From 3cb57c22784448b991d1ef4e3a2928f1901b0f3a Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Wed, 21 Aug 2024 14:28:05 -0700 Subject: [PATCH] SAMZA-2801: fix the case when waterrmark is not aggregated when quorum is not met (#1707) --- .../operators/impl/WatermarkMetrics.java | 8 +++++ .../samza/operators/impl/WatermarkStates.java | 30 +++++++++------- .../operators/impl/TestWatermarkStates.java | 34 ++++++++++++++++++- 3 files changed, 58 insertions(+), 14 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkMetrics.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkMetrics.java index 2104c4efca..ef9d2e75f2 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkMetrics.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkMetrics.java @@ -22,6 +22,7 @@ import org.apache.samza.metrics.Gauge; import org.apache.samza.metrics.MetricsBase; import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import java.util.Map; @@ -29,6 +30,7 @@ class WatermarkMetrics extends MetricsBase { private final Map> aggregates = new ConcurrentHashMap<>(); + private final Map> quorumCounts = new ConcurrentHashMap<>(); WatermarkMetrics(MetricsRegistry registry) { super("watermark-", registry); @@ -40,4 +42,10 @@ void setAggregateTime(SystemStreamPartition systemStreamPartition, long time) { ssp.getStream(), ssp.getPartition().getPartitionId()), 0L)); aggregate.set(time); } + + void setQuorumCount(SystemStream stream, int quorumCount) { + final Gauge gauge = quorumCounts.computeIfAbsent(stream, + ssp -> newGauge(String.format("%s-quorum-count", ssp.getStream()), 0)); + gauge.set(quorumCount); + } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java index 11957711dc..48d1df2b44 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java @@ -89,11 +89,9 @@ synchronized void update(long timestamp, String taskName) { // we get watermark either from the source or from the aggregator task watermarkTime = Math.max(watermarkTime, timestamp); } else if (canUpdateWatermark(currentTime)) { - final long minWatermark; - if (watermarkIdleTimeout <= 0) { - // All upstream tasks are required in the computation - minWatermark = timestamps.values().stream().min(Long::compare).orElse(timestamp); - } else { + long minWatermark = timestamps.values().stream().min(Long::compare).orElse(timestamp); + + if (minWatermark <= watermarkTime && watermarkIdleTimeout > 0) { // Exclude the tasks that have been idle in watermark emission. long min = Long.MAX_VALUE; long watermarkIdleThreshold = currentTime - watermarkIdleTimeout; @@ -108,14 +106,11 @@ synchronized void update(long timestamp, String taskName) { // Active tasks must exceed the quorum size minWatermark = (updateCount >= quorumSize && min != Long.MAX_VALUE) ? min : WATERMARK_NOT_EXIST; - - // Log the current quorum count - if (this.quorumCount != updateCount) { - this.quorumCount = updateCount; - LOG.info("Current quorum count is {} for watermark aggregation, and the expected quorum size is {}", - this.quorumCount, this.quorumSize); - } + quorumCount = updateCount; + } else { + quorumCount = timestamps.size(); } + watermarkTime = Math.max(watermarkTime, minWatermark); } } @@ -126,12 +121,16 @@ private boolean canUpdateWatermark(long currentTime) { // 2. we allow task idle in emitting watermarks and the idle time has passed. return (timestamps.size() == expectedTotal) // Handle the case we didn't receive the watermarks from some tasks since startup - || (watermarkIdleTimeout > 0 && currentTime - createTime > watermarkIdleTimeout); + || (watermarkIdleTimeout > 0 && currentTime - createTime >= watermarkIdleTimeout && timestamps.size() >= quorumSize); } long getWatermarkTime() { return watermarkTime; } + + int getQuorumCount() { + return quorumCount; + } } private final Map watermarkStates; @@ -205,6 +204,11 @@ void updateAggregateMetric(SystemStreamPartition ssp, long time) { // Only report the aggregates watermarks for intermediate streams // to reduce the amount of metrics watermarkMetrics.setAggregateTime(ssp, time); + + final WatermarkState state = watermarkStates.get(ssp); + if (state != null && state.getQuorumCount() != 0) { + watermarkMetrics.setQuorumCount(ssp.getSystemStream(), state.getQuorumCount()); + } } } } diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java index abc4a52df3..1a4e786fb2 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java @@ -134,6 +134,12 @@ public void testIdle() { // Advance currentTime to pass the idle timeout systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS); + // Watermark is computed based on both task 0 and task 1 + watermarkMessage = new WatermarkMessage(4L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 1L); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + // Watermark is computed based on "task 1" alone since "task 0" passes the idle timeout watermarkMessage = new WatermarkMessage(5L, "task 1"); watermarkStates.update(watermarkMessage, intPartition0); @@ -196,6 +202,12 @@ public void testQuorum() { assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST); assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + // Watermark is computed based on both task 0 and task 1 + watermarkMessage = new WatermarkMessage(3L, "task 1"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 1L); + assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); + // Advance currentTime to pass the idle timeout systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS); @@ -203,7 +215,7 @@ public void testQuorum() { // Not meeting quorum watermarkMessage = new WatermarkMessage(5L, "task 1"); watermarkStates.update(watermarkMessage, intPartition0); - assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 1L); assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); systemTime.advance(1); @@ -215,6 +227,26 @@ public void testQuorum() { assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST); } + @Test + public void testStartup() { + MockSystemTime systemTime = new MockSystemTime(); + WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(), + TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, TaskConfig.DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE, systemTime); + + // Only one active task in the startup + WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST); + + // Advance currentTime to pass the idle timeout + systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS); + + // Watermark will be soly computed based on task 0 + watermarkMessage = new WatermarkMessage(5L, "task 0"); + watermarkStates.update(watermarkMessage, intPartition0); + assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 5L); + } + static class MockSystemTime implements LongSupplier { long currentTime = System.currentTimeMillis();