From a70a5845230a5d3ebdbe4ed1a23de92b363b8a97 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 20 Sep 2023 06:39:51 -0700 Subject: [PATCH] Add schema-aware text file reading and writing. (#28486) --- sdks/python/apache_beam/yaml/standard_io.yaml | 2 ++ sdks/python/apache_beam/yaml/yaml_io.py | 26 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index e60f0026fd25..1738110539ce 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -51,6 +51,8 @@ 'ReadFromBigQuery': 'apache_beam.yaml.yaml_io.read_from_bigquery' # Disable until https://github.com/apache/beam/issues/28162 is resolved. # 'WriteToBigQuery': 'apache_beam.yaml.yaml_io.write_to_bigquery' + 'ReadFromText': 'apache_beam.yaml.yaml_io.read_from_text' + 'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text' # Declared as a renaming transform to avoid exposing all # (implementation-specific) pandas arguments and aligning with possible Java diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 2a9d1be62c6d..297c07e9abb5 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -28,12 +28,38 @@ import yaml import apache_beam as beam +import apache_beam.io as beam_io from apache_beam.io import ReadFromBigQuery from apache_beam.io import WriteToBigQuery from apache_beam.io.gcp.bigquery import BigQueryDisposition +from apache_beam.typehints.schemas import named_fields_from_element_type from apache_beam.yaml import yaml_provider +def read_from_text(path: str): + # TODO(yaml): Consider passing the filename and offset, possibly even + # by default. + return beam_io.ReadFromText(path) | beam.Map(lambda s: beam.Row(line=s)) + + +@beam.ptransform_fn +def write_to_text(pcoll, path: str): + try: + field_names = [ + name for name, _ in named_fields_from_element_type(pcoll.element_type) + ] + except Exception as exn: + raise ValueError( + "WriteToText requires an input schema with exactly one field.") from exn + if len(field_names) != 1: + raise ValueError( + "WriteToText requires an input schema with exactly one field, got %s" % + field_names) + sole_field_name, = field_names + return pcoll | beam.Map( + lambda x: str(getattr(x, sole_field_name))) | beam.io.WriteToText(path) + + def read_from_bigquery( query=None, table=None, row_restriction=None, fields=None): if query is None: