Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schema-aware text file reading and writing. #28486

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
'ReadFromBigQuery': 'apache_beam.yaml.yaml_io.read_from_bigquery'
# Disable until https://github.com/apache/beam/issues/28162 is resolved.
# 'WriteToBigQuery': 'apache_beam.yaml.yaml_io.write_to_bigquery'
'ReadFromText': 'apache_beam.yaml.yaml_io.read_from_text'
'WriteToText': 'apache_beam.yaml.yaml_io.write_to_text'

# Declared as a renaming transform to avoid exposing all
# (implementation-specific) pandas arguments and aligning with possible Java
Expand Down
26 changes: 26 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,38 @@
import yaml

import apache_beam as beam
import apache_beam.io as beam_io
from apache_beam.io import ReadFromBigQuery
from apache_beam.io import WriteToBigQuery
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from apache_beam.typehints.schemas import named_fields_from_element_type
from apache_beam.yaml import yaml_provider


def read_from_text(path: str):
# TODO(yaml): Consider passing the filename and offset, possibly even
# by default.
Comment on lines +40 to +41
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, you're saying pass them as fields in the returned beam.Row? I'm +1 on optionally doing that in the future FWIW (it would maybe be generally useful for ReadFromText) - in particular, getting the filename would likely be helpful in some use cases

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, exactly.

return beam_io.ReadFromText(path) | beam.Map(lambda s: beam.Row(line=s))


@beam.ptransform_fn
def write_to_text(pcoll, path: str):
try:
field_names = [
name for name, _ in named_fields_from_element_type(pcoll.element_type)
]
except Exception as exn:
raise ValueError(
"WriteToText requires an input schema with exactly one field.") from exn
if len(field_names) != 1:
raise ValueError(
"WriteToText requires an input schema with exactly one field, got %s" %
field_names)
sole_field_name, = field_names
return pcoll | beam.Map(
lambda x: str(getattr(x, sole_field_name))) | beam.io.WriteToText(path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we take a (required?) file_extension parameter as well? Right now, this will output files like: <path>-<shard_id>, but I'd guess most people will want <path>-<shard_id>.<extension>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One may want to be able to specify a general suffix, not just an extension, and maybe other sharding parameters (like the shard format). I think we'll want to add this in a consistent way to all file output types. I'm not confident enough as to what that'll look like to get something in right now though, and it is something additive. (I don't think it should be required, but perhaps could see using .txt as a default and having to override it with an empty string to get nothing. TBD)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but perhaps could see using .txt as a default

That is still technically breaking fwiw (though I think its fine to do at this stage)

One may want to be able to specify a general suffix, not just an extension, and maybe other sharding parameters (like the shard format). I think we'll want to add this in a consistent way to all file output types. I'm not confident enough as to what that'll look like to get something in right now though, and it is something additive.

I generally agree, though I think I am very confident we will want to allow folks to specify a suffix or extension (naming it suffix instead of extension is fine, though I think the latter is more intuitive for a potentially less technical audience).

Regardless, I am ok leaving this for now since I think getting something before the cut is worthwhile



def read_from_bigquery(
query=None, table=None, row_restriction=None, fields=None):
if query is None:
Expand Down