How can you re-run an upstream task if a downstream task fails in Airflow (using SubDAGs)?

I've found a solution to this by creating an on_retry_callback method in the main DAG:

(original source: https://gist.github.com/nathairtras/6ce0b0294be8c27d672e2ad52e8f2117 )

from airflow.models import DagBag

def callback_subdag_clear(context):
    """Clears a subdag's tasks on retry."""
    # The subdag's dag_id is the parent dag_id plus the SubDagOperator's task_id.
    dag_id = "{}.{}".format(
        context['dag'].dag_id,
        context['ti'].task_id
    )
    execution_date = context['execution_date']
    # Look the subdag up in the DagBag and clear its task instances
    # for this execution date so they run again on the next retry.
    sdag = DagBag().get_dag(dag_id)
    sdag.clear(
        start_date=execution_date,
        end_date=execution_date,
        only_failed=False,
        only_running=False,
        confirm_prompt=False,
        include_subdags=False)

Then the task that runs the SubDagOperator has:

on_retry_callback=callback_subdag_clear,

It now clears the task instance history of every task in the subdag and re-runs them, up to the number of retries configured in the main DAG.
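
For reference, here's a minimal sketch of how this wires into the main DAG. It assumes an Airflow 1.x-era import layout, and the DAG id, schedule, and create_subdag factory are hypothetical placeholders:

from datetime import datetime

from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator

main_dag = DAG(
    dag_id='main_dag',
    start_date=datetime(2019, 1, 1),
    schedule_interval='@daily',
)

subdag_task = SubDagOperator(
    task_id='my_subdag',
    # Hypothetical factory; it must return a DAG whose dag_id is
    # 'main_dag.my_subdag' to match the parent_dag.task_id convention.
    subdag=create_subdag(main_dag.dag_id, 'my_subdag'),
    retries=3,  # the subdag is cleared and re-run up to this many times
    on_retry_callback=callback_subdag_clear,
    dag=main_dag,
)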


There's a simpler alternative (a full snippet is included below).

Instead of

dag_id = "{}.{}".format(
    context['dag'].dag_id,
    context['ti'].task_id
)
sdag = DagBag().get_dag(dag_id)

you can do

task = context['task']
sdag = task.subdag

Why?

Because (most likely) your task is an instance of SubDagOperator, which has a subdag attribute.
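
Putting it together, a sketch of the full callback with this change (the clear arguments are unchanged from the first answer):

def callback_subdag_clear(context):
    """Clears a subdag's tasks on retry."""
    execution_date = context['execution_date']
    # The task in the context is the SubDagOperator itself, so the
    # subdag is available directly and no DagBag lookup is needed.
    sdag = context['task'].subdag
    sdag.clear(
        start_date=execution_date,
        end_date=execution_date,
        only_failed=False,
        only_running=False,
        confirm_prompt=False,
        include_subdags=False)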

I had issues using the solution by Alistair. When I tried to call clear on the sdag variable, I would get an exception because it was None.

I drilled down into the issue and traced it to improper parsing of DAGs while filling the DagBag, which I could not figure out. Instead, I found a workaround by looking into what was passed in the context and noticing that it holds a reference to the task, which has the subdag attribute as long as it comes from a SubDagOperator.
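
One way to make that requirement explicit is a small defensive check inside the callback (a sketch; the error message is illustrative), so it fails loudly if it is ever attached to a task that is not a SubDagOperator:

task = context['task']
sdag = getattr(task, 'subdag', None)
if sdag is None:
    # Only SubDagOperator tasks carry a subdag attribute; anything else
    # means the callback was wired to the wrong task.
    raise TypeError(
        "callback_subdag_clear expects a SubDagOperator, "
        "got {}".format(type(task).__name__))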