From 387e05b3a2c055f5170439fb5e14baf3da717301 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Wed, 11 Dec 2024 18:15:03 -0500
Subject: [PATCH 1/2] improve python multi-lang examples

---
 .../python/wordcount_external.py              | 30 +++-----
 .../ExtractWordsProvider.java                 | 72 +++++++++++++------
 .../schematransforms/JavaCountProvider.java   | 52 +++++++-------
 .../schematransforms/WriteWordsProvider.java  | 34 +++++----
 4 files changed, 109 insertions(+), 79 deletions(-)

diff --git a/examples/multi-language/python/wordcount_external.py b/examples/multi-language/python/wordcount_external.py
index 580c0269d361..eec64e0d0d10 100644
--- a/examples/multi-language/python/wordcount_external.py
+++ b/examples/multi-language/python/wordcount_external.py
@@ -18,7 +18,6 @@
 import logging
 
 import apache_beam as beam
-from apache_beam.io import ReadFromText
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
 from apache_beam.typehints.row_type import RowTypeConstraint
@@ -60,39 +59,30 @@
   --expansion_service_port
 """
 
-# Original Java transform is in ExtractWordsProvider.java
 EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1"
-# Original Java transform is in JavaCountProvider.java
 COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1"
-# Original Java transform is in WriteWordsProvider.java
 WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1"
 
 
 def run(input_path, output_path, expansion_service_port, pipeline_args):
   pipeline_options = PipelineOptions(pipeline_args)
 
-  # Discover and get external transforms from this expansion service
   provider = ExternalTransformProvider("localhost:" + expansion_service_port)
-  # Get transforms with identifiers, then use them as you would a regular
-  # native PTransform
+  # Retrieve portable transforms
   Extract = provider.get_urn(EXTRACT_IDENTIFIER)
   Count = provider.get_urn(COUNT_IDENTIFIER)
   Write = provider.get_urn(WRITE_IDENTIFIER)
 
   with beam.Pipeline(options=pipeline_options) as p:
-    lines = p | 'Read' >> ReadFromText(input_path)
-
-    words = (lines
-             | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
-             | 'Extract Words' >> Extract())
-    word_counts = words | 'Count Words' >> Count()
-    formatted_words = (
-        word_counts
-        | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
-            row.word, row.count))).with_output_types(
-                RowTypeConstraint.from_fields([('line', str)])))
-
-    formatted_words | 'Write' >> Write(file_path_prefix=output_path)
+    _ = (p
+        | 'Read' >> beam.io.ReadFromText(input_path)
+        | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
+        | 'Extract Words' >> Extract(filter=["king", "palace"])
+        | 'Count Words' >> Count()
+        | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
+            row.word, row.count))).with_output_types(
+                RowTypeConstraint.from_fields([('line', str)]))
+        | 'Write' >> Write(file_path_prefix=output_path))
 
 
 if __name__ == '__main__':
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
index 724dbce276fb..66bab336ce95 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
@@ -21,9 +21,12 @@
 
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -36,7 +39,6 @@
 /** Splits a line into separate words and returns each word. */
 @AutoService(SchemaTransformProvider.class)
 public class ExtractWordsProvider extends TypedSchemaTransformProvider<Configuration> {
-  public static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();
 
   @Override
   public String identifier() {
@@ -45,32 +47,60 @@ public String identifier() {
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new SchemaTransform() {
-      @Override
-      public PCollectionRowTuple expand(PCollectionRowTuple input) {
-        return PCollectionRowTuple.of(
-            "output",
-            input.get("input").apply(ParDo.of(new ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA));
-      }
-    };
+    return new ExtractWordsTransform(configuration);
   }
 
-  static class ExtractWordsFn extends DoFn<Row, Row> {
-    @ProcessElement
-    public void processElement(@Element Row element, OutputReceiver<Row> receiver) {
-      // Split the line into words.
-      String line = Preconditions.checkStateNotNull(element.getString("line"));
-      String[] words = line.split("[^\\p{L}]+", -1);
+  static class ExtractWordsTransform extends SchemaTransform {
+    private static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();
+    private final List<String> filter;
 
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word", word).build());
-        }
-      }
+    ExtractWordsTransform(Configuration configuration) {
+      this.filter = configuration.getFilter();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      return PCollectionRowTuple.of(
+          "output",
+          input
+              .getSinglePCollection()
+              .apply(
+                  ParDo.of(
+                      new DoFn<Row, Row>() {
+                        @ProcessElement
+                        public void process(@Element Row element, OutputReceiver<Row> receiver) {
+                          // Split the line into words.
+                          String line = Preconditions.checkStateNotNull(element.getString("line"));
+                          String[] words = line.split("[^\\p{L}]+", -1);
+                          Arrays.stream(words)
+                              .filter(filter::contains)
+                              .forEach(
+                                  word ->
+                                      receiver.output(
+                                          Row.withSchema(OUTPUT_SCHEMA)
+                                              .withFieldValue("word", word)
+                                              .build()));
+                        }
+                      }))
+              .setRowSchema(OUTPUT_SCHEMA));
     }
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  protected abstract static class Configuration {}
+  public abstract static class Configuration {
+    public static Builder builder() {
+      return new AutoValue_ExtractWordsProvider_Configuration.Builder();
+    }
+
+    @SchemaFieldDescription("List of words to filter out.")
+    public abstract List<String> getFilter();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setFilter(List<String> foo);
+
+      public abstract Configuration build();
+    }
+  }
 }
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
index cabea594ae18..90d02d92c3cb 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
@@ -44,35 +44,37 @@ public String identifier() {
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new SchemaTransform() {
-      @Override
-      public PCollectionRowTuple expand(PCollectionRowTuple input) {
-        Schema outputSchema =
-            Schema.builder().addStringField("word").addInt64Field("count").build();
+    return new JavaCountTransform();
+  }
+
+  static class JavaCountTransform extends SchemaTransform {
+    static final Schema OUTPUT_SCHEMA =
+        Schema.builder().addStringField("word").addInt64Field("count").build();
 
-        PCollection<Row> wordCounts =
-            input
-                .get("input")
-                .apply(Count.perElement())
-                .apply(
-                    MapElements.into(TypeDescriptors.rows())
-                        .via(
-                            kv ->
-                                Row.withSchema(outputSchema)
-                                    .withFieldValue(
-                                        "word",
-                                        Preconditions.checkStateNotNull(
-                                            kv.getKey().getString("word")))
-                                    .withFieldValue("count", kv.getValue())
-                                    .build()))
-                .setRowSchema(outputSchema);
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PCollection<Row> wordCounts =
+          input
+              .get("input")
+              .apply(Count.perElement())
+              .apply(
+                  MapElements.into(TypeDescriptors.rows())
+                      .via(
+                          kv ->
+                              Row.withSchema(OUTPUT_SCHEMA)
+                                  .withFieldValue(
+                                      "word",
+                                      Preconditions.checkStateNotNull(
+                                          kv.getKey().getString("word")))
+                                  .withFieldValue("count", kv.getValue())
+                                  .build()))
+              .setRowSchema(OUTPUT_SCHEMA);
 
-        return PCollectionRowTuple.of("output", wordCounts);
-      }
-    };
+      return PCollectionRowTuple.of("output", wordCounts);
+    }
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  protected abstract static class Configuration {}
+  public abstract static class Configuration {}
 }
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
index 0b2017c5587a..faf9590a7f16 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
@@ -42,24 +42,32 @@ public String identifier() {
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new SchemaTransform() {
-      @Override
-      public PCollectionRowTuple expand(PCollectionRowTuple input) {
-        input
-            .get("input")
-            .apply(
-                MapElements.into(TypeDescriptors.strings())
-                    .via(row -> Preconditions.checkStateNotNull(row.getString("line"))))
-            .apply(TextIO.write().to(configuration.getFilePathPrefix()));
+    return new WriteWordsTransform(configuration);
+  }
+
+  static class WriteWordsTransform extends SchemaTransform {
+    private final String filePathPrefix;
+
+    WriteWordsTransform(Configuration configuration) {
+      this.filePathPrefix = configuration.getFilePathPrefix();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      input
+          .get("input")
+          .apply(
+              MapElements.into(TypeDescriptors.strings())
+                  .via(row -> Preconditions.checkStateNotNull(row.getString("line"))))
+          .apply(TextIO.write().to(filePathPrefix));
 
-        return PCollectionRowTuple.empty(input.getPipeline());
-      }
-    };
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  protected abstract static class Configuration {
+  public abstract static class Configuration {
     public static Builder builder() {
       return new AutoValue_WriteWordsProvider_Configuration.Builder();
     }
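As context for the configuration plumbing above, here is a minimal Python sketch (not part of the patch) of how the Java Configuration getters surface as keyword arguments once an expansion service is running; the port and output prefix are placeholder assumptions:

    # Sketch only: discover the transforms registered by the providers above,
    # assuming an expansion service is listening on the given port.
    from apache_beam.transforms.external_transform_provider import ExternalTransformProvider

    provider = ExternalTransformProvider("localhost:12345")  # assumed port
    Extract = provider.get_urn("beam:schematransform:org.apache.beam:extract_words:v1")
    Write = provider.get_urn("beam:schematransform:org.apache.beam:write_words:v1")

    # Java getters map to snake_case kwargs: getFilter() -> filter,
    # getFilePathPrefix() -> file_path_prefix.
    extract = Extract(filter=["king", "palace"])
    write = Write(file_path_prefix="/tmp/words")  # assumed output prefix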
From 34df8bfc3b53c710ea97ba662d82c9bd9b12fca6 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Wed, 18 Dec 2024 15:57:04 -0500
Subject: [PATCH 2/2] minor adjustments

---
 .../python/wordcount_external.py              | 26 ++++++++++++-------
 .../ExtractWordsProvider.java                 | 12 ++++-----
 .../python/apache_beam/transforms/external.py |  3 ++-
 3 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/examples/multi-language/python/wordcount_external.py b/examples/multi-language/python/wordcount_external.py
index eec64e0d0d10..7298d81c1b44 100644
--- a/examples/multi-language/python/wordcount_external.py
+++ b/examples/multi-language/python/wordcount_external.py
@@ -19,6 +19,7 @@
 
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.transforms.external import BeamJarExpansionService
 from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
 from apache_beam.typehints.row_type import RowTypeConstraint
 """A Python multi-language pipeline that counts words using multiple Java SchemaTransforms.
@@ -67,17 +68,22 @@
 def run(input_path, output_path, expansion_service_port, pipeline_args):
   pipeline_options = PipelineOptions(pipeline_args)
 
-  provider = ExternalTransformProvider("localhost:" + expansion_service_port)
-  # Retrieve portable transforms
-  Extract = provider.get_urn(EXTRACT_IDENTIFIER)
-  Count = provider.get_urn(COUNT_IDENTIFIER)
-  Write = provider.get_urn(WRITE_IDENTIFIER)
-
   with beam.Pipeline(options=pipeline_options) as p:
+    expansion_service = BeamJarExpansionService(
+        "examples:multi-language:shadowJar")
+    if expansion_service_port:
+      expansion_service = "localhost:" + expansion_service_port
+
+    provider = ExternalTransformProvider(expansion_service)
+    # Retrieve portable transforms
+    Extract = provider.get_urn(EXTRACT_IDENTIFIER)
+    Count = provider.get_urn(COUNT_IDENTIFIER)
+    Write = provider.get_urn(WRITE_IDENTIFIER)
+
     _ = (p
         | 'Read' >> beam.io.ReadFromText(input_path)
         | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line))
-        | 'Extract Words' >> Extract(filter=["king", "palace"])
+        | 'Extract Words' >> Extract(drop=["king", "palace"])
         | 'Count Words' >> Count()
         | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % (
             row.word, row.count))).with_output_types(
@@ -100,8 +106,10 @@ def run(input_path, output_path, expansion_service_port, pipeline_args):
       help='Output file')
   parser.add_argument('--expansion_service_port',
                       dest='expansion_service_port',
-                      required=True,
-                      help='Expansion service port')
+                      required=False,
+                      help='Expansion service port. If left empty, the '
+                      'existing multi-language examples service will '
+                      'be used by default.')
   known_args, pipeline_args = parser.parse_known_args()
 
   run(known_args.input, known_args.output, known_args.expansion_service_port,
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
index 66bab336ce95..b7224ecec6b4 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java
@@ -52,10 +52,10 @@ protected SchemaTransform from(Configuration configuration) {
 
   static class ExtractWordsTransform extends SchemaTransform {
     private static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build();
-    private final List<String> filter;
+    private final List<String> drop;
 
     ExtractWordsTransform(Configuration configuration) {
-      this.filter = configuration.getFilter();
+      this.drop = configuration.getDrop();
     }
 
     @Override
@@ -73,7 +73,7 @@ public void process(@Element Row element, OutputReceiver<Row> receiver) {
                           String line = Preconditions.checkStateNotNull(element.getString("line"));
                          String[] words = line.split("[^\\p{L}]+", -1);
                           Arrays.stream(words)
-                              .filter(filter::contains)
+                              .filter(w -> !drop.contains(w))
                               .forEach(
                                   word ->
                                       receiver.output(
@@ -93,12 +93,12 @@ public static Builder builder() {
       return new AutoValue_ExtractWordsProvider_Configuration.Builder();
     }
 
-    @SchemaFieldDescription("List of words to filter out.")
-    public abstract List<String> getFilter();
+    @SchemaFieldDescription("List of words to drop.")
+    public abstract List<String> getDrop();
 
     @AutoValue.Builder
     public abstract static class Builder {
-      public abstract Builder setFilter(List<String> foo);
+      public abstract Builder setDrop(List<String> foo);
 
       public abstract Configuration build();
     }
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index e44f7482dc61..a4dd449e115a 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -239,7 +239,8 @@ def dict_to_row(schema_proto, py_value):
     extra = set(py_value.keys()) - set(row_type._fields)
     if extra:
       raise ValueError(
-          f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
+          f"Transform '{self.identifier()}' was configured with unknown "
+          f"fields: {extra}. Valid fields: {set(row_type._fields)}")
     return row_type(
         *[
             dict_to_row_recursive(
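For illustration only (not part of the patch), this is the kind of misconfiguration that reaches the improved error message above: a keyword argument with no matching field on the transform's Configuration. The port is an assumed placeholder and `fliter` is a deliberate typo:

    # Sketch: an unknown kwarg becomes an extra key in py_value, so dict_to_row
    # raises the ValueError above (now prefixed with the transform identifier)
    # when the configuration Row is built for the external transform.
    from apache_beam.transforms.external_transform_provider import ExternalTransformProvider

    provider = ExternalTransformProvider("localhost:12345")  # assumed port
    Extract = provider.get_urn("beam:schematransform:org.apache.beam:extract_words:v1")
    Extract(fliter=["king", "palace"])  # typo: the valid field is 'drop'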