Skip to content

Commit

Permalink
Merge pull request #2 from apache/master
Browse files Browse the repository at this point in the history
merge-beam changes #2
  • Loading branch information
amoght authored Dec 2, 2019
2 parents 0de962c + 1f64ba3 commit ace8d21
Show file tree
Hide file tree
Showing 37 changed files with 1,530 additions and 76 deletions.
1 change: 1 addition & 0 deletions .test-infra/jenkins/job_PreCommit_Python.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder(
scope: this,
nameBase: 'Python',
gradleTask: ':pythonPreCommit',
timeoutMins: 180,
triggerPathPatterns: [
'^model/.*$',
'^sdks/python/.*$',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@

import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/** Coder registrar for AvroCoder. */
/** Coder registrar for AvroGenericCoder. */
@AutoService(CoderTranslatorRegistrar.class)
public class AvroCoderRegistrar implements CoderTranslatorRegistrar {
public static final String AVRO_CODER_URN = "beam:coder:avro:v1";
public class AvroGenericCoderRegistrar implements CoderTranslatorRegistrar {
public static final String AVRO_CODER_URN = "beam:coder:avro:generic:v1";

@Override
public Map<Class<? extends Coder>, String> getCoderURNs() {
return ImmutableMap.of(AvroCoder.class, AVRO_CODER_URN);
return ImmutableMap.of(AvroGenericCoder.class, AVRO_CODER_URN);
}

@Override
public Map<Class<? extends Coder>, CoderTranslator<? extends Coder>> getCoderTranslators() {
return ImmutableMap.of(AvroCoder.class, new AvroCoderTranslator());
return ImmutableMap.of(AvroGenericCoder.class, new AvroGenericCoderTranslator());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,25 @@
import java.util.Collections;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;

/** Coder translator for AvroCoder. */
public class AvroCoderTranslator implements CoderTranslator<AvroCoder<?>> {
/** Coder translator for AvroGenericCoder. */
public class AvroGenericCoderTranslator implements CoderTranslator<AvroGenericCoder> {
@Override
public List<? extends Coder<?>> getComponents(AvroCoder from) {
public List<? extends Coder<?>> getComponents(AvroGenericCoder from) {
return Collections.emptyList();
}

@Override
public byte[] getPayload(AvroCoder from) {
public byte[] getPayload(AvroGenericCoder from) {
return from.getSchema().toString().getBytes(Charsets.UTF_8);
}

@Override
public AvroCoder fromComponents(List<Coder<?>> components, byte[] payload) {
public AvroGenericCoder fromComponents(List<Coder<?>> components, byte[] payload) {
Schema schema = new Schema.Parser().parse(new String(payload, Charsets.UTF_8));
return AvroCoder.of(schema);
return AvroGenericCoder.of(schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public static Iterable<Coder<?>> data() {
KvCoder.of(
new RecordCoder(),
AvroCoder.of(SchemaBuilder.record("record").fields().endRecord())))
.add(
StringUtf8Coder.of(),
SerializableCoder.of(Record.class),
new RecordCoder(),
KvCoder.of(new RecordCoder(), AvroCoder.of(Record.class)))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,35 @@ public class MetricsPusherTest {
private static final Logger LOG = LoggerFactory.getLogger(MetricsPusherTest.class);

private static final long NUM_ELEMENTS = 1000L;
private static final String COUNTER_NAME = "counter";
@Rule public final TestPipeline pipeline = TestPipeline.create();

// Reset the shared static sink and register it on the pipeline's options
// before every test, so each test observes only its own pushed metrics.
@Before
public void init() {
  TestMetricsSink.clear();
  pipeline.getOptions().as(MetricsOptions.class).setMetricsSink(TestMetricsSink.class);
}

@Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
@Test
public void test() throws Exception {
public void pushesUserMetrics() throws Exception {
TestMetricsSink.clear();
pipeline
.apply(
// Use maxReadTime to force unbounded mode.
GenerateSequence.from(0).to(NUM_ELEMENTS).withMaxReadTime(Duration.standardDays(1)))
.apply(ParDo.of(new CountingDoFn()));
pipeline.run();
// give metrics pusher time to push
Thread.sleep(
(pipeline.getOptions().as(MetricsOptions.class).getMetricsPushPeriod() + 1L) * 1000);
assertThat(TestMetricsSink.getCounterValue(COUNTER_NAME), is(NUM_ELEMENTS));
}

@Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
@Test
public void pushesSystemMetrics() throws InterruptedException {
TestMetricsSink.clear();
pipeline
.apply(
// Use maxReadTime to force unbounded mode.
Expand All @@ -69,11 +86,11 @@ public void test() throws Exception {
// give metrics pusher time to push
Thread.sleep(
(pipeline.getOptions().as(MetricsOptions.class).getMetricsPushPeriod() + 1L) * 1000);
assertThat(TestMetricsSink.getCounterValue(), is(NUM_ELEMENTS));
assertThat(TestMetricsSink.getSystemCounters().isEmpty(), is(false));
}

private static class CountingDoFn extends DoFn<Long, Long> {
private final Counter counter = Metrics.counter(MetricsPusherTest.class, "counter");
private final Counter counter = Metrics.counter(MetricsPusherTest.class, COUNTER_NAME);

@ProcessElement
public void processElement(ProcessContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
*/
package org.apache.beam.runners.core.metrics;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.metrics.MetricsSink;

Expand All @@ -27,23 +31,34 @@
*/
public class TestMetricsSink implements MetricsSink {

private static long counterValue;
private static MetricQueryResults metricQueryResults;
private static final String SYSTEM_METRIC_PREFIX = "metric:";

public TestMetricsSink(MetricsOptions pipelineOptions) {}

public static long getCounterValue() {
return counterValue;
public static long getCounterValue(String counterName) {
for (MetricResult<Long> metricResult : metricQueryResults.getCounters()) {
if (metricResult.getName().getName().equals(counterName)) {
return metricResult.getAttempted();
}
}
return 0L;
}

public static List<MetricResult<Long>> getSystemCounters() {
List<MetricResult<Long>> counters =
StreamSupport.stream(metricQueryResults.getCounters().spliterator(), false)
.filter(result -> result.getKey().metricName().getName().contains(SYSTEM_METRIC_PREFIX))
.collect(Collectors.toList());
return counters;
}

public static void clear() {
counterValue = 0L;
metricQueryResults = null;
}

@Override
public void writeMetrics(MetricQueryResults metricQueryResults) throws Exception {
counterValue =
metricQueryResults.getCounters().iterator().hasNext()
? metricQueryResults.getCounters().iterator().next().getAttempted()
: 0L;
public void writeMetrics(MetricQueryResults metricQueryResult) throws Exception {
metricQueryResults = metricQueryResult;
}
}
3 changes: 0 additions & 3 deletions runners/flink/job-server/flink_job_server.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,9 @@ def portableValidatesRunnerTask(String name, Boolean streaming) {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesCounterMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesDistributionMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ public DataflowPackage stageToFile(byte[] bytes, String baseName) {
}

private GcsCreateOptions buildCreateOptions() {
// Default is 1M, to avoid excessive memory use when uploading, but can be changed with
// {@link DataflowPipelineOptions#getGcsUploadBufferSizeBytes()}.
int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024);
checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0");
uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024);

return GcsCreateOptions.builder()
.setGcsUploadBufferSizeBytes(uploadSizeBytes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class JobServicePipelineResult implements PipelineResult, AutoCloseable {
private final CloseableResource<JobServiceBlockingStub> jobService;
@Nullable private State terminationState;
@Nullable private final Runnable cleanup;
private org.apache.beam.model.jobmanagement.v1.JobApi.MetricResults jobMetrics;

JobServicePipelineResult(
ByteString jobId, CloseableResource<JobServiceBlockingStub> jobService, Runnable cleanup) {
Expand Down Expand Up @@ -121,12 +122,15 @@ public State waitUntilFinish() {

// Returns the job's metrics wrapped as portable MetricResults.
// NOTE(review): jobMetrics is fetched from the job service in close(); calling
// metrics() before close() appears to pass null to PortableMetrics.of — confirm.
@Override
public MetricResults metrics() {
  return PortableMetrics.of(jobMetrics);
}

@Override
public void close() {
try (CloseableResource<JobServiceBlockingStub> jobService = this.jobService) {
JobApi.GetJobMetricsRequest metricsRequest =
JobApi.GetJobMetricsRequest.newBuilder().setJobIdBytes(jobId).build();
jobMetrics = jobService.get().getJobMetrics(metricsRequest).getMetrics();
if (cleanup != null) {
cleanup.run();
}
Expand Down
Loading

0 comments on commit ace8d21

Please sign in to comment.