diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsAndCombineDoFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsAndCombineDoFn.java
new file mode 100644
index 0000000000..d3eb50b226
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsAndCombineDoFn.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.util;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.common.collect.Maps;
+
+import org.joda.time.Instant;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+/**
+ * {@link GroupAlsoByWindowsDoFn} that uses combiner to accumulate input elements for non-merging
+ * window functions with the default triggering strategy.
+ *
+ * <p>One accumulator is kept per live window of the key being processed. When a window is
+ * closed, one combined value is output for it, timestamped with the earliest input timestamp
+ * seen for that window.
+ *
+ * @param <K> key type
+ * @param <InputT> value input type
+ * @param <AccumT> accumulator type
+ * @param <OutputT> value output type
+ * @param <W> window type
+ */
+@SuppressWarnings("serial")
+public class GroupAlsoByWindowsAndCombineDoFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
+    extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
+
+  private final KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn;
+
+  /**
+   * Creates a {@code DoFn} that combines the values of each window with {@code combineFn}.
+   *
+   * @param combineFn keyed combiner used to create, update and extract per-window accumulators
+   */
+  public GroupAlsoByWindowsAndCombineDoFn(
+      KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+    this.combineFn = combineFn;
+  }
+
+  /**
+   * Combines the values of one key window by window and outputs the result of each window.
+   *
+   * <p>A window is closed (its result extracted and output) as soon as an element arrives whose
+   * timestamp is after the window's maximum timestamp; windows still open when the input is
+   * exhausted are closed at the end, in order of increasing maximum timestamp.
+   *
+   * <p>NOTE(review): closing early is only safe if the values are iterated in non-decreasing
+   * timestamp order; that ordering is not enforced here -- confirm against the caller.
+   */
+  @Override
+  public void processElement(ProcessContext c) throws Exception {
+    K key = c.element().getKey();
+    Iterator<WindowedValue<InputT>> iterator = c.element().getValue().iterator();
+
+    // Live windows ordered by maximum timestamp, so the head is always the next one to close.
+    PriorityQueue<W> liveWindows =
+        new PriorityQueue<>(11, new Comparator<BoundedWindow>() {
+          @Override
+          public int compare(BoundedWindow w1, BoundedWindow w2) {
+            // Long.compare rather than the sign of a difference: the subtraction can overflow
+            // when the two maximum timestamps are far apart, which would invert the ordering.
+            return Long.compare(w1.maxTimestamp().getMillis(), w2.maxTimestamp().getMillis());
+          }
+        });
+
+    Map<W, AccumT> accumulators = Maps.newHashMap();
+    Map<W, Instant> minTimestamps = Maps.newHashMap();
+
+    while (iterator.hasNext()) {
+      WindowedValue<InputT> e = iterator.next();
+
+      @SuppressWarnings("unchecked")
+      Collection<W> windows = (Collection<W>) e.getWindows();
+      for (W w : windows) {
+        // Track the earliest element timestamp per window; it becomes the output timestamp.
+        Instant timestamp = minTimestamps.get(w);
+        if (timestamp == null || timestamp.compareTo(e.getTimestamp()) > 0) {
+          minTimestamps.put(w, e.getTimestamp());
+        }
+
+        AccumT accum = accumulators.get(w);
+        // Both maps must agree on whether this window is currently live.
+        checkState((timestamp == null && accum == null) || (timestamp != null && accum != null));
+        if (accum == null) {
+          accum = combineFn.createAccumulator(key);
+          liveWindows.add(w);
+        }
+        accum = combineFn.addInput(key, accum, e.getValue());
+        accumulators.put(w, accum);
+      }
+
+      // Flush every window whose maximum timestamp is before the current element's timestamp.
+      while (!liveWindows.isEmpty()
+          && liveWindows.peek().maxTimestamp().isBefore(e.getTimestamp())) {
+        closeWindow(key, liveWindows.poll(), accumulators, minTimestamps, c);
+      }
+    }
+
+    // Input exhausted: flush whatever is still open.
+    while (!liveWindows.isEmpty()) {
+      closeWindow(key, liveWindows.poll(), accumulators, minTimestamps, c);
+    }
+  }
+
+  /**
+   * Removes the state held for window {@code w} and outputs its combined value in that window,
+   * using the minimum input timestamp recorded for it.
+   *
+   * @throws IllegalStateException if no accumulator or no timestamp is recorded for {@code w}
+   */
+  private void closeWindow(
+      K key, W w, Map<W, AccumT> accumulators, Map<W, Instant> minTimestamps, ProcessContext c) {
+    AccumT accum = accumulators.remove(w);
+    Instant timestamp = minTimestamps.remove(w);
+    checkState(accum != null && timestamp != null);
+    c.windowingInternals().outputWindowedValue(
+        KV.of(key, combineFn.extractOutput(key, accum)), timestamp, Arrays.asList(w));
+  }
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsDoFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsDoFn.java
index 2fd5a5afe4..1fdae8aa71 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsDoFn.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsDoFn.java
@@ -70,6 +70,11 @@ public abstract class GroupAlsoByWindowsDoFn
       keyCoder,
       final Coder inputCoder) {
     Preconditions.checkNotNull(combineFn);
+    if (windowingStrategy.getWindowFn().isNonMerging()
+        && windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
+        && windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) {
+      return new GroupAlsoByWindowsAndCombineDoFn<>(combineFn);
+    }
     return new GABWViaOutputBufferDoFn<>(windowingStrategy,
         CombiningOutputBuffer.create(
             combineFn, keyCoder, inputCoder));
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsDoFnTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsDoFnTest.java
index 0fc8a2dddd..4d7449db11 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsDoFnTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowsDoFnTest.java
@@ -166,6 +166,48 @@ public class GroupAlsoByWindowsDoFnTest {
         Matchers.contains(window(10, 30)));
   }
 
+  @Test public void testSlidingWindowsCombine() throws Exception {
+    CombineFn combineFn = new Sum.SumLongFn();
+    DoFnRunner<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>, List> runner =
+        makeRunner(
+            WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))),
+            combineFn.asKeyedFn());
+
+    runner.startBundle();
+
+    runner.processElement(WindowedValue.valueInEmptyWindows(
+        KV.of("k", (Iterable<WindowedValue<Long>>) Arrays.asList(
+            WindowedValue.of(
+                1L,
+                new Instant(5),
+                Arrays.asList(window(-10, 10), window(0, 20))),
+            WindowedValue.of(
+                2L,
+                new Instant(15),
+                Arrays.asList(window(0, 20), window(10, 30))),
+            WindowedValue.of(
+                4L,
+                new Instant(18),
+                Arrays.asList(window(0, 20), window(10, 30)))))));
+
+    runner.finishBundle();
+
+    List<WindowedValue<KV<String, Long>>> result = runner.getReceiver(outputTag);
+
+    assertEquals(3, result.size());
+
+    assertThat(result, Matchers.contains(
+        WindowMatchers.isSingleWindowedValue(
+            KvMatcher.isKv(Matchers.equalTo("k"), Matchers.equalTo(1L)),
+            5, -10, 10),
+        WindowMatchers.isSingleWindowedValue(
+            KvMatcher.isKv(Matchers.equalTo("k"), Matchers.equalTo(7L)),
+            5, 0, 20),
+        WindowMatchers.isSingleWindowedValue(
+            KvMatcher.isKv(Matchers.equalTo("k"), Matchers.equalTo(6L)),
+            15, 10, 30)));
+  }
+
   @Test public void testDiscontiguousWindows() throws Exception {
     DoFnRunner>>, KV>, List> runner =