Skip to content

Commit

Permalink
Add logic to establish quorum in case of idle tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Xinyu Liu committed Aug 15, 2024
1 parent f36e86d commit 58f27d5
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ public class TaskConfig extends MapConfig {
// so that the watermarks will still be generated from other active tasks.
public static final String WATERMARK_IDLE_TIMEOUT_MS = "task.watermark.idle.timeout.ms";
public static final long DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS = -1L;
// The quorum size required to generate watermarks when there are idle tasks.
public static final String WATERMARK_QUORUM_SIZE_PERCENTAGE = "task.watermark.quorum.size,percentage";
public static final double DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE = 0.5;

public TaskConfig(Config config) {
super(config);
Expand Down Expand Up @@ -411,4 +414,8 @@ public boolean getTransactionalStateRetainExistingState() {
public long getWatermarkIdleTimeoutMs() {
return getLong(WATERMARK_IDLE_TIMEOUT_MS, DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS);
}

public double getWatermarkQuorumSizePercentage() {
return getDouble(WATERMARK_QUORUM_SIZE_PERCENTAGE, DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public OperatorImplGraph(OperatorSpecGraph specGraph, Context context, Clock clo
internalTaskContext.registerObject(WatermarkStates.class.getName(),
new WatermarkStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts,
context.getContainerContext().getContainerMetricsRegistry(),
taskConfig.getWatermarkIdleTimeoutMs()));
taskConfig.getWatermarkIdleTimeoutMs(),
taskConfig.getWatermarkQuorumSizePercentage()));

// set states for drain; don't include side inputs (see SAMZA-2303)
internalTaskContext.registerObject(DrainStates.class.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,19 @@ private final static class WatermarkState {
private final Map<String, Long> timestamps = new HashMap<>();
private final Map<String, Long> lastUpdateTime = new HashMap<>();
private final long watermarkIdleTimeout;
private final int quorumSize;
private final long createTime;
private final LongSupplier systemTimeFunc;
private volatile long watermarkTime = WATERMARK_NOT_EXIST;

WatermarkState(int expectedTotal, long watermarkIdleTimeout, LongSupplier systemTimeFunc) {
WatermarkState(
int expectedTotal,
long watermarkIdleTimeout,
double watermarkQuorumSizePercentage,
LongSupplier systemTimeFunc) {
this.expectedTotal = expectedTotal;
this.watermarkIdleTimeout = watermarkIdleTimeout;
this.quorumSize = (int) (expectedTotal * watermarkQuorumSizePercentage);
this.systemTimeFunc = systemTimeFunc;
this.createTime = systemTimeFunc.getAsLong();
}
Expand Down Expand Up @@ -90,13 +96,17 @@ synchronized void update(long timestamp, String taskName) {
// Exclude the tasks that have been idle in watermark emission.
long min = Long.MAX_VALUE;
long watermarkIdleThreshold = currentTime - watermarkIdleTimeout;
int updateCount = 0;
for (Map.Entry<String, Long> entry : timestamps.entrySet()) {
// Check the update happens before the idle timeout
if (lastUpdateTime.get(entry.getKey()) > watermarkIdleThreshold) {
min = Math.min(min, entry.getValue());
updateCount++;
}
}
minWatermark = min == Long.MAX_VALUE ? WATERMARK_NOT_EXIST : min;

// Active tasks must exceed the quorum size
minWatermark = (updateCount >= quorumSize && min != Long.MAX_VALUE) ? min : WATERMARK_NOT_EXIST;
}
watermarkTime = Math.max(watermarkTime, minWatermark);
}
Expand Down Expand Up @@ -124,8 +134,10 @@ long getWatermarkTime() {
Set<SystemStreamPartition> ssps,
Map<SystemStream, Integer> producerTaskCounts,
MetricsRegistry metricsRegistry,
long watermarkIdleTimeout) {
this(ssps, producerTaskCounts, metricsRegistry, watermarkIdleTimeout, System::currentTimeMillis);
long watermarkIdleTimeout,
double watermarkQuorumSizePercentage) {
this(ssps, producerTaskCounts, metricsRegistry, watermarkIdleTimeout,
watermarkQuorumSizePercentage, System::currentTimeMillis);
}

//Internal: test-only
Expand All @@ -134,13 +146,15 @@ long getWatermarkTime() {
Map<SystemStream, Integer> producerTaskCounts,
MetricsRegistry metricsRegistry,
long watermarkIdleTimeout,
double watermarkQuorumSizePercentage,
LongSupplier systemTimeFunc) {
final Map<SystemStreamPartition, WatermarkState> states = new HashMap<>();
final List<SystemStreamPartition> intSsps = new ArrayList<>();

ssps.forEach(ssp -> {
final int producerCount = producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0);
states.put(ssp, new WatermarkState(producerCount, watermarkIdleTimeout, systemTimeFunc));
states.put(ssp,
new WatermarkState(producerCount, watermarkIdleTimeout, watermarkQuorumSizePercentage, systemTimeFunc));
if (producerCount != 0) {
intSsps.add(ssp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public void setup() {
public void testUpdate() {
// advance watermark on input to 5
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(),
TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS);
TaskConfig.DEFAULT_TASK_WATERMARK_IDLE_TIMEOUT_MS,
TaskConfig.DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE);
IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope(inputPartition0, 5L);
watermarkStates.update((WatermarkMessage) envelope.getMessage(),
envelope.getSystemStreamPartition());
Expand Down Expand Up @@ -122,7 +123,7 @@ public void testUpdate() {
public void testIdle() {
MockSystemTime systemTime = new MockSystemTime();
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(),
TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, systemTime);
TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, TaskConfig.DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE, systemTime);

// First watermark
WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0");
Expand Down Expand Up @@ -183,6 +184,37 @@ public void testIdle() {
assertEquals(watermarkStates.getWatermark(intermediate), 6L);
}

@Test
public void testQuorum() {
MockSystemTime systemTime = new MockSystemTime();
WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap(),
TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS, 1.0, systemTime);

// First watermark
WatermarkMessage watermarkMessage = new WatermarkMessage(1L, "task 0");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

// Advance currentTime to pass the idle timeout
systemTime.advance(TEST_TASK_WATERMARK_IDLE_TIMEOUT_MS);

// Watermark is computed based on "task 1" alone since "task 0" passes the idle timeout
// Not meeting quorum
watermarkMessage = new WatermarkMessage(5L, "task 1");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), WATERMARK_NOT_EXIST);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);

systemTime.advance(1);

// Watermark from task 0, now quorum is met.
watermarkMessage = new WatermarkMessage(3L, "task 0");
watermarkStates.update(watermarkMessage, intPartition0);
assertEquals(watermarkStates.getWatermarkPerSSP(intPartition0), 3L);
assertEquals(watermarkStates.getWatermark(intermediate), WATERMARK_NOT_EXIST);
}

static class MockSystemTime implements LongSupplier {
long currentTime = System.currentTimeMillis();

Expand Down

0 comments on commit 58f27d5

Please sign in to comment.