Skip to content

Commit

Permalink
Refactor and cleanup yaml MapToFields. (#28462)
Browse files Browse the repository at this point in the history
* Avoid the use of MetaProviders, which was always kind of hacky.
  We may want to remove this infrastructure altogether as it
  does not play nicely with provider inference.

* Split MapToFields into separate mapping, filtering, and exploding
  operations.

* Allow MapToFields to act on non-schema'd PCollections.

The various langauge flavors of these UDFs are now handled by a preprocessing
step.  This will make it easier to extend to other langauges, including
in particular possible multiple (equivalent) implementations of javascript to
minimize cross-langauge boundary crossings.

---------

Co-authored-by: Danny McCormick <[email protected]>
  • Loading branch information
robertwb and damccorm authored Sep 20, 2023
1 parent 1d94f5f commit c5e6c79
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 228 deletions.
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,10 @@ def __init__(self, pcoll, exception_handling_args, upstream_errors=()):
self._exception_handling_args = exception_handling_args
self._upstream_errors = upstream_errors

@property
def element_type(self):
return self._pcoll.element_type

def main_output_tag(self):
return self._exception_handling_args.get('main_tag', 'good')

Expand Down Expand Up @@ -2309,6 +2313,10 @@ def __init__(self, pvalue, exception_handling_args=None):
else:
self._pvalue = _PValueWithErrors(pvalue, exception_handling_args)

@property
def element_type(self):
return self._pvalue.element_type

def __or__(self, transform):
return self.apply(transform)

Expand Down
23 changes: 14 additions & 9 deletions sdks/python/apache_beam/yaml/readme_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.typehints import trivial_inference
from apache_beam.yaml import yaml_mapping
from apache_beam.yaml import yaml_provider
from apache_beam.yaml import yaml_transform

Expand Down Expand Up @@ -85,13 +86,16 @@ def guess_name_and_type(expr):
typ, = [t for t in typ.__args__ if t is not type(None)]
return name, typ

output_schema = [
guess_name_and_type(expr) for expr in m.group(1).split(',')
]
output_element = beam.Row(**{name: typ() for name, typ in output_schema})
return next(iter(inputs.values())) | beam.Map(
lambda _: output_element).with_output_types(
trivial_inference.instance_to_type(output_element))
if m.group(1) == '*':
return inputs['PCOLLECTION'] | beam.Filter(lambda _: True)
else:
output_schema = [
guess_name_and_type(expr) for expr in m.group(1).split(',')
]
output_element = beam.Row(**{name: typ() for name, typ in output_schema})
return next(iter(inputs.values())) | beam.Map(
lambda _: output_element).with_output_types(
trivial_inference.instance_to_type(output_element))


class FakeReadFromPubSub(beam.PTransform):
Expand Down Expand Up @@ -204,12 +208,13 @@ def test(self):
]
options['render_leaf_composite_nodes'] = ['.*']
test_provider = TestProvider(TEST_TRANSFORMS)
test_sql_mapping_provider = yaml_mapping.SqlMappingProvider(test_provider)
p = beam.Pipeline(options=PipelineOptions(**options))
yaml_transform.expand_pipeline(
p,
modified_yaml,
{t: test_provider
for t in test_provider.provided_transforms()})
yaml_provider.merge_providers(
[test_provider, test_sql_mapping_provider]))
if test_type == 'BUILD':
return
p.run().wait_until_finish()
Expand Down
35 changes: 14 additions & 21 deletions sdks/python/apache_beam/yaml/yaml_mapping.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Currently, in addition to Python, SQL expressions are supported as well

Sometimes it may be desirable to emit more (or less) than one record for each
input record. This can be accomplished by mapping to an iterable type and
noting that the specific field should be exploded, e.g.
following the mapping with an Explode operation, e.g.

```
- type: MapToFields
Expand All @@ -140,7 +140,9 @@ noting that the specific field should be exploded, e.g.
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "col2 + col3"
explode: new_col
- type: Explode
config:
fields: new_col
```

will result in three output records for every input record.
Expand All @@ -155,7 +157,9 @@ product over all fields should be taken. For example
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
explode: [new_col, another_col]
- type: Explode
config:
fields: [new_col, another_col]
cross_product: true
```

Expand All @@ -168,38 +172,27 @@ will emit nine records whereas
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
explode: [new_col, another_col]
- type: Explode
config:
fields: [new_col, another_col]
cross_product: false
```

will only emit three.

If one is only exploding existing fields, a simpler `Explode` transform may be
used instead
The `Explode` operation can be used on its own if the field in question is
already an iterable type.

```
- type: Explode
config:
explode: [col1]
fields: [col1]
```

## Filtering

Sometimes it can be desirable to only keep records that satisfy a certain
criteria. This can be accomplished by specifying a keep parameter, e.g.

```
- type: MapToFields
config:
language: python
fields:
new_col: "col1.upper()"
another_col: "col2 + col3"
keep: "col2 > 0"
```

Like explode, there is a simpler `Filter` transform useful when no mapping is
being done
criteria. This can be accomplished with a `Filter` transform, e.g.

```
- type: Filter
Expand Down
Loading

0 comments on commit c5e6c79

Please sign in to comment.