Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Implement GroupAlsoByWindowsAndCombineDoFn
Browse files Browse the repository at this point in the history
For non-merging window functions with default triggering,
it combines input elements in a single pass.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=96926562
  • Loading branch information
peihe authored and lukecwik committed Jun 26, 2015
1 parent 5c0be72 commit c4b7682
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.dataflow.sdk.util;

import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.common.collect.Maps;

import org.joda.time.Instant;

import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;

/**
 * {@link GroupAlsoByWindowsDoFn} that uses a combiner to accumulate input elements for non-merging
 * window functions with the default triggering strategy.
 *
 * <p>The values of a key are traversed exactly once. Every value is folded into one accumulator
 * per window it belongs to. A window is emitted as soon as a value is seen whose timestamp lies
 * strictly after the window's {@code maxTimestamp()}; all windows still pending when the input is
 * exhausted are emitted at the end, in order of increasing {@code maxTimestamp()}. The output of a
 * window carries the smallest timestamp of the values that were combined into it.
 *
 * <p>If a value for an already emitted window shows up later in the iteration, a fresh
 * accumulator is created and that window is emitted a second time.
 * NOTE(review): this presumably relies on the caller supplying values in non-decreasing timestamp
 * order - confirm against the producer of the input iterable.
 *
 * @param <K> key type
 * @param <InputT> value input type
 * @param <AccumT> accumulator type
 * @param <OutputT> value output type
 * @param <W> window type
 */
@SuppressWarnings("serial")
public class GroupAlsoByWindowsAndCombineDoFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
    extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {

  private final KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn;

  /**
   * Creates a DoFn that combines the grouped values of each key and window with
   * {@code combineFn}.
   *
   * @param combineFn the keyed combine function used to create accumulators, add inputs and
   *     extract the per-window output
   */
  public GroupAlsoByWindowsAndCombineDoFn(
      KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
    this.combineFn = combineFn;
  }

  /**
   * Combines all values of the current key per window and outputs one {@code KV} of key and
   * combined result for each window.
   *
   * @param c the process context holding the key, its windowed values and the output sink
   * @throws IllegalStateException if the accumulator and minimum-timestamp bookkeeping of a
   *     window get out of sync
   */
  @Override
  public void processElement(ProcessContext c) throws Exception {
    K key = c.element().getKey();
    Iterator<WindowedValue<InputT>> iterator = c.element().getValue().iterator();

    // Windows with a pending accumulator, ordered so that the window ending first is at the head.
    PriorityQueue<W> liveWindows =
        new PriorityQueue<>(11, new Comparator<BoundedWindow>() {
          @Override
          public int compare(BoundedWindow w1, BoundedWindow w2) {
            // Compare the instants directly: subtracting the millisecond values can overflow
            // for windows whose end timestamps are far apart and would then invert the order.
            return w1.maxTimestamp().compareTo(w2.maxTimestamp());
          }
        });

    Map<W, AccumT> accumulators = Maps.newHashMap();
    Map<W, Instant> minTimestamps = Maps.newHashMap();

    while (iterator.hasNext()) {
      WindowedValue<InputT> e = iterator.next();

      @SuppressWarnings("unchecked")
      Collection<W> windows = (Collection<W>) e.getWindows();
      for (W w : windows) {
        // Track the earliest element timestamp per window; it becomes the output timestamp.
        Instant timestamp = minTimestamps.get(w);
        if (timestamp == null || timestamp.compareTo(e.getTimestamp()) > 0) {
          minTimestamps.put(w, e.getTimestamp());
        }

        AccumT accum = accumulators.get(w);
        // Both maps must agree on whether this window has been seen before.
        checkState((timestamp == null) == (accum == null));
        if (accum == null) {
          accum = combineFn.createAccumulator(key);
          liveWindows.add(w);
        }
        accum = combineFn.addInput(key, accum, e.getValue());
        accumulators.put(w, accum);
      }

      // Emit every window that ends strictly before the timestamp of the current element.
      while (!liveWindows.isEmpty()
          && liveWindows.peek().maxTimestamp().isBefore(e.getTimestamp())) {
        closeWindow(key, liveWindows.poll(), accumulators, minTimestamps, c);
      }
    }

    // Input exhausted: flush whatever is still pending.
    while (!liveWindows.isEmpty()) {
      closeWindow(key, liveWindows.poll(), accumulators, minTimestamps, c);
    }
  }

  /**
   * Removes the state kept for window {@code w} and outputs its combined result in that single
   * window, stamped with the minimum timestamp recorded for the window.
   *
   * @param key the key being processed
   * @param w the window to emit
   * @param accumulators per-window accumulators; the entry for {@code w} is removed
   * @param minTimestamps per-window minimum element timestamps; the entry for {@code w} is
   *     removed
   * @param c the process context used to emit the windowed output
   * @throws IllegalStateException if no accumulator or no timestamp is recorded for {@code w}
   */
  private void closeWindow(
      K key, W w, Map<W, AccumT> accumulators, Map<W, Instant> minTimestamps, ProcessContext c) {
    AccumT accum = accumulators.remove(w);
    Instant timestamp = minTimestamps.remove(w);
    checkState(accum != null && timestamp != null);
    c.windowingInternals().outputWindowedValue(
        KV.of(key, combineFn.extractOutput(key, accum)), timestamp, Arrays.asList(w));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends Bound
final Coder<K> keyCoder,
final Coder<InputT> inputCoder) {
Preconditions.checkNotNull(combineFn);
if (windowingStrategy.getWindowFn().isNonMerging()
&& windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
&& windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) {
return new GroupAlsoByWindowsAndCombineDoFn<>(combineFn);
}
return new GABWViaOutputBufferDoFn<>(windowingStrategy,
CombiningOutputBuffer.<K, InputT, AccumT, OutputT, W>create(
combineFn, keyCoder, inputCoder));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,48 @@ public class GroupAlsoByWindowsDoFnTest {
Matchers.contains(window(10, 30)));
}

/**
 * Feeds three values for key "k" through a summing combiner over sliding windows of 20ms
 * every 10ms and expects exactly one output per window: 1 for [-10, 10) at timestamp 5,
 * 7 for [0, 20) at timestamp 5 and 6 for [10, 30) at timestamp 15.
 */
@Test public void testSlidingWindowsCombine() throws Exception {
  CombineFn<Long, ?, Long> sumFn = new Sum.SumLongFn();
  DoFnRunner<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>, List> runner =
      makeRunner(
          WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))),
          sumFn.<String>asKeyedFn());

  // Inputs, each tagged with the sliding windows it falls into.
  WindowedValue<Long> one =
      WindowedValue.of(1L, new Instant(5), Arrays.asList(window(-10, 10), window(0, 20)));
  WindowedValue<Long> two =
      WindowedValue.of(2L, new Instant(15), Arrays.asList(window(0, 20), window(10, 30)));
  WindowedValue<Long> four =
      WindowedValue.of(4L, new Instant(18), Arrays.asList(window(0, 20), window(10, 30)));
  List<WindowedValue<Long>> inputs = Arrays.asList(one, two, four);

  runner.startBundle();
  runner.processElement(
      WindowedValue.valueInEmptyWindows(
          KV.of("k", (Iterable<WindowedValue<Long>>) inputs)));
  runner.finishBundle();

  List<WindowedValue<KV<String, Long>>> outputs = runner.getReceiver(outputTag);

  assertEquals(3, outputs.size());

  // Outputs are expected in order of window end.
  assertThat(outputs, Matchers.contains(
      WindowMatchers.isSingleWindowedValue(
          KvMatcher.isKv(Matchers.equalTo("k"), Matchers.equalTo(1L)),
          5, -10, 10),
      WindowMatchers.isSingleWindowedValue(
          KvMatcher.isKv(Matchers.equalTo("k"), Matchers.equalTo(7L)),
          5, 0, 20),
      WindowMatchers.isSingleWindowedValue(
          KvMatcher.isKv(Matchers.equalTo("k"), Matchers.equalTo(6L)),
          15, 10, 30)));
}

@Test public void testDiscontiguousWindows() throws Exception {
DoFnRunner<KV<String, Iterable<WindowedValue<String>>>,
KV<String, Iterable<String>>, List> runner =
Expand Down

0 comments on commit c4b7682

Please sign in to comment.