Skip to content

Commit

Permalink
Add schema-aware text file reading and writing.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Sep 15, 2023
1 parent 1f980ea commit ee4278f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
'ReadFromBigQuery': 'apache_beam.yaml.yaml_io.read_from_bigquery'
# Disable until https://github.com/apache/beam/issues/28162 is resolved.
# 'WriteToBigQuery': 'apache_beam.yaml.yaml_io.write_to_bigquery'
'ReadFromText': 'apache_beam.yaml.yaml_io.read_from_text'
'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text'

# Declared as a renaming transform to avoid exposing all
# (implementation-specific) pandas arguments and aligning with possible Java
Expand Down
26 changes: 26 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,38 @@
import yaml

import apache_beam as beam
import apache_beam.io as beam_io
from apache_beam.io import ReadFromBigQuery
from apache_beam.io import WriteToBigQuery
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.yaml import yaml_provider


def read_from_text(path: str):
# TODO(yaml): Consider passing the filename and offset, possibly even
# by default.
return beam_io.ReadFromText(path) | beam.Map(lambda s: beam.Row(line=s))


@beam.ptransform_fn
def write_to_text(pcoll, path: str):
try:
field_names = [
name for name, _ in named_fields_from_element_type(pcoll.element_type)
]
except Exception as exn:
raise ValueError(
"WriteToText requires an input schema with exactly one field.") from exn
if len(field_names) != 1:
raise ValueError(
"WriteToText requires an input schema with exactly one field, got %s" %
field_names)
sole_field_name, = field_names
return pcoll | beam.Map(
lambda x: str(getattr(x, sole_field_name))) | beam.io.WriteToText(path)


def read_from_bigquery(
query=None, table=None, row_restriction=None, fields=None):
if query is None:
Expand Down

0 comments on commit ee4278f

Please sign in to comment.