Skip to content

Commit

Permalink
Merge pull request #24215: Fix OrderedListState for Dataflow Streamin…
Browse files Browse the repository at this point in the history
…g pipelines on SE.
  • Loading branch information
reuvenlax authored Nov 20, 2022
2 parents 36b7985 + 10a9dc2 commit a04682a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,11 @@ static final class IdTracker {
static final String IDS_AVAILABLE_STR = "IdsAvailable";
static final String DELETIONS_STR = "Deletions";

static final long MIN_ID = Long.MIN_VALUE;
static final long MAX_ID = Long.MAX_VALUE;
// Note that this previously was Long.MIN_VALUE but ids are unsigned when
// sending to windmill for Streaming Engine. For updated appliance
// pipelines with existing state, there may be negative ids.
static final long NEW_RANGE_MIN_ID = 0;
static final long NEW_RANGE_MAX_ID = Long.MAX_VALUE;

// We track ids on five-minute boundaries.
private static final Duration RESOLUTION = Duration.standardMinutes(5);
Expand Down Expand Up @@ -755,7 +758,9 @@ <T> void add(
availableIdsForTsRange =
idsAvailable.computeIfAbsent(
currentTsRange,
r -> TreeRangeSet.create(ImmutableList.of(Range.closedOpen(MIN_ID, MAX_ID))));
r ->
TreeRangeSet.create(
ImmutableList.of(Range.closedOpen(NEW_RANGE_MIN_ID, NEW_RANGE_MAX_ID))));
idRangeIter = availableIdsForTsRange.asRanges().iterator();
currentIdRange = null;
currentTsRangeDeletions = subRangeDeletions.get(currentTsRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ public void testOrderedListAddPersist() throws Exception {

assertEquals("hello", updates.getInserts(0).getEntries(0).getValue().toStringUtf8());
assertEquals(1000, updates.getInserts(0).getEntries(0).getSortKey());
assertEquals(IdTracker.MIN_ID, updates.getInserts(0).getEntries(0).getId());
assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId());
}

@Test
Expand Down Expand Up @@ -829,8 +829,8 @@ public void testOrderedListClearPersist() throws Exception {
assertEquals("world", updates.getInserts(0).getEntries(1).getValue().toStringUtf8());
assertEquals(2000, updates.getInserts(0).getEntries(0).getSortKey());
assertEquals(2000, updates.getInserts(0).getEntries(1).getSortKey());
assertEquals(IdTracker.MIN_ID, updates.getInserts(0).getEntries(0).getId());
assertEquals(IdTracker.MIN_ID + 1, updates.getInserts(0).getEntries(1).getId());
assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId());
assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1, updates.getInserts(0).getEntries(1).getId());
Mockito.verifyNoMoreInteractions(mockReader);
}

Expand Down Expand Up @@ -877,8 +877,8 @@ public void testOrderedListDeleteRangePersist() {
assertEquals("world", updates.getInserts(0).getEntries(1).getValue().toStringUtf8());
assertEquals(1000, updates.getInserts(0).getEntries(0).getSortKey());
assertEquals(4000, updates.getInserts(0).getEntries(1).getSortKey());
assertEquals(IdTracker.MIN_ID, updates.getInserts(0).getEntries(0).getId());
assertEquals(IdTracker.MIN_ID + 1, updates.getInserts(0).getEntries(1).getId());
assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId());
assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1, updates.getInserts(0).getEntries(1).getId());
}

@Test
Expand Down

0 comments on commit a04682a

Please sign in to comment.