Skip to content

Commit

Permalink
Include byte size of stateKey in estimated weight of WindmillBag, Win…
Browse files Browse the repository at this point in the history
…dmillValue, and WindmillWatermarkHold (apache#30654)

* Update WindmillBag.java

Include byte size of the stateKey on the BagState weight used to estimate and limit the total state cache size

* Update WindmillValue.java

Include stateKey size in the byte size of a WidnmillValue

* Update WindmillWatermarkHold.java

Include keyState size in the WatermarkHold estimated byte size

* Fix formatting issue

* Fix expected cache item weights in WindmillStateInternalsTest
  • Loading branch information
dmitryor authored Mar 18, 2024
1 parent 63aff7e commit c3b3fa6
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyA
}
// We now know the complete bag contents, and any read on it will yield a
// cached value, so cache it for future reads.
cache.put(namespace, address, this, encodedSize);
cache.put(namespace, address, this, encodedSize + stateKey.size());
}

// Don't reuse the localAdditions object; we don't want future changes to it to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForK
coder.encode(value, stream, Coder.Context.OUTER);
}
encoded = stream.toByteString();
cachedSize = encoded.size();
cachedSize = (long) encoded.size() + stateKey.size();
}

// Place in cache to avoid a future read.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,14 @@ public Future<Windmill.WorkItemCommitRequest> persist(
throw new IllegalStateException("Unreachable condition");
}

final int estimatedByteSize = ENCODED_SIZE + stateKey.size();
return Futures.lazyTransform(
result,
result1 -> {
cleared = false;
localAdditions = null;
if (cachedValue != null) {
cache.put(namespace, address, WindmillWatermarkHold.this, ENCODED_SIZE);
cache.put(namespace, address, WindmillWatermarkHold.this, estimatedByteSize);
}
return result1;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3043,15 +3043,15 @@ public void testCachedValue() throws Exception {
value.write("Hi");
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(132, cache.getWeight());
assertEquals(141, cache.getWeight());

resetUnderTest();
value = underTest.state(NAMESPACE, addr);
assertEquals("Hi", value.read());
value.clear();
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(130, cache.getWeight());
assertEquals(139, cache.getWeight());

resetUnderTest();
value = underTest.state(NAMESPACE, addr);
Expand Down Expand Up @@ -3083,7 +3083,7 @@ public void testCachedBag() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(140, cache.getWeight());
assertEquals(147, cache.getWeight());

resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
Expand All @@ -3103,7 +3103,7 @@ public void testCachedBag() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(133, cache.getWeight());
assertEquals(140, cache.getWeight());

resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
Expand All @@ -3114,7 +3114,7 @@ public void testCachedBag() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(134, cache.getWeight());
assertEquals(141, cache.getWeight());

resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
Expand Down Expand Up @@ -3145,7 +3145,7 @@ public void testCachedWatermarkHold() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(138, cache.getWeight());
assertEquals(151, cache.getWeight());

resetUnderTest();
hold = underTest.state(NAMESPACE, addr);
Expand All @@ -3154,7 +3154,7 @@ public void testCachedWatermarkHold() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(138, cache.getWeight());
assertEquals(151, cache.getWeight());

resetUnderTest();
hold = underTest.state(NAMESPACE, addr);
Expand Down Expand Up @@ -3185,7 +3185,7 @@ public void testCachedCombining() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(131, cache.getWeight());
assertEquals(144, cache.getWeight());

resetUnderTest();
value = underTest.state(NAMESPACE, COMBINING_ADDR);
Expand All @@ -3196,7 +3196,7 @@ public void testCachedCombining() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(130, cache.getWeight());
assertEquals(143, cache.getWeight());

resetUnderTest();
value = underTest.state(NAMESPACE, COMBINING_ADDR);
Expand Down

0 comments on commit c3b3fa6

Please sign in to comment.