Airflow ExternalTaskSensor gets stuck
To clarify something I've seen here and on other related questions, the dags don't necessarily have to run on the same schedule, as stated in the accepted answer. The dags also don't need to have the same start_date. If you create your ExternalTaskSensor task without the execution_delta or execution_date_fn parameters, then the two dags need to have the same execution date. It so happens that if two dags have the same schedule, the scheduled runs in each interval will have the same execution date. I'm not sure what the execution date would be for manually triggered runs of scheduled dags.
For this example to work, dag b's ExternalTaskSensor task needs an execution_delta or execution_date_fn parameter. If using execution_delta, it should be such that b's execution date - execution_delta = a's execution date. If using execution_date_fn, then that function should return a's execution date.
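To make the arithmetic concrete, here is a small stdlib-only sketch (the 8:00/9:00 schedules are hypothetical, not from the question): if dag a runs daily at 8:00 and dag b runs daily at 9:00, the sensor in b needs execution_delta=timedelta(hours=1) so that subtracting the delta from b's execution date lands exactly on a's execution date.

```python
from datetime import datetime, timedelta

# Hypothetical schedules: dag a runs daily at 8:00, dag b daily at 9:00.
a_execution_date = datetime(2020, 5, 1, 8, 0)
b_execution_date = datetime(2020, 5, 1, 9, 0)

# The sensor in dag b must look back by the schedule offset.
execution_delta = timedelta(hours=1)

# This is the lookup ExternalTaskSensor performs: it queries for a run
# of the external dag at (current execution date - execution_delta).
assert b_execution_date - execution_delta == a_execution_date
```

If the delta is wrong by even a minute, the sensor finds no matching run and waits until it times out, which is the classic "stuck sensor" symptom.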
If you use the TriggerDagRunOperator and then an ExternalTaskSensor to detect when the triggered dag has completed, you can pass the main dag's execution date to the triggered one with the TriggerDagRunOperator's execution_date parameter, like execution_date='{{ execution_date }}'. Then the execution date of both dags would be the same, and you wouldn't need the schedules to be the same for each dag, or to use the execution_delta or execution_date_fn sensor parameters.
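A minimal sketch of that pattern, using Airflow 1.10-style import paths and hypothetical dag and task ids (child_dag, end_task are placeholders, and dag is assumed to be defined elsewhere):

```python
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

trigger = TriggerDagRunOperator(
    task_id='trigger_child_dag',
    trigger_dag_id='child_dag',
    # Pass our own execution date down, so both dag runs share it.
    execution_date='{{ execution_date }}',
    dag=dag,
)

wait = ExternalTaskSensor(
    task_id='wait_for_child_dag',
    external_dag_id='child_dag',
    external_task_id='end_task',
    # No execution_delta/execution_date_fn needed: the dates already match.
    dag=dag,
)

trigger >> wait
```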
The above was written and tested on Airflow 1.10.9.
From my successful case:
# imports needed by this snippet; current_date, ALERT_EMAIL_ADDRESSES
# and log_type are defined elsewhere in my project
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

default_args = {
    'owner': 'xx',
    'retries': 2,
    'email': ALERT_EMAIL_ADDRESSES,
    'email_on_failure': True,
    'email_on_retry': False,
    'retry_delay': timedelta(seconds=30),
    # avoid stopping tasks after one day
    'depends_on_past': False,
}

dag = DAG(
    dag_id=dag_id,
    # get the datetime type value
    start_date=pendulum.strptime(current_date, "%Y, %m, %d, %H").astimezone('Europe/London').subtract(hours=1),
    description='xxx',
    default_args=default_args,
    schedule_interval=timedelta(hours=1),
)
...
external_sensor = ExternalTaskSensor(
    task_id='ext_sensor_task_update_model',
    external_dag_id='xxx',
    external_task_id='xxx'.format(log_type),
    # set the external_task_id to None because of the end_task
    # external_task_id=None,
    dag=dag,
    timeout=300,
)
...
Wait until the tasks are triggered automatically on schedule. Don't trigger them manually, or the execution dates of the two dags will be different and the sensor won't find a match.
ExternalTaskSensor assumes that you are dependent on a task in a dag run with the same execution date. This means that in your case dags a and b need to run on the same schedule (e.g. every day at 9:00am or w/e). Otherwise you need to use the execution_delta or execution_date_fn parameter when you instantiate an ExternalTaskSensor.
Here is the documentation inside the operator itself to help clarify further:
:param execution_delta: time difference with the previous execution to
look at, the default is the same execution_date as the current task.
For yesterday, use [positive!] datetime.timedelta(days=1). Either
execution_delta or execution_date_fn can be passed to
ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
:param execution_date_fn: function that receives the current execution date
and returns the desired execution date to query. Either execution_delta
or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable
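For example, an execution_date_fn equivalent to execution_delta=timedelta(hours=1) (a hypothetical one-hour offset between the two dags) would shift the sensor's execution date back before querying:

```python
from datetime import datetime, timedelta

# Hypothetical: the external dag runs one hour earlier than this dag.
def execution_date_fn(execution_date):
    # Receives the current execution date, returns the one to query.
    return execution_date - timedelta(hours=1)

# A run of this dag at 9:00 will look for the external dag's 8:00 run.
assert execution_date_fn(datetime(2020, 5, 1, 9)) == datetime(2020, 5, 1, 8)
```

The function form is useful when a fixed timedelta isn't enough, e.g. when the offset varies by weekday.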
As of Airflow v1.10.7, tomcm's answer is not true (at least for this version). One should use execution_delta or execution_date_fn to determine the date AND schedule of the external DAG if they do not have the same schedule.