Support Task Flow and enhance dynamic task mapping #314

Merged
merged 10 commits into from
Dec 6, 2024

Collaborator

@tatiana tatiana commented Dec 5, 2024

Implement support for Airflow TaskFlow, available since Airflow 2.0.

How to test

The following example defines one task that generates a list of numbers and another that consumes this list and, using Airflow dynamic task mapping, dynamically creates an independent task instance to double each number.

example_taskflow:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "Example of TaskFlow powered DAG that includes dynamic task mapping."
  schedule_interval: "0 3 * * *"
  default_view: "graph"
  tasks:

    numbers_list:
      decorator: airflow.decorators.task
      python_callable: sample.build_numbers_list

    double_number_with_dynamic_task_mapping_taskflow:
      decorator: airflow.decorators.task
      python_callable: sample.double
      expand:
          number: +numbers_list  # the prefix + tells DagFactory to resolve this value as the task `numbers_list`, previously defined
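
One way to picture the `+` prefix is as a lookup step performed before `expand` is called. The sketch below is plain Python and only illustrates the idea; `resolve_task_references` and the `previously_defined` mapping are hypothetical names, not DagFactory's actual code.

```python
from typing import Any, Dict


def resolve_task_references(kwargs: Dict[str, Any], tasks: Dict[str, Any]) -> Dict[str, Any]:
    """Replace "+task_name" string values with the matching entry in `tasks`.

    Hypothetical helper for illustration only; not DagFactory's implementation.
    """
    resolved = {}
    for key, value in kwargs.items():
        if isinstance(value, str) and value.startswith("+"):
            task_name = value[1:]  # drop the "+" prefix
            if task_name not in tasks:
                raise KeyError(f"Task '{task_name}' is referenced but not defined")
            resolved[key] = tasks[task_name]
        else:
            # Literal values (numbers, lists, plain strings) pass through unchanged.
            resolved[key] = value
    return resolved


# Stand-in for the object produced when the `numbers_list` task was built.
previously_defined = {"numbers_list": "<output of numbers_list>"}
expand_kwargs = resolve_task_references({"number": "+numbers_list"}, previously_defined)
```

Values without the prefix, such as a literal `number: 2`, would be left untouched by a resolver like this.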

The YAML above refers to the following sample.py file:

def build_numbers_list():
    return [2, 4, 6]


def double(number: int):
    result = 2 * number
    print(result)
    return result
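
As a rough mental model only (plain Python, no Airflow, and not how the scheduler actually runs tasks), expanding `double` over the output of `build_numbers_list` behaves like calling `double` once per list element, with each call corresponding to one mapped task instance:

```python
def build_numbers_list():
    return [2, 4, 6]


def double(number: int):
    result = 2 * number
    print(result)
    return result


# Each element of the upstream result becomes one independent call,
# mirroring one mapped task instance per list item.
mapped_results = [double(number=n) for n in build_numbers_list()]
```

In Airflow the three calls are separate task instances that can succeed, fail, and retry independently; the list comprehension only shows which inputs each instance receives.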

In the UI, it is shown as:
[Screenshot 2024-12-06 at 11 53 04]

And:

[Screenshot 2024-12-06 at 11 52 28]

Scope

This PR includes several use cases of dynamic task mapping:

  1. Simple mapping
  2. Task-generated mapping
  3. Repeated mapping
  4. Adding parameters that do not expand (partial)
  5. Mapping over multiple parameters
  6. Named mapping (map_index_template)
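
Case 4 (parameters that do not expand) can be pictured with `functools.partial`: one argument is fixed for every mapped instance while the other varies. This is a plain-Python analogy, not Airflow code; the `add` function and its input values are assumptions chosen for illustration.

```python
from functools import partial


def add(x: int, y: int) -> int:
    # Hypothetical callable used only for this illustration.
    return x + y


# `y` is fixed for every call (the "partial" part) ...
add_ten = partial(add, y=10)

# ... while `x` varies, one call per element (the "expand" part).
mapped_results = [add_ten(x=x) for x in [1, 2, 3]]
```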

The following dynamic task mapping cases were not tested but are expected to work:

  • Mapping with non-TaskFlow operators
  • Mapping over the result of classic operators
  • Filtering items from a mapped task

The following dynamic task mapping cases were not tested and are not expected to work (they were considered out of scope for the current ticket):

  • Assigning multiple parameters to a non-TaskFlow operator
  • Mapping over a task group
  • Transforming expanding data
  • Combining upstream data (aka “zipping”)

Tests

The feature is tested by running the example DAGs introduced in this PR, which validate various TaskFlow and dynamic task mapping scenarios and also serve as documentation.

As with other parts of DAG Factory, we can and should improve the overall unit test coverage.

Two example DAG files were added, containing multiple examples of TaskFlow and dynamic task mapping. This is how they are displayed in the Airflow UI:
[Screenshot 2024-12-06 at 16 11 10]
[Screenshot 2024-12-06 at 16 11 42]
[Screenshot 2024-12-06 at 16 11 32]

Docs

This PR does not contain user-facing docs other than the README. However, we'll address this as part of #278.

Related issues

This PR closes two open tickets:

Closes: #302 (support named mapping, via the map_index_template argument)

Example of usage of map_index_template:

    dynamic_task_with_named_mapping:
      decorator: airflow.decorators.task
      python_callable: sample.extract_last_name
      map_index_template: "{{ custom_mapping_key }}"
      expand:
        full_name:
          - Lucy Black
          - Vera Santos
          - Marks Spencer
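
In Airflow, `map_index_template` is rendered against the task's context after the task runs, so the callable can add a key such as `custom_mapping_key` to that context. The sketch below simulates this in plain Python. The body of `extract_last_name` is an assumption (the real `sample.extract_last_name` is not shown in this description), the `context` argument stands in for the Airflow task context, and `render_label` is a toy substitute for Jinja rendering.

```python
import re


def extract_last_name(full_name: str, context: dict) -> str:
    # Assumed behaviour: the real sample.extract_last_name is not shown here.
    last_name = full_name.split(" ")[-1]
    # In Airflow the task would write this key into its own task context.
    context["custom_mapping_key"] = last_name
    return last_name


def render_label(template: str, context: dict) -> str:
    # Toy stand-in for Jinja: substitutes "{{ name }}" placeholders only.
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), template)


labels = []
for full_name in ["Lucy Black", "Vera Santos", "Marks Spencer"]:
    context = {}  # each mapped task instance gets its own context
    extract_last_name(full_name, context)
    labels.append(render_label("{{ custom_mapping_key }}", context))
```

Under these assumptions each mapped instance would be labelled with a last name instead of a numeric map index.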

Closes: #301 (Mapping over multiple parameters)

Example of multiple parameters:

    multiply_with_multiple_parameters:
      decorator: airflow.decorators.task
      python_callable: sample.multiply
      expand:
          a: +numbers_list  # the prefix + tells DagFactory to resolve this value as the task `numbers_list`, previously defined
          b: +another_numbers_list # the prefix + tells DagFactory to resolve this value as the task `another_numbers_list`, previously defined
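
When several parameters are expanded, Airflow creates one mapped task instance per combination of values (a cross product). The plain-Python sketch below illustrates the resulting inputs. The body of `multiply` and the contents of `another_numbers_list` are assumptions, since neither appears in this description, and no claim is made about the order in which Airflow assigns map indexes.

```python
from itertools import product


def multiply(a: int, b: int) -> int:
    # Assumed implementation; sample.multiply is not shown in this description.
    return a * b


numbers_list = [2, 4, 6]
another_numbers_list = [3, 5]  # assumed values, for illustration only

# One call per (a, b) combination: 3 values x 2 values = 6 mapped instances.
mapped_results = [multiply(a=a, b=b) for a, b in product(numbers_list, another_numbers_list)]
```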

Example using explicit args, kwargs, and kwargs that rely on other TaskFlow tasks:
example_taskflow:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "Example of TaskFlow powered DAG that includes dynamic task mapping"
  schedule_interval: "0 3 * * *"
  default_view: "graph"
  tasks:
    some_number:
      decorator: airflow.decorators.task
      python_callable: "sample.some_number"
    numbers_list:
      decorator: airflow.decorators.task
      python_callable_name: build_numbers_list
      python_callable_file: $CONFIG_ROOT_DIR/sample.py
    double_number_from_arg:
      decorator: airflow.decorators.task
      python_callable: "sample.double"
      number: 2
    double_number_from_task:
      decorator: airflow.decorators.task
      python_callable: "sample.double"
      number: some_number  # this is a task previously defined
@tatiana tatiana force-pushed the dynamic-task-mapping-with-taskflow branch from 2a2f978 to 1879270 Compare December 6, 2024 12:01
@tatiana tatiana changed the title Add support to TaskFlow Add support to TaskFlow and improve dynamic task mapping support Dec 6, 2024
@tatiana tatiana marked this pull request as ready for review December 6, 2024 12:35
@tatiana tatiana requested a review from a team as a code owner December 6, 2024 12:35
Contributor

@pankajastro pankajastro left a comment

LGTM, I'm really excited for this feature

@tatiana tatiana mentioned this pull request Dec 6, 2024
@tatiana tatiana changed the title Add support to TaskFlow and improve dynamic task mapping support Support Task Flow and improve dynamic task mapping support Dec 6, 2024
@tatiana tatiana changed the title Support Task Flow and improve dynamic task mapping support Support Task Flow and enhance dynamic task mapping Dec 6, 2024
@tatiana tatiana force-pushed the dynamic-task-mapping-with-taskflow branch from 3f9af2f to aa0ef3e Compare December 6, 2024 15:57
codecov-commenter commented Dec 6, 2024

Codecov Report

Attention: Patch coverage is 89.47368% with 10 lines in your changes missing coverage. Please review.

Project coverage is 93.29%. Comparing base (80b885e) to head (aa0ef3e).
Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| dagfactory/dagbuilder.py | 89.47% | 10 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #314      +/-   ##
==========================================
+ Coverage   92.69%   93.29%   +0.59%     
==========================================
  Files          10       10              
  Lines         726      776      +50     
==========================================
+ Hits          673      724      +51     
+ Misses         53       52       -1     


@tatiana tatiana merged commit 1f6525c into main Dec 6, 2024
67 checks passed
@tatiana tatiana deleted the dynamic-task-mapping-with-taskflow branch December 6, 2024 16:14
tatiana added a commit that referenced this pull request Dec 6, 2024
### Added

- Add support to TaskFlow and improve dynamic task mapping support by
@tatiana in #314
- Render YML DAG config as DAG Docs by @pankajastro #305
- Support building DAGs out of topologically unsorted YAML files by
@tatiana in #307
- Add support for nested task groups by @glazunov996 and @pankajastro in
#292
- Add support for templating `on_failure_callback` by @jroach-astronomer
#252

### Fixed

- Fix compatibility with
apache-airflow-providers-cncf-kubernetes>=10.0.0 by @tatiana in #311
- Refactor telemetry to collect events during DAG run and not during DAG
parsing by @pankajastro #300

### Docs

- Fix reference for HttpSensor in README.md by @pankajastro in #277
- Add example DAG for task group by @pankajastro in #293
- Add CODEOWNERS by @pankajkoti in #270
- Update CODEOWNERS to track all files by @pankajkoti in #276
- Modified Status badge in README by @jaejun #298

### Others

- Refactor dynamic task mapping implementation by @tatiana in #313
- Remove pytest durations from tests by @tatiana in #309
- Remove DAG retries check since many DAGs have different retry values
by @tatiana in #310
- Lint fixes after running `pre-commit run --all-files` by @tatiana in
#312
- Remove redundant exception code by @pankajastro #294
- Add GitHub issue template for bug reports and feature requests by
@pankajkoti in #269

Closes: #223
3 participants