From dfdfd0a577ba8f4ef094a001c9b03722fccb5d49 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Thu, 19 Dec 2024 18:10:13 +0000 Subject: [PATCH] Improve existing Python multi-lang SchemaTransform examples (#33361) * improve python multi-lang examples * minor adjustments --- .../python/wordcount_external.py | 52 +++++++------- .../ExtractWordsProvider.java | 72 +++++++++++++------ .../schematransforms/JavaCountProvider.java | 52 +++++++------- .../schematransforms/WriteWordsProvider.java | 34 +++++---- .../python/apache_beam/transforms/external.py | 3 +- 5 files changed, 126 insertions(+), 87 deletions(-) diff --git a/examples/multi-language/python/wordcount_external.py b/examples/multi-language/python/wordcount_external.py index 580c0269d361..7298d81c1b44 100644 --- a/examples/multi-language/python/wordcount_external.py +++ b/examples/multi-language/python/wordcount_external.py @@ -18,8 +18,8 @@ import logging import apache_beam as beam -from apache_beam.io import ReadFromText from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.transforms.external import BeamJarExpansionService from apache_beam.transforms.external_transform_provider import ExternalTransformProvider from apache_beam.typehints.row_type import RowTypeConstraint """A Python multi-language pipeline that counts words using multiple Java SchemaTransforms. @@ -60,39 +60,35 @@ --expansion_service_port """ -# Original Java transform is in ExtractWordsProvider.java EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1" -# Original Java transform is in JavaCountProvider.java COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1" -# Original Java transform is in WriteWordsProvider.java WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1" def run(input_path, output_path, expansion_service_port, pipeline_args): pipeline_options = PipelineOptions(pipeline_args) - # Discover and get external transforms from this expansion service - provider = ExternalTransformProvider("localhost:" + expansion_service_port) - # Get transforms with identifiers, then use them as you would a regular - # native PTransform - Extract = provider.get_urn(EXTRACT_IDENTIFIER) - Count = provider.get_urn(COUNT_IDENTIFIER) - Write = provider.get_urn(WRITE_IDENTIFIER) - with beam.Pipeline(options=pipeline_options) as p: - lines = p | 'Read' >> ReadFromText(input_path) - - words = (lines - | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line)) - | 'Extract Words' >> Extract()) - word_counts = words | 'Count Words' >> Count() - formatted_words = ( - word_counts - | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % ( - row.word, row.count))).with_output_types( - RowTypeConstraint.from_fields([('line', str)]))) - - formatted_words | 'Write' >> Write(file_path_prefix=output_path) + expansion_service = BeamJarExpansionService( + "examples:multi-language:shadowJar") + if expansion_service_port: + expansion_service = "localhost:" + expansion_service_port + + provider = ExternalTransformProvider(expansion_service) + # Retrieve portable transforms + Extract = provider.get_urn(EXTRACT_IDENTIFIER) + Count = provider.get_urn(COUNT_IDENTIFIER) + Write = provider.get_urn(WRITE_IDENTIFIER) + + _ = (p + | 'Read' >> beam.io.ReadFromText(input_path) + | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line)) + | 'Extract Words' >> Extract(drop=["king", "palace"]) + | 'Count Words' >> Count() + | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % ( + row.word, row.count))).with_output_types( + RowTypeConstraint.from_fields([('line', str)])) + | 'Write' >> Write(file_path_prefix=output_path)) if __name__ == '__main__': @@ -110,8 +106,10 @@ def run(input_path, output_path, expansion_service_port, pipeline_args): help='Output file') parser.add_argument('--expansion_service_port', dest='expansion_service_port', - required=True, - help='Expansion service port') + required=False, + help='Expansion service port. If left empty, the ' + 'existing multi-language examples service will ' + 'be used by default.') known_args, pipeline_args = parser.parse_known_args() run(known_args.input, known_args.output, known_args.expansion_service_port, diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java index 724dbce276fb..b7224ecec6b4 100644 --- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java +++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java @@ -21,9 +21,12 @@ import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; +import java.util.Arrays; +import java.util.List; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -36,7 +39,6 @@ /** Splits a line into separate words and returns each word. */ @AutoService(SchemaTransformProvider.class) public class ExtractWordsProvider extends TypedSchemaTransformProvider { - public static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build(); @Override public String identifier() { @@ -45,32 +47,60 @@ public String identifier() { @Override protected SchemaTransform from(Configuration configuration) { - return new SchemaTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - return PCollectionRowTuple.of( - "output", - input.get("input").apply(ParDo.of(new ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA)); - } - }; + return new ExtractWordsTransform(configuration); } - static class ExtractWordsFn extends DoFn { - @ProcessElement - public void processElement(@Element Row element, OutputReceiver receiver) { - // Split the line into words. - String line = Preconditions.checkStateNotNull(element.getString("line")); - String[] words = line.split("[^\\p{L}]+", -1); + static class ExtractWordsTransform extends SchemaTransform { + private static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build(); + private final List drop; - for (String word : words) { - if (!word.isEmpty()) { - receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word", word).build()); - } - } + ExtractWordsTransform(Configuration configuration) { + this.drop = configuration.getDrop(); + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + return PCollectionRowTuple.of( + "output", + input + .getSinglePCollection() + .apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void process(@Element Row element, OutputReceiver receiver) { + // Split the line into words. + String line = Preconditions.checkStateNotNull(element.getString("line")); + String[] words = line.split("[^\\p{L}]+", -1); + Arrays.stream(words) + .filter(w -> !drop.contains(w)) + .forEach( + word -> + receiver.output( + Row.withSchema(OUTPUT_SCHEMA) + .withFieldValue("word", word) + .build())); + } + })) + .setRowSchema(OUTPUT_SCHEMA)); } } @DefaultSchema(AutoValueSchema.class) @AutoValue - protected abstract static class Configuration {} + public abstract static class Configuration { + public static Builder builder() { + return new AutoValue_ExtractWordsProvider_Configuration.Builder(); + } + + @SchemaFieldDescription("List of words to drop.") + public abstract List getDrop(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setDrop(List foo); + + public abstract Configuration build(); + } + } } diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java index cabea594ae18..90d02d92c3cb 100644 --- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java +++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java @@ -44,35 +44,37 @@ public String identifier() { @Override protected SchemaTransform from(Configuration configuration) { - return new SchemaTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - Schema outputSchema = - Schema.builder().addStringField("word").addInt64Field("count").build(); + return new JavaCountTransform(); + } + + static class JavaCountTransform extends SchemaTransform { + static final Schema OUTPUT_SCHEMA = + Schema.builder().addStringField("word").addInt64Field("count").build(); - PCollection wordCounts = - input - .get("input") - .apply(Count.perElement()) - .apply( - MapElements.into(TypeDescriptors.rows()) - .via( - kv -> - Row.withSchema(outputSchema) - .withFieldValue( - "word", - Preconditions.checkStateNotNull( - kv.getKey().getString("word"))) - .withFieldValue("count", kv.getValue()) - .build())) - .setRowSchema(outputSchema); + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + PCollection wordCounts = + input + .get("input") + .apply(Count.perElement()) + .apply( + MapElements.into(TypeDescriptors.rows()) + .via( + kv -> + Row.withSchema(OUTPUT_SCHEMA) + .withFieldValue( + "word", + Preconditions.checkStateNotNull( + kv.getKey().getString("word"))) + .withFieldValue("count", kv.getValue()) + .build())) + .setRowSchema(OUTPUT_SCHEMA); - return PCollectionRowTuple.of("output", wordCounts); - } - }; + return PCollectionRowTuple.of("output", wordCounts); + } } @DefaultSchema(AutoValueSchema.class) @AutoValue - protected abstract static class Configuration {} + public abstract static class Configuration {} } diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java index 0b2017c5587a..faf9590a7f16 100644 --- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java +++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java @@ -42,24 +42,32 @@ public String identifier() { @Override protected SchemaTransform from(Configuration configuration) { - return new SchemaTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - input - .get("input") - .apply( - MapElements.into(TypeDescriptors.strings()) - .via(row -> Preconditions.checkStateNotNull(row.getString("line")))) - .apply(TextIO.write().to(configuration.getFilePathPrefix())); + return new WriteWordsTransform(configuration); + } + + static class WriteWordsTransform extends SchemaTransform { + private final String filePathPrefix; + + WriteWordsTransform(Configuration configuration) { + this.filePathPrefix = configuration.getFilePathPrefix(); + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + input + .get("input") + .apply( + MapElements.into(TypeDescriptors.strings()) + .via(row -> Preconditions.checkStateNotNull(row.getString("line")))) + .apply(TextIO.write().to(filePathPrefix)); - return PCollectionRowTuple.empty(input.getPipeline()); - } - }; + return PCollectionRowTuple.empty(input.getPipeline()); + } } @DefaultSchema(AutoValueSchema.class) @AutoValue - protected abstract static class Configuration { + public abstract static class Configuration { public static Builder builder() { return new AutoValue_WriteWordsProvider_Configuration.Builder(); } diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index fb37a8fd974d..9ca5886f4cc2 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -239,7 +239,8 @@ def dict_to_row(schema_proto, py_value): extra = set(py_value.keys()) - set(row_type._fields) if extra: raise ValueError( - f"Unknown fields: {extra}. Valid fields: {row_type._fields}") + f"Transform '{self.identifier()}' was configured with unknown " + f"fields: {extra}. Valid fields: {set(row_type._fields)}") return row_type( *[ dict_to_row_recursive(