Skip to content

Commit

Permalink
Merge branch 'master' into reorganize_iceberg_its
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 authored Dec 23, 2024
2 parents 156ad2b + 7c86bf3 commit 4d39646
Show file tree
Hide file tree
Showing 44 changed files with 318 additions and 364 deletions.
1 change: 1 addition & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ github:

protected_branches:
master: {}
release-2.62.0: {}
release-2.61.0: {}
release-2.60.0: {}
release-2.59.0: {}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
5 changes: 3 additions & 2 deletions .github/workflows/beam_Python_CostBenchmarks_Dataflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ jobs:
-PloadTest.mainClass=apache_beam.testing.benchmarks.wordcount.wordcount \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \
'-PloadTest.args=${{ env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \
- name: Run Tensorflow MNIST Image Classification on Dataflow
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 30
Expand All @@ -102,4 +102,5 @@ jobs:
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.tensorflow_mnist_classification_cost_benchmark \
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
'-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_2 }} --job_name=benchmark-tests-tf-mnist-classification-python-${{env.NOW_UTC}} --input_file=gs://apache-beam-ml/testing/inputs/it_mnist_data.csv --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_tf_mnist-${{env.NOW_UTC}}.txt --model=gs://apache-beam-ml/models/tensorflow/mnist/' \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/tensorflow_tests_requirements.txt \
'-PloadTest.args=${{ env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_2 }} --job_name=benchmark-tests-tf-mnist-classification-python-${{env.NOW_UTC}} --input_file=gs://apache-beam-ml/testing/inputs/it_mnist_data.csv --output_file=gs://temp-storage-for-end-to-end-tests/inference/result_tf_mnist-${{env.NOW_UTC}}.txt --model=gs://apache-beam-ml/models/tensorflow/mnist/' \
45 changes: 31 additions & 14 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
## New Features / Improvements
* The datetime module is now available for use in jinja templatization for yaml.
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Breaking Changes
Expand All @@ -54,7 +53,7 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->

# [2.62.0] - Unreleased
# [2.63.0] - Unreleased

## Highlights

Expand All @@ -63,24 +62,14 @@

## I/Os

* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
* [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125))

## New Features / Improvements

* Added support for stateful processing in Spark Runner for streaming pipelines. Timer functionality is not yet supported and will be implemented in a future release ([#33237](https://github.com/apache/beam/issues/33237)).
* Improved batch performance of SparkRunner's GroupByKey ([#20943](https://github.com/apache/beam/pull/20943)).
* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
* This enables initial Java GroupIntoBatches support.
* Support OrderedListState in Prism ([#32929](https://github.com/apache/beam/issues/32929)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes

* Upgraded ZetaSQL to 2024.11.1 ([#32902](https://github.com/apache/beam/pull/32902)). Java11+ is now needed if Beam's ZetaSQL component is used.
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

## Deprecations
Expand All @@ -90,16 +79,44 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed (CVE-2024-47561)[https://www.cve.org/CVERecord?id=CVE-2024-47561] (Java) by upgrading Avro version to 1.11.4

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).

# [2.62.0] - Unreleased

## I/Os

* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
* [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125))

## New Features / Improvements

* Added support for stateful processing in Spark Runner for streaming pipelines. Timer functionality is not yet supported and will be implemented in a future release ([#33237](https://github.com/apache/beam/issues/33237)).
* The datetime module is now available for use in jinja templatization for yaml.
* Improved batch performance of SparkRunner's GroupByKey ([#20943](https://github.com/apache/beam/pull/20943)).
* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)).
* This enables initial Java GroupIntoBatches support.
* Support OrderedListState in Prism ([#32929](https://github.com/apache/beam/issues/32929)).

## Breaking Changes

* Upgraded ZetaSQL to 2024.11.1 ([#32902](https://github.com/apache/beam/pull/32902)). Java11+ is now needed if Beam's ZetaSQL component is used.

## Bugfixes

* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).

## Security Fixes

* Fixed (CVE-2024-47561)[https://www.cve.org/CVERecord?id=CVE-2024-47561] (Java) by upgrading Avro version to 1.11.4

# [2.61.0] - 2024-11-25

## Highlights
Expand Down
52 changes: 25 additions & 27 deletions examples/multi-language/python/wordcount_external.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import logging

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.typehints.row_type import RowTypeConstraint
"""A Python multi-language pipeline that counts words using multiple Java SchemaTransforms.
Expand Down Expand Up @@ -60,39 +60,35 @@
--expansion_service_port <PORT>
"""

# Original Java transform is in ExtractWordsProvider.java
EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
# Original Java transform is in JavaCountProvider.java
COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
# Original Java transform is in WriteWordsProvider.java
WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"


def run(input_path, output_path, expansion_service_port, pipeline_args):
pipeline_options = PipelineOptions(pipeline_args)

# Discover and get external transforms from this expansion service
provider = ExternalTransformProvider("localhost:" + expansion_service_port)
# Get transforms with identifiers, then use them as you would a regular
# native PTransform
Extract = provider.get_urn(EXTRACT_IDENTIFIER)
Count = provider.get_urn(COUNT_IDENTIFIER)
Write = provider.get_urn(WRITE_IDENTIFIER)

with beam.Pipeline(options=pipeline_options) as p:
lines = p | 'Read' >> ReadFromText(input_path)

words = (lines
| 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
| 'Extract Words' >> Extract())
word_counts = words | 'Count Words' >> Count()
formatted_words = (
word_counts
| 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
row.word, row.count))).with_output_types(
RowTypeConstraint.from_fields([('line', str)])))

formatted_words | 'Write' >> Write(file_path_prefix=output_path)
expansion_service = BeamJarExpansionService(
"examples:multi-language:shadowJar")
if expansion_service_port:
expansion_service = "localhost:" + expansion_service_port

provider = ExternalTransformProvider(expansion_service)
# Retrieve portable transforms
Extract = provider.get_urn(EXTRACT_IDENTIFIER)
Count = provider.get_urn(COUNT_IDENTIFIER)
Write = provider.get_urn(WRITE_IDENTIFIER)

_ = (p
| 'Read' >> beam.io.ReadFromText(input_path)
| 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
| 'Extract Words' >> Extract(drop=["king", "palace"])
| 'Count Words' >> Count()
| 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
row.word, row.count))).with_output_types(
RowTypeConstraint.from_fields([('line', str)]))
| 'Write' >> Write(file_path_prefix=output_path))


if __name__ == '__main__':
Expand All @@ -110,8 +106,10 @@ def run(input_path, output_path, expansion_service_port, pipeline_args):
help='Output file')
parser.add_argument('--expansion_service_port',
dest='expansion_service_port',
required=True,
help='Expansion service port')
required=False,
help='Expansion service port. If left empty, the '
'existing multi-language examples service will '
'be used by default.')
known_args, pipeline_args = parser.parse_known_args()

run(known_args.input, known_args.output, known_args.expansion_service_port,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
Expand All @@ -36,7 +39,6 @@
/** Splits a line into separate words and returns each word. */
@AutoService(SchemaTransformProvider.class)
public class ExtractWordsProvider extends TypedSchemaTransformProvider<Configuration> {
public static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();

@Override
public String identifier() {
Expand All @@ -45,32 +47,60 @@ public String identifier() {

@Override
protected SchemaTransform from(Configuration configuration) {
return new SchemaTransform() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
return PCollectionRowTuple.of(
"output",
input.get("input").apply(ParDo.of(new ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA));
}
};
return new ExtractWordsTransform(configuration);
}

static class ExtractWordsFn extends DoFn<Row, Row> {
@ProcessElement
public void processElement(@Element Row element, OutputReceiver<Row> receiver) {
// Split the line into words.
String line = Preconditions.checkStateNotNull(element.getString("line"));
String[] words = line.split("[^\\p{L}]+", -1);
static class ExtractWordsTransform extends SchemaTransform {
private static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();
private final List<String> drop;

for (String word : words) {
if (!word.isEmpty()) {
receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word", word).build());
}
}
ExtractWordsTransform(Configuration configuration) {
this.drop = configuration.getDrop();
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
return PCollectionRowTuple.of(
"output",
input
.getSinglePCollection()
.apply(
ParDo.of(
new DoFn<Row, Row>() {
@ProcessElement
public void process(@Element Row element, OutputReceiver<Row> receiver) {
// Split the line into words.
String line = Preconditions.checkStateNotNull(element.getString("line"));
String[] words = line.split("[^\\p{L}]+", -1);
Arrays.stream(words)
.filter(w -> !drop.contains(w))
.forEach(
word ->
receiver.output(
Row.withSchema(OUTPUT_SCHEMA)
.withFieldValue("word", word)
.build()));
}
}))
.setRowSchema(OUTPUT_SCHEMA));
}
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
protected abstract static class Configuration {}
public abstract static class Configuration {
public static Builder builder() {
return new AutoValue_ExtractWordsProvider_Configuration.Builder();
}

@SchemaFieldDescription("List of words to drop.")
public abstract List<String> getDrop();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setDrop(List<String> foo);

public abstract Configuration build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,35 +44,37 @@ public String identifier() {

@Override
protected SchemaTransform from(Configuration configuration) {
return new SchemaTransform() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
Schema outputSchema =
Schema.builder().addStringField("word").addInt64Field("count").build();
return new JavaCountTransform();
}

static class JavaCountTransform extends SchemaTransform {
static final Schema OUTPUT_SCHEMA =
Schema.builder().addStringField("word").addInt64Field("count").build();

PCollection<Row> wordCounts =
input
.get("input")
.apply(Count.perElement())
.apply(
MapElements.into(TypeDescriptors.rows())
.via(
kv ->
Row.withSchema(outputSchema)
.withFieldValue(
"word",
Preconditions.checkStateNotNull(
kv.getKey().getString("word")))
.withFieldValue("count", kv.getValue())
.build()))
.setRowSchema(outputSchema);
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
PCollection<Row> wordCounts =
input
.get("input")
.apply(Count.perElement())
.apply(
MapElements.into(TypeDescriptors.rows())
.via(
kv ->
Row.withSchema(OUTPUT_SCHEMA)
.withFieldValue(
"word",
Preconditions.checkStateNotNull(
kv.getKey().getString("word")))
.withFieldValue("count", kv.getValue())
.build()))
.setRowSchema(OUTPUT_SCHEMA);

return PCollectionRowTuple.of("output", wordCounts);
}
};
return PCollectionRowTuple.of("output", wordCounts);
}
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
protected abstract static class Configuration {}
public abstract static class Configuration {}
}
Loading

0 comments on commit 4d39646

Please sign in to comment.