Initial start of a yaml-based declarative way of building pipelines. #24667

Merged: 19 commits into apache:master on Jan 24, 2023

robertwb
Contributor

E.g.

pipeline:
  - type: chain
    transforms:
      - type: ReadFromText
        args:
          file_pattern: "wordcount.yaml"
      - type: PyFlatMap
        fn: "import re\nlambda line: re.findall('\\w+', line)"
      - type: PyMap
        fn: "str.lower"
      - type: PyTransform
        name: Count
        constructor: "apache_beam.transforms.combiners.Count.PerElement"
      - type: PyMap
        fn: str
      - type: WriteToText
        file_path_prefix: "counts.txt"
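
For readers unfamiliar with the transforms named above, here is a minimal plain-Python sketch of what this chain computes. It does not use Beam. `load_callable` and `word_count` are hypothetical names, and the way the `fn` source string is evaluated (run the leading statements, then evaluate the final expression) is an assumption about how such strings might be handled, not a description of this PR's implementation. The regex is written as a raw string here to avoid an invalid-escape warning.

```python
import ast
from collections import Counter


def load_callable(source):
    """Hypothetical helper: run leading statements, then evaluate the last expression."""
    tree = ast.parse(source)
    *setup, last = tree.body
    if not isinstance(last, ast.Expr):
        raise ValueError("source must end with an expression")
    namespace = {}
    # Execute any setup statements (e.g. "import re") in a fresh namespace.
    exec(compile(ast.Module(body=setup, type_ignores=[]), "<fn>", "exec"), namespace)
    # Evaluate the trailing expression in that same namespace.
    return eval(compile(ast.Expression(body=last.value), "<fn>", "eval"), namespace)


def word_count(lines):
    """Plain-Python stand-in for the chain above (no Beam involved)."""
    split = load_callable("import re\nlambda line: re.findall(r'\\w+', line)")
    lower = load_callable("str.lower")
    # PyFlatMap followed by PyMap(str.lower).
    words = [lower(word) for line in lines for word in split(line)]
    # Count.PerElement yields (element, count) pairs; PyMap(str) formats them.
    return sorted(str(pair) for pair in Counter(words).items())
```

Calling `word_count(["The cat", "the Dog"])` gives one formatted `(word, count)` string per distinct lower-cased word, which is roughly what the final `WriteToText` step would write out, one per line.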

codecov bot commented Dec 14, 2022

Codecov Report

Merging #24667 (b78009c) into master (6bbc396) will decrease coverage by 0.21%.
The diff coverage is 69.21%.

@@            Coverage Diff             @@
##           master   #24667      +/-   ##
==========================================
- Coverage   73.35%   73.15%   -0.21%     
==========================================
  Files         719      732      +13     
  Lines       97137    98093     +956     
==========================================
+ Hits        71257    71762     +505     
- Misses      24532    24983     +451     
  Partials     1348     1348              
Flag     Coverage Δ
python   82.56% <69.21%> (-0.43%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files                                          Coverage Δ
sdks/python/apache_beam/yaml/main.py                    0.00% <0.00%> (ø)
sdks/python/apache_beam/transforms/external.py          78.74% <50.00%> (-0.14%) ⬇️
sdks/python/apache_beam/yaml/yaml_provider.py           63.47% <63.47%> (ø)
sdks/python/apache_beam/yaml/yaml_transform.py          80.60% <80.60%> (ø)
sdks/python/apache_beam/yaml/__init__.py                100.00% <100.00%> (ø)
sdks/python/apache_beam/utils/python_callable.py        86.88% <0.00%> (-11.16%) ⬇️
sdks/python/apache_beam/utils/interactive_utils.py      95.12% <0.00%> (-2.44%) ⬇️
...apache_beam/typehints/native_type_compatibility.py   82.82% <0.00%> (-2.34%) ⬇️
...python/apache_beam/runners/worker/worker_status.py   75.33% <0.00%> (-1.34%) ⬇️
sdks/python/apache_beam/internal/metrics/metric.py      93.00% <0.00%> (-1.00%) ⬇️
... and 31 more

Contributor

@chamikaramj chamikaramj left a comment

Thanks. Just adding some initial fly-by comments.

sdks/python/apache_beam/yaml/standard_providers.yaml (outdated, resolved)

    self._service = service
    self._schema_transforms = None

  def provided_transforms(self):

Contributor

Should this be "provided_transform_urns"?

Contributor Author

No, these aren't the URNs but the "friendly" names.
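
To illustrate the distinction being drawn here, the following is a minimal sketch of a provider that exposes friendly names and resolves them to URNs only internally. `NamedUrnProvider`, `urn_for`, and the example URN string are hypothetical and not taken from the PR; only the `provided_transforms` name and the `self._urns[type]` lookup appear in the snippets under review.

```python
class NamedUrnProvider:
    """Hypothetical sketch: exposes friendly names, resolves them to URNs internally."""

    def __init__(self, urns):
        # Maps a friendly transform name (what a YAML author writes under
        # `type:`) to the URN used to look the transform up.
        self._urns = dict(urns)

    def provided_transforms(self):
        # Returns the friendly names, not the URNs.
        return sorted(self._urns)

    def urn_for(self, type):
        # `type` shadows the builtin here to mirror the reviewed snippet.
        if type not in self._urns:
            raise KeyError(f"Unknown transform type: {type!r}")
        return self._urns[type]


# The URN below is a made-up placeholder, not a real Beam URN.
provider = NamedUrnProvider({"ReadFromText": "example:urn:read_from_text:v1"})
```

With this shape, `provider.provided_transforms()` lists `ReadFromText`, while the URN stays an implementation detail behind `urn_for`.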

sdks/python/apache_beam/yaml/yaml_provider.py (resolved)
Contributor Author

@robertwb robertwb left a comment

Updated per some of the feedback.

sdks/python/apache_beam/yaml/yaml_provider.py (resolved)
sdks/python/apache_beam/yaml/standard_providers.yaml (outdated, resolved)
@robertwb robertwb marked this pull request as ready for review December 20, 2022 00:41
@github-actions
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @AnandInguva for label python.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions
Contributor

github-actions bot commented Jan 4, 2023

Reminder, please take a look at this PR: @AnandInguva

@github-actions
Contributor

github-actions bot commented Jan 9, 2023

Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @damccorm for label python.

@damccorm
Contributor

damccorm commented Jan 9, 2023

stop reviewer notifications

@damccorm
Contributor

damccorm commented Jan 9, 2023

(looks like Cham has been reviewing, I'll let him take the review side to completion)

@github-actions
Contributor

github-actions bot commented Jan 9, 2023

Stopping reviewer notifications for this pull request: requested by reviewer

Contributor

@chamikaramj chamikaramj left a comment

Thanks!

sdks/python/apache_beam/yaml/main.py (outdated, resolved)
sdks/python/apache_beam/yaml/main.py (outdated, resolved)

    pipeline_spec = yaml.load(
        known_args.pipeline_spec, Loader=yaml_transform.SafeLineLoader)
  else:
    raise ValueError(

Contributor

Seems like this check is not done at the correct location (it will be triggered if just known_args.pipeline_spec_file is set).

Contributor Author

known_args.pipeline_spec was populated from the file above in that case. But this didn't catch the case where both were set. I'm spelling it out more explicitly now.
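
One way to spell out the "exactly one of the two" check explicitly is sketched below. This is not the code that landed in main.py. `read_spec_text` is a hypothetical helper, the flag names are inferred from the `known_args` attribute names quoted above, and the YAML parsing step is left out so the sketch stays standard-library only.

```python
import argparse


def read_spec_text(argv):
    """Hypothetical helper: return the raw YAML text from exactly one source."""
    parser = argparse.ArgumentParser()
    # Flag names are assumed from the known_args attributes in the review.
    parser.add_argument("--pipeline_spec")
    parser.add_argument("--pipeline_spec_file")
    known_args, _ = parser.parse_known_args(argv)

    has_inline = known_args.pipeline_spec is not None
    has_file = known_args.pipeline_spec_file is not None
    # Reject both "neither set" and "both set" before touching either value.
    if has_inline == has_file:
        raise ValueError(
            "Exactly one of pipeline_spec or pipeline_spec_file must be set.")

    if has_file:
        with open(known_args.pipeline_spec_file) as fin:
            return fin.read()
    return known_args.pipeline_spec
```

Doing the check before the file is read keeps the two failure cases (none set, both set) in one place, instead of relying on one argument having been overwritten by the other.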

sdks/python/apache_beam/yaml/yaml_provider.py (outdated, resolved)
sdks/python/apache_beam/yaml/yaml_provider.py (outdated, resolved)

  def __init__(self, spec, providers={}):  # pylint: disable=dangerous-default-value
    if isinstance(spec, str):
      spec = yaml.load(spec, Loader=SafeLineLoader)
    self._spec = spec

Contributor

Can/should we validate the spec somehow before accepting it?

Contributor Author

I added a schema that we can validate against.
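
The schema itself is not shown in this conversation. As a rough illustration of the kind of structural check such validation performs, here is a hand-rolled, standard-library sketch. `validate_spec` is hypothetical, and the rule that `chain` and `composite` specs carry a list of `transforms` is an assumption drawn from the examples in this thread, not from the actual schema.

```python
def validate_spec(spec, path="pipeline"):
    """Hypothetical structural check; returns a list of error strings."""
    if not isinstance(spec, dict):
        return [f"{path}: expected a mapping, got {type(spec).__name__}"]

    errors = []
    # Every transform spec in the examples carries a string `type`.
    if not isinstance(spec.get("type"), str):
        errors.append(f"{path}: missing string field 'type'")

    # Assumed rule: container types hold a list of nested transform specs.
    if spec.get("type") in ("chain", "composite"):
        children = spec.get("transforms")
        if not isinstance(children, list):
            errors.append(f"{path}: '{spec['type']}' requires a list of 'transforms'")
        else:
            for i, child in enumerate(children):
                errors.extend(validate_spec(child, f"{path}.transforms[{i}]"))
    return errors
```

Returning a list of path-qualified messages, rather than raising on the first problem, lets a caller report every malformed entry in one pass.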

self._schema_transforms = []
urn = self._urns[type]
if urn in self._schema_transforms:
  return external.SchemaAwareExternalTransform(urn, self._service, **args)

Contributor

Noting that this will limit YAML-based pipelines to portable runners (which is fine, but should be documented).

Contributor Author

True, but all available runners in Python are portable runners (and I'd be tempted to say that the other runners aren't "real" Beam runners :-). Is there somewhere specific you'd want this mentioned?

Contributor

I don't have a good place here, but this should go into the YAML runner documentation when we have it.

input:
  elements: input
transforms:
  - type: PyMap

Contributor

Is there any meaning to deeper levels of nesting, or should all transforms be listed at the top level?

Contributor Author

This creates a composite transform. One could also do

YamlTransform(
    '''
    type: PyMap
    name: Cube
    '''
)

to get a single transform. I agree the input here is a bit hacky; I dropped in a TODO to think about this.

      assert_that(result, equal_to([1, 4, 9, 1, 8, 27]))

  def test_chain_with_input(self):
    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(

Contributor

We should also add tests for other types of providers.

Contributor Author

True, but that requires invoking the whole cross-language testing infrastructure, and I was hoping to run these as unit tests (mostly validating the parsing and construction).

Contributor

@chamikaramj chamikaramj left a comment

Thanks. LGTM.

Seems like we might need to add more tests and documentation, but those can be added in follow-up PRs.

return constructor >> FullyQualifiedNamedTransform(
    constructor, args, kwargs)

class Flatten(beam.PTransform):

Contributor

I see. Let's add a comment here regarding needing the extra Flatten to support one or zero PCollections.
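
The body of the Flatten wrapper is not shown in this thread. As an analogy only, the sketch below uses plain lists in place of PCollections to show what "supporting one or zero inputs" can look like: the input is normalized to a tuple of collections before it is flattened. `flatten_inputs` and its case analysis are hypothetical and do not describe the PR's code.

```python
from itertools import chain


def flatten_inputs(inputs):
    """Hypothetical list-based analogue: accept zero, one, or many collections."""
    if inputs is None:
        collections = ()                      # zero inputs
    elif isinstance(inputs, dict):
        collections = tuple(inputs.values())  # named inputs
    elif isinstance(inputs, list):
        collections = (inputs,)               # exactly one collection
    else:
        collections = tuple(inputs)           # a tuple of collections
    # After normalization, a single flatten handles every case uniformly.
    return list(chain.from_iterable(collections))
```

The point of the extra wrapper is that callers never have to special-case how many inputs they hold; an empty input simply produces an empty result.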

@robertwb
Contributor Author

Thanks!

@robertwb robertwb merged commit 6b8f8c0 into apache:master Jan 24, 2023

3 participants