diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java index b3719cc666dd..702be70f411c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillBag.java @@ -193,7 +193,7 @@ public Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyA } // We now know the complete bag contents, and any read on it will yield a // cached value, so cache it for future reads. - cache.put(namespace, address, this, encodedSize); + cache.put(namespace, address, this, encodedSize + stateKey.size()); } // Don't reuse the localAdditions object; we don't want future changes to it to diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java index 923d166c823a..bc3e0906f990 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillValue.java @@ -124,7 +124,7 @@ protected Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForK coder.encode(value, stream, Coder.Context.OUTER); } encoded = stream.toByteString(); - cachedSize = encoded.size(); + cachedSize = (long) encoded.size() + stateKey.size(); } // Place in cache to avoid a future read. diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java index 9d939c759d27..e8b2290c3c79 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java @@ -175,13 +175,14 @@ public Future persist( throw new IllegalStateException("Unreachable condition"); } + final int estimatedByteSize = ENCODED_SIZE + stateKey.size(); return Futures.lazyTransform( result, result1 -> { cleared = false; localAdditions = null; if (cachedValue != null) { - cache.put(namespace, address, WindmillWatermarkHold.this, ENCODED_SIZE); + cache.put(namespace, address, WindmillWatermarkHold.this, estimatedByteSize); } return result1; }); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index d55a20e5517c..d53b1d8c3e89 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -3043,7 +3043,7 @@ public void testCachedValue() throws Exception { value.write("Hi"); underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(132, cache.getWeight()); + assertEquals(141, cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, addr); @@ -3051,7 +3051,7 @@ public void testCachedValue() throws Exception { value.clear(); underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(130, cache.getWeight()); + assertEquals(139, cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, addr); @@ -3083,7 +3083,7 @@ public void testCachedBag() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(140, cache.getWeight()); + assertEquals(147, cache.getWeight()); resetUnderTest(); bag = underTest.state(NAMESPACE, addr); @@ -3103,7 +3103,7 @@ public void testCachedBag() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(133, cache.getWeight()); + assertEquals(140, cache.getWeight()); resetUnderTest(); bag = underTest.state(NAMESPACE, addr); @@ -3114,7 +3114,7 @@ public void testCachedBag() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(134, cache.getWeight()); + assertEquals(141, cache.getWeight()); resetUnderTest(); bag = underTest.state(NAMESPACE, addr); @@ -3145,7 +3145,7 @@ public void testCachedWatermarkHold() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(138, cache.getWeight()); + assertEquals(151, cache.getWeight()); resetUnderTest(); hold = underTest.state(NAMESPACE, addr); @@ -3154,7 +3154,7 @@ public void testCachedWatermarkHold() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(138, cache.getWeight()); + assertEquals(151, cache.getWeight()); resetUnderTest(); hold = underTest.state(NAMESPACE, addr); @@ -3185,7 +3185,7 @@ public void testCachedCombining() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(131, cache.getWeight()); + assertEquals(144, cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, COMBINING_ADDR); @@ -3196,7 +3196,7 @@ public void testCachedCombining() throws Exception { underTest.persist(Windmill.WorkItemCommitRequest.newBuilder()); - assertEquals(130, cache.getWeight()); + assertEquals(143, cache.getWeight()); resetUnderTest(); value = underTest.state(NAMESPACE, COMBINING_ADDR);