Data access in task_name.output when expanding #207

Closed
dpro-shc opened this issue Jul 25, 2024 · 1 comment
Assignees
Labels
customer enhancement New feature or request

Comments

@dpro-shc

Let's say we have two tasks, extract and load, where extract returns a list of JSON objects, say [{"foo":"dan","bar":"shc"},{"foo":"alf","bar":"shc"}], and load expands on the output of extract:

tasks:
    extract:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: extract_fn
      python_callable_file: /usr/local/airflow/dags/extractLoad/utils/python_callables/bronze_tasks.py

    load:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: load_fn
      python_callable_file: /usr/local/airflow/dags/extractLoad/utils/python_callables/bronze_tasks.py
      expand:
        input: extract.output
      dependencies: [extract]
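
For concreteness, here is a minimal sketch of what the two callables in bronze_tasks.py might look like under this configuration. The function bodies and the return string are assumptions for illustration (the issue only gives the function names and the sample records), and the final list comprehension merely simulates one mapped call per record without involving Airflow.

```python
def extract_fn():
    """Return one dict per record; each record feeds one mapped load task."""
    return [
        {"foo": "dan", "bar": "shc"},
        {"foo": "alf", "bar": "shc"},
    ]


def load_fn(input):
    """Receive a whole record as a single argument and unpack it by hand.

    The parameter is named `input` only because the YAML uses `input:`
    as the expand key; it shadows the Python builtin of the same name.
    """
    foo = input["foo"]
    bar = input["bar"]
    # Hypothetical "load" step: a real task would write to a target system.
    return f"loaded foo={foo} bar={bar}"


# Plain-Python stand-in for the mapping: one call per extracted record.
results = [load_fn(input=record) for record in extract_fn()]
```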

This works, but it's not ideal. The solution is clunky and the internals are obscured: I have to give the load callable the signature load_fn(input), with a single argument, and unpack the values inside the function. I'm proposing this functionality:

tasks:
    extract:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: extract_fn
      python_callable_file: /usr/local/airflow/dags/extractLoad/utils/python_callables/bronze_tasks.py

    load:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: load_fn
      python_callable_file: /usr/local/airflow/dags/extractLoad/utils/python_callables/bronze_tasks.py
      expand:
        foo: extract.output.foo
        bar: extract.output.bar
      dependencies: [extract]
      

so that I can keep the signature of the load callable as load_fn(foo, bar).
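
Until something like this exists, one possible stopgap is a thin adapter that keeps load_fn(foo, bar) clean and does the unpacking in one place. This is only a sketch: load_record is a hypothetical name, the body of load_fn is invented for illustration, and the YAML would then need to point python_callable_name at the adapter instead of load_fn.

```python
def load_fn(foo, bar):
    """The callable with the desired, explicit signature."""
    # Hypothetical "load" step: a real task would write to a target system.
    return f"loaded foo={foo} bar={bar}"


def load_record(input):
    """Hypothetical adapter: accept one record and forward its keys as kwargs.

    Every key in the record must match a parameter of load_fn; an
    unexpected key raises TypeError, which surfaces schema drift early.
    """
    return load_fn(**input)


# Plain-Python stand-in for the mapping: one call per extracted record.
records = [{"foo": "dan", "bar": "shc"}, {"foo": "alf", "bar": "shc"}]
results = [load_record(input=record) for record in records]
```

The trade-off is one extra function per mapped callable, but the business logic in load_fn stays testable with ordinary keyword arguments.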

@tatiana
Collaborator

tatiana commented Aug 15, 2024

Hi @dpro-shc,

I initially thought this feature was supported by Airflow dynamic task mapping, but I just realised it is not. I'm still learning about DAG Factory, but I understand it to be an abstraction for representing Airflow DAGs as YAML, and I see this request as an extra layer on top of that. The risk of adding this type of behaviour is that we'd take on additional maintenance overhead in DAG Factory for a custom feature that needs to be more generic.

We just released 0.21.0, where we added Dynamic Task mapping - which would probably make it easier to do what you want:

For now, I am closing this issue. If you feel strongly about this feature and would like to contribute, please, reopen this ticket and open a PR. I would be happy to review the PR, and we can discuss it further.

@tatiana tatiana added this to the DAG Factory 0.21.0 milestone Nov 1, 2024
@tatiana tatiana added bug Something isn't working and removed triage-needed labels Nov 29, 2024
@tatiana tatiana self-assigned this Nov 29, 2024
@tatiana tatiana added enhancement New feature or request and removed bug Something isn't working labels Dec 6, 2024
@tatiana tatiana closed this as completed Dec 6, 2024