Skip to content

Commit

Permalink
[#32222] Actually maintain the heap invariant for timers. (#33270)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored Dec 4, 2024
1 parent 122368f commit 2e43e29
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed EventTimeTimer ordering in Prism ([#32222](https://github.com/apache/beam/issues/32222)).

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
9 changes: 1 addition & 8 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,7 @@ def sickbayTests = [
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData',

// Timer race condition/ordering issue in Prism.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testTwoTimersSettingEachOtherWithCreateAsInputUnbounded',

// Missing output due to timer skew.
// Missing output due to processing time timer skew.
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',

// TestStream + BundleFinalization.
Expand Down Expand Up @@ -241,10 +238,6 @@ def createPrismValidatesRunnerTask = { name, environmentType ->
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
}
filter {
// Hangs forever with prism. Put here instead of sickbay to allow sickbay runs to terminate.
// https://github.com/apache/beam/issues/32222
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerOrderingWithCreate'

for (String test : sickbayTests) {
excludeTestsMatching test
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,7 @@ func (ss *stageState) AddPending(newPending []element) int {
}
ss.pendingByKeys[string(e.keyBytes)] = dnt
}
dnt.elements.Push(e)
heap.Push(&dnt.elements, e)

if e.IsTimer() {
if lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]; ok {
Expand Down Expand Up @@ -1576,6 +1576,8 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
// They'll never be read in again.
for _, wins := range ss.sideInputs {
for win := range wins {
// TODO(https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
// Clear out anything we've already used.
if win.MaxTimestamp() < newOut {
delete(wins, win)
Expand All @@ -1584,7 +1586,8 @@ func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
}
for _, wins := range ss.state {
for win := range wins {
// Clear out anything we've already used.
// TODO(https://github.com/apache/beam/issues/31438):
// Adjust with AllowedLateness
if win.MaxTimestamp() < newOut {
delete(wins, win)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,8 @@ def process(
state.clear()
yield buffer
else:
timer.set(ts + 1)
# Set the timer to fire within its window.
timer.set(ts + (1 - timestamp.Duration(micros=1000)))

@userstate.on_timer(timer_spec)
def process_timer(self, state=beam.DoFn.StateParam(state_spec)):
Expand All @@ -790,8 +791,10 @@ def is_buffered_correctly(actual):
# Actual should be a grouping of the inputs into batches of size
# at most buffer_size, but the actual batching is nondeterministic
# based on ordering and trigger firing timing.
self.assertEqual(sorted(sum((list(b) for b in actual), [])), elements)
self.assertEqual(max(len(list(buffer)) for buffer in actual), buffer_size)
self.assertEqual(
sorted(sum((list(b) for b in actual), [])), elements, actual)
self.assertEqual(
max(len(list(buffer)) for buffer in actual), buffer_size, actual)
if windowed:
# Elements were assigned to windows based on their parity.
# Assert that each grouping consists of elements belonging to the
Expand Down

0 comments on commit 2e43e29

Please sign in to comment.