Skip to content

Commit

Permalink
Allow dropLateData in GBK for SamzaRunner
Browse files Browse the repository at this point in the history
  • Loading branch information
Xinyu Liu committed Sep 14, 2023
1 parent ec422db commit 160fdf5
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,10 @@ public ExecutorService create(PipelineOptions options) {
new ThreadFactoryBuilder().setNameFormat("Process Element Thread-%d").build());
}
}

/**
 * Returns whether late data dropping is enabled for GroupByKey/Combine transforms.
 *
 * <p>The option defaults to {@code false}, as declared by the {@code @Default.Boolean}
 * annotation below.
 *
 * @return {@code true} if late data dropping is enabled, {@code false} otherwise
 */
@Description("Enable/disable late data dropping in GroupByKey/Combine transforms")
@Default.Boolean(false)
boolean getDropLateData();

/**
 * Enables or disables late data dropping in GroupByKey/Combine transforms.
 *
 * @param dropLateData {@code true} to enable late data dropping, {@code false} to disable it
 */
void setDropLateData(boolean dropLateData);
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,19 @@ public TimerInternals timerInternals() {
DoFnSchemaInformation.create(),
Collections.emptyMap());

final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> dropLateDataRunner =
pipelineOptions.getDropLateData()
? DoFnRunners.lateDataDroppingRunner(
doFnRunner, keyedInternals.timerInternals(), windowingStrategy)
: doFnRunner;

final SamzaExecutionContext executionContext =
(SamzaExecutionContext) context.getApplicationContainerContext();
this.fnRunner =
final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunnerWithMetrics =
DoFnRunnerWithMetrics.wrap(
doFnRunner, executionContext.getMetricsContainer(), transformFullName);
dropLateDataRunner, executionContext.getMetricsContainer(), transformFullName);

this.fnRunner = new DoFnRunnerWithKeyedInternals<>(doFnRunnerWithMetrics, keyedInternals);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.samza.runtime;

import java.io.Serializable;
import java.util.Arrays;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

/**
 * Tests for GroupByKeyOp.
 *
 * <p>Every test replays a {@link TestStream} in which one element carrying timestamp 1000 is
 * emitted only after the watermark has been advanced to 3000, and then sums the values over
 * three-second fixed windows. The tests differ in whether the pipeline was created with
 * {@code --dropLateData=true} and in whether the input is keyed.
 */
public class GroupByKeyOpTest implements Serializable {
  /** Pipeline created with only the runner argument. */
  @Rule public final transient TestPipeline pipeline = pipelineFor("--runner=TestSamzaRunner");

  /** Pipeline created with the runner argument plus {@code --dropLateData=true}. */
  @Rule
  public final transient TestPipeline dropLateDataPipeline =
      pipelineFor("--runner=TestSamzaRunner", "--dropLateData=true");

  /** Builds a {@link TestPipeline} from the given command-line style arguments. */
  private static TestPipeline pipelineFor(String... args) {
    return TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
  }

  /** Pairs a value with an event timestamp expressed in epoch milliseconds. */
  private static <T> TimestampedValue<T> at(T value, long millis) {
    return TimestampedValue.of(value, new Instant(millis));
  }

  /** Three-second fixed windows that accumulate fired panes. */
  private static <T> Window<T> accumulatingThreeSecondWindows() {
    return Window.<T>into(FixedWindows.of(Duration.standardSeconds(3))).accumulatingFiredPanes();
  }

  /**
   * Un-keyed input: values 1 and 2 at timestamps 1000 and 2000, the watermark moved to 3000, then
   * value 10 at timestamp 1000, the watermark moved to 10000 and finally to infinity.
   */
  private static TestStream<Integer> integersWithTrailingElement() {
    return TestStream.create(VarIntCoder.of())
        .addElements(at(1, 1000L))
        .addElements(at(2, 2000L))
        .advanceWatermarkTo(new Instant(3000L))
        .addElements(at(10, 1000L))
        .advanceWatermarkTo(new Instant(10000L))
        .advanceWatermarkToInfinity();
  }

  /** Applies the un-keyed stream, the windowing and a global integer sum to {@code target}. */
  private static PCollection<Integer> windowedGlobalSum(TestPipeline target) {
    return target
        .apply(integersWithTrailingElement())
        .apply(GroupByKeyOpTest.<Integer>accumulatingThreeSecondWindows())
        .apply(Combine.globally(Sum.ofIntegers()).withoutDefaults());
  }

  /** With default options the assertion expects both 3 and 10 in the output. */
  @Test
  public void testDefaultGbk() {
    PCollection<Integer> sums = windowedGlobalSum(pipeline);

    PAssert.that(sums).containsInAnyOrder(3, 10);

    pipeline.run().waitUntilFinish();
  }

  /** With {@code --dropLateData=true} the assertion expects only 3 in the output. */
  @Test
  public void testDropLateDataNonKeyed() {
    PCollection<Integer> sums = windowedGlobalSum(dropLateDataPipeline);

    PAssert.that(sums).containsInAnyOrder(3);

    dropLateDataPipeline.run().waitUntilFinish();
  }

  /**
   * Keyed variant with {@code --dropLateData=true}: the assertion expects ("a", 4) and ("b", 2),
   * i.e. the ("a", 10) element emitted after the watermark reached 3000 is not part of any sum.
   */
  @Test
  public void testDropLateDataKeyed() {
    TestStream<KV<String, Integer>> keyedEvents =
        TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
            .addElements(at(KV.of("a", 1), 1000L))
            .addElements(at(KV.of("b", 2), 2000L))
            .addElements(at(KV.of("a", 3), 2500L))
            .advanceWatermarkTo(new Instant(3000L))
            .addElements(at(KV.of("a", 10), 1000L))
            .advanceWatermarkTo(new Instant(10000L))
            .advanceWatermarkToInfinity();

    PCollection<KV<String, Integer>> sumsPerKey =
        dropLateDataPipeline
            .apply(keyedEvents)
            .apply(GroupByKeyOpTest.<KV<String, Integer>>accumulatingThreeSecondWindows())
            .apply(Sum.integersPerKey());

    PAssert.that(sumsPerKey).containsInAnyOrder(Arrays.asList(KV.of("a", 4), KV.of("b", 2)));

    dropLateDataPipeline.run().waitUntilFinish();
  }
}

0 comments on commit 160fdf5

Please sign in to comment.