Merge pull request #30007 [YAML] Several improvements to the documentation.
robertwb authored Jan 29, 2024
2 parents ee4f8cb + e199e42 commit ce28c33
Showing 4 changed files with 128 additions and 40 deletions.
122 changes: 90 additions & 32 deletions sdks/python/apache_beam/yaml/README.md
@@ -48,6 +48,42 @@ It should be noted that everything here is still under development, but any
features already included are considered stable. Feedback is welcome at
[email protected].

## Running pipelines

The Beam yaml parser is currently included as part of the Apache Beam Python SDK.
This can be installed (e.g. within a virtual environment) as

```
pip install apache_beam[yaml,gcp]
```

In addition, several of the provided transforms (such as SQL) are implemented
in Java and their expansion requires a working Java interpreter. (The
requisite artifacts are automatically downloaded from the Apache Maven
repositories, so no further installs are required.)
Docker is also currently required for local execution of these
cross-language transforms, but not for submission to a non-local
runner such as Flink or Dataflow.
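
For instance, a pipeline using the `Sql` transform (implemented in Java) is one that
triggers this cross-language expansion; a minimal sketch, where the element values and
query are purely illustrative:

```
pipeline:
  transforms:
    - type: Create
      config:
        elements: [{num: 1}, {num: 2}, {num: 3}]
    - type: Sql
      config:
        query: "select sum(num) as total from PCOLLECTION"
      input: Create
```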

Once the prerequisites are installed, you can execute a pipeline defined
in a yaml file as

```
python -m apache_beam.yaml.main --yaml_pipeline_file=/path/to/pipeline.yaml [other pipeline options such as the runner]
```
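
The file passed via `--yaml_pipeline_file` is an ordinary Beam YAML pipeline. A minimal
sketch, assuming the built-in `Create` and `LogForTesting` transforms, might look like:

```
pipeline:
  transforms:
    - type: Create
      config:
        elements: [1, 2, 3]
    - type: LogForTesting
      input: Create
```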

You can do a dry-run of your pipeline using the render runner to see what the
execution graph is, e.g.

```
python -m apache_beam.yaml.main --yaml_pipeline_file=/path/to/pipeline.yaml --runner=apache_beam.runners.render.RenderRunner --render_output=out.png [--render_port=0]
```

(This requires [Graphviz](https://graphviz.org/download/) to be installed to render the pipeline.)

We intend to support running a pipeline on Dataflow by directly passing the
yaml specification to a template, no local installation of the Beam SDKs required.

## Example pipelines

Here is a simple pipeline that reads some data from csv files and
@@ -98,16 +134,45 @@ pipeline:
keep: "col3 > 100"
input: ReadFromCsv
- type: Sql
name: MySqlTransform
config:
query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
input: Filter
- type: WriteToJson
config:
path: /path/to/output.json
input: Sql
```

Transforms can be named to help with monitoring and debugging.

```
pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadMyData
      config:
        path: /path/to/input*.csv
    - type: Filter
      name: KeepBigRecords
      config:
        language: python
        keep: "col3 > 100"
      input: ReadMyData
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
      input: KeepBigRecords
    - type: WriteToJson
      name: WriteTheOutput
      config:
        path: /path/to/output.json
      input: MySqlTransform
```

(This is also needed to disambiguate if more than one transform of the same
type is used.)

If the pipeline is linear, we can let the inputs be implicit by designating
the pipeline as a `chain` type.
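
A minimal sketch of what such a chain pipeline might look like (the paths and transforms
here are illustrative):

```
pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
    - type: WriteToJson
      config:
        path: /path/to/output.json
```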

@@ -180,10 +245,10 @@ pipeline:
    - type: Sql
      config:
        query: select left.col1, right.col2 from left join right using (col3)
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        left: ReadLeft
        right: ReadRight
        A: ReadLeft
        B: ReadRight
    - type: WriteToJson
      name: WriteAll
@@ -224,10 +289,10 @@ pipeline:
    - type: Sql
      config:
        query: select left.col1, right.col2 from left join right using (col3)
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        left: ReadLeft
        right: ReadRight
        A: ReadLeft
        B: ReadRight
    - type: WriteToJson
      name: WriteAll
@@ -285,7 +350,9 @@ pipeline:
      windowing:
        type: fixed
        size: 60s
    - type: SomeAggregation
    - type: SomeGroupingTransform
      config:
        arg: ...
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
@@ -305,7 +372,9 @@
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: SomeAggregation
    - type: SomeGroupingTransform
      config:
        arg: ...
      windowing:
        type: sliding
        size: 60s
@@ -363,10 +432,10 @@ pipeline:
    - type: Sql
      config:
        query: select left.col1, right.col2 from left join right using (col3)
        query: select A.col1, B.col2 from A join B using (col3)
      input:
        left: ReadLeft
        right: ReadRight
        A: ReadLeft
        B: ReadRight
      windowing:
        type: fixed
        size: 60s
@@ -504,26 +573,15 @@ providers:
      MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"
```

## Running pipelines
## Other Resources

The Beam yaml parser is currently included as part of the Apache Beam Python SDK.
This can be installed (e.g. within a virtual environment) as

```
pip install apache_beam[yaml,gcp]
```

In addition, several of the provided transforms (such as SQL) are implemented
in Java and their expansion requires a working Java interpreter. (The
requisite artifacts are automatically downloaded from the Apache Maven
repositories, so no further installs are required.)
Docker is also currently required for local execution of these
cross-language transforms, but not for submission to a non-local
runner such as Flink or Dataflow.
* [Example pipelines](https://gist.github.com/robertwb/2cb26973f1b1203e8f5f8f88c5764da0)
* [More examples](https://github.com/Polber/beam/tree/jkinard/bug-bash/sdks/python/apache_beam/yaml/examples)
* [Transform glossary](https://gist.github.com/robertwb/64e2f51ff88320eeb6ffd96634202df7)

Once the prerequisites are installed, you can execute a pipeline defined
in a yaml file as
Additional documentation in this directory

```
python -m apache_beam.yaml.main --yaml_pipeline_file=/path/to/pipeline.yaml [other pipeline options such as the runner]
```
* [Mapping](yaml_mapping.md)
* [Aggregation](yaml_combine.md)
* [Error handling](yaml_errors.md)
* [Inlining Python](inline_python.md)
7 changes: 5 additions & 2 deletions sdks/python/apache_beam/yaml/readme_test.py
@@ -119,7 +119,10 @@ def expand(self, pcoll):
    return pcoll


class SomeAggregation(beam.PTransform):
class FakeAggregation(beam.PTransform):
  def __init__(self, **unused_kwargs):
    pass

  def expand(self, pcoll):
    return pcoll | beam.GroupBy(lambda _: 'key').aggregate_field(
        lambda _: 1, sum, 'count')
@@ -130,7 +133,7 @@ def expand(self, pcoll):
    'Sql': FakeSql,
    'ReadFromPubSub': FakeReadFromPubSub,
    'WriteToPubSub': FakeWriteToPubSub,
    'SomeAggregation': SomeAggregation,
    'SomeGroupingTransform': FakeAggregation,
}


8 changes: 5 additions & 3 deletions sdks/python/apache_beam/yaml/yaml_mapping.md
@@ -37,7 +37,7 @@ To rename fields one can write

will result in an output where each record has two fields,
`new_col1` and `new_col2`, whose values are those of `col1` and `col2`
respectively.
respectively (which are the names of two fields from the input schema).

One can specify the append parameter which indicates the original fields should
be retained, similar to the use of `*` in an SQL select statement. For example
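
a rough sketch of such a configuration (the new field names here are illustrative) might be

```
- type: MapToFields
  config:
    append: true
    fields:
      new_col1: col1
      new_col2: col2
```
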
@@ -73,7 +73,8 @@ two new ones.

Of course one may want to do transformations beyond just dropping and renaming
fields. Beam YAML has the ability to inline simple UDFs.
This requires a language specification. For example
This requires a language specification. For example, we can provide a
Python expression referencing the input fields

```
- type: MapToFields
@@ -116,7 +117,8 @@ this up as a dependency and simply refer to it by fully qualified name, e.g.
        callable: pkg.module.fn
```

Currently, in addition to Python, SQL expressions are supported as well
Currently, in addition to Python, Java, SQL, and JavaScript (experimental)
expressions are supported as well

```
- type: MapToFields
31 changes: 28 additions & 3 deletions sdks/python/apache_beam/yaml/yaml_provider.py
@@ -558,17 +558,42 @@ class YamlProviders:
  def create(elements: Iterable[Any], reshuffle: Optional[bool] = True):
    """Creates a collection containing a specified set of elements.

    YAML/JSON-style mappings will be interpreted as Beam rows. For example::
    This transform always produces schema'd data. For example::

        type: Create
        elements:
           - {first: 0, second: {str: "foo", values: [1, 2, 3]}}
        config:
          elements: [1, 2, 3]

    will result in an output with three elements with a schema of
    Row(element=int) whereas YAML/JSON-style mappings will be interpreted
    directly as Beam rows, e.g.::

        type: Create
        config:
          elements:
             - {first: 0, second: {str: "foo", values: [1, 2, 3]}}
             - {first: 1, second: {str: "bar", values: [4, 5, 6]}}

    will result in a schema of the form (int, Row(string, List[int])).

    This can also be expressed as YAML::

        type: Create
        config:
          elements:
            - first: 0
              second:
                str: "foo"
                values: [1, 2, 3]
            - first: 1
              second:
                str: "bar"
                values: [4, 5, 6]

    Args:
      elements: The set of elements that should belong to the PCollection.
        YAML/JSON-style mappings will be interpreted as Beam rows.
        Primitives will be mapped to rows with a single "element" field.
      reshuffle: (optional) Whether to introduce a reshuffle (to possibly
        redistribute the work) if there is more than one element in the
        collection. Defaults to True.
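
For context, a hedged sketch of how these arguments might be supplied from YAML, assuming
the config keys map one-to-one onto the arguments documented above:

```
type: Create
config:
  elements: [1, 2, 3]
  reshuffle: false
```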
