do not push stale update to related DagRun on TI update after task execution #44547
base: main
Conversation
Force-pushed from b134561 to cfb8b88
Force-pushed from cfb8b88 to a77aafa
airflow/models/taskinstance.py (Outdated)
```diff
@@ -1127,7 +1127,8 @@ def _handle_failure(
         )

     if not test_mode:
         TaskInstance.save_to_db(failure_context["ti"], session)
+        task_instance.dag_run.refresh_from_db()
```
Is the DagRun always stale here? Or does this only apply to some code paths? I'm just trying to understand the context of this, and wondering if we should check whether `task_instance` and/or `dag_run` is in `session`.
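(As an aside, here is a minimal sketch of the session-membership check suggested above, using a stub SQLAlchemy model; the model and names are illustrative only and not from the PR.)

```python
# Hedged sketch: two ways to check whether an ORM object is attached to a
# SQLAlchemy session before deciding whether it needs to be refreshed.
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DagRunStub(Base):  # stand-in model, not the real Airflow DagRun
    __tablename__ = "dag_run_stub"
    id = Column(Integer, primary_key=True)
    state = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    dr = DagRunStub(id=1, state="running")
    session.add(dr)
    print(dr in session)          # True: Session.__contains__ covers pending/persistent objects
    print(inspect(dr).pending)    # True before the first flush
    session.commit()
    session.expunge(dr)
    print(dr in session)          # False once the object is detached
    print(inspect(dr).detached)   # True
```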
It is always stale if it was updated in the meantime by the scheduler/webserver, so when the task gets manually marked as failed, the database is updated with stale data. The TI itself is updated a bit before that, in `TaskInstance.fetch_handle_failure_context`.
@uranusjr do you think there's anything else to do here?
I am not sure about this one. Here you refresh the DagRun and not the TI from the DB.
In the error description/PR header you write that the task instance might be updated twice. But usually when you mark a TI as "failed" manually it is terminated. Is this error situation ALWAYS happening, or only in a race when you mark it manually and the TI is close to completion?
And are you talking about the on_error callback on the DAG or on the TI? I assume that if you mark it failed, it is on the TI level and not the DAG itself?
> I am not sure about this one. Here you refresh the DagRun and not the TI from the DB. In the error description/PR header you write that the task instance might be updated twice.
Ah yes, I did not write this clearly enough. I meant for the description to be read together with the title: it's the DagRun that is updated twice.
> But usually when you mark a TI as "failed" manually it is terminated. Is this error situation ALWAYS happening, or only in a race when you mark it manually and the TI is close to completion?
This also happens when the task is terminated; it still goes into the `handle_failure` method:
```python
except (AirflowTaskTimeout, AirflowException, AirflowTaskTerminated) as e:
    if not test_mode:
        ti.refresh_from_db(lock_for_update=True, session=session)
    # for case when task is marked as success/failed externally
    # or dagrun timed out and task is marked as skipped
    # current behavior doesn't hit the callbacks
    if ti.state in State.finished:
        ti.clear_next_method_args()
        TaskInstance.save_to_db(ti=ti, session=session)
        return None
    else:
        ti.handle_failure(e, test_mode, context, session=session)
        raise
```
> And are you talking about the on_error callback on the DAG or on the TI? I assume that if you mark it failed, it is on the TI level and not the DAG itself?
I've been testing it with listeners, but it affects callbacks as well.
You can see that the line starting with `DAG clear_test failed at 2024-12-17 12:23:56.974528+00:00. Run ID: manual__2024-12-17T12:23:56.974528+00:00` appears twice. This is the DAG I'm testing it with:
```python
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta
import time


def wait_function():
    time.sleep(10)


def notify_failure(context):
    dag_id = context['dag'].dag_id
    run_id = context['run_id']
    logical_date = context['logical_date']
    log_url = context['task_instance'].log_url
    print(f"DAG {dag_id} failed at {logical_date}. Run ID: {run_id}. See logs: {log_url}")


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 1),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(
    'clear_test',
    default_args=default_args,
    description='A simple example DAG',
    on_failure_callback=notify_failure,
    catchup=False,
) as dag:
    start_task = EmptyOperator(
        task_id='start_task',
    )
    wait_task = PythonOperator(
        task_id='wait_task',
        python_callable=wait_function,
    )
    start_task >> wait_task
```
Okay, as I am not an expert in this area... I used your DAG and the outcome is mixed.
When I let the task fail "normally", the callback executed once. If I manually mark it as failed, the callback is executed two times - also with the patch applied. So in my view it is not fixing the problem.
I tested with main and CeleryExecutor.
@jscheffl can you recheck? I've moved the refresh into the `save_to_db` method - to cover more cases - and reverified.
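(For illustration, a hedged sketch of what a refresh inside `save_to_db` could look like; this is only the general shape, not the actual code from this PR.)

```python
# Hypothetical sketch only: refresh the related DagRun before merging the TI,
# so a stale in-memory DagRun state is not flushed back to the database.
from airflow.utils.session import NEW_SESSION, provide_session


class TaskInstance:  # sketch of the relevant method, not the full class
    @staticmethod
    @provide_session
    def save_to_db(ti, session=NEW_SESSION):
        if ti.dag_run is not None:
            ti.dag_run.refresh_from_db(session=session)  # reload current DagRun state from DB
        session.merge(ti)
        session.flush()
```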
As said... I am not an expert, and the last feedback was just from manual tests. From a naive view, the code looks good.
But some pytests are failing. Can you correct this? Then I could re-review.
Force-pushed from a77aafa to 2aeb4db
Force-pushed from cae5cd6 to c1e0087
…ecution Signed-off-by: Maciej Obuchowski <[email protected]>
Force-pushed from c1e0087 to ea2969b
When a TI is marked as failed via the UI, and later fails on its own, the scheduler changes the state to failed twice.
(some extra logs for clarity)
This happens because on `handle_failure`, `TaskInstance.save_to_db` not only updates the state of that task instance, it also pushes a stale DagRun state - the one it got at TI start. So the actual DR state goes `running` -> `failed` -> `running` -> `failed`. This causes other unintended behavior, such as calling `on_dag_run_failed` listeners twice.

The solution just loads the DR state from the DB before pushing the TI state. However, there is probably a better solution that someone with more knowledge of SQLAlchemy might help with.
Link to discussion on Airflow slack: https://apache-airflow.slack.com/archives/C06K9Q5G2UA/p1732805503889679
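(To make the mechanism above concrete, here is a minimal, self-contained sketch using stub SQLAlchemy models; it is illustrative only and not the Airflow code. It shows how merging a detached TI can drag a stale DagRun state back into the database, and how refreshing the DagRun first avoids that.)

```python
# Illustrative sketch only - NOT the Airflow implementation. Stub models show the
# failure mode described above: merging a detached TaskInstance also merges its
# stale DagRun (default "merge" cascade), writing an old DagRun state back to the
# database, unless the DagRun is refreshed first.
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class DagRunStub(Base):
    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    state = Column(String)


class TaskInstanceStub(Base):
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    state = Column(String)
    dag_run_id = Column(Integer, ForeignKey("dag_run.id"))
    dag_run = relationship(DagRunStub)  # default cascade includes "merge"


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as setup:
    setup.add(DagRunStub(id=1, state="running"))
    setup.add(TaskInstanceStub(id=1, state="running", dag_run_id=1))
    setup.commit()

# "task start": the worker loads the TI and its DagRun, then works detached
with Session(engine) as load:
    ti = load.get(TaskInstanceStub, 1)
    assert ti.dag_run.state == "running"   # this in-memory copy is kept from now on

# meanwhile the scheduler/UI marks the DagRun as failed in another session
with Session(engine) as ui:
    ui.get(DagRunStub, 1).state = "failed"
    ui.commit()

# the fix sketched in this PR, analogous to dag_run.refresh_from_db(): reload the
# current DagRun state onto the in-memory object before saving the TI
APPLY_FIX = True
if APPLY_FIX:
    with Session(engine) as s:
        ti.dag_run.state = s.get(DagRunStub, 1).state

# "save_to_db": merge + flush the TI; the merge cascades into ti.dag_run
ti.state = "failed"
with Session(engine) as worker:
    worker.merge(ti)
    worker.commit()

with Session(engine) as check:
    # with APPLY_FIX = False this prints "running": the stale state was pushed back
    print("DagRun state in DB:", check.get(DagRunStub, 1).state)
```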