How to mark an Airflow DAG run as failed if any task fails?
I am facing a similar problem. It is not a bug, but a DAG-level property for this would be a nice feature.
As a workaround, you can push an XCom value from the task that is allowed to fail, and in the downstream tasks do something like:
    if ti.xcom_pull(key='state', task_ids=task_allowed_to_fail_id) == 'FAILED':
        raise ValueError('Force failure because upstream task has failed')
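The check above only shows the pulling side. Here is a minimal sketch of the pushing side, assuming the task that is allowed to fail is a Python callable that catches its own exception and records the outcome under the same 'state' key. The names `run_allowed_to_fail` and `broken_work` are hypothetical, and `FakeTaskInstance` is only a stand-in so the sketch runs without Airflow; in a real DAG, `ti` would be the task instance from the context.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, used only to run this sketch locally."""

    def __init__(self):
        self.xcoms = {}

    def xcom_push(self, key, value):
        # Mirrors the keyword arguments of TaskInstance.xcom_push(key=..., value=...).
        self.xcoms[key] = value


def run_allowed_to_fail(ti, do_work):
    """Run do_work() and record 'FAILED' or 'SUCCESS' under the 'state' key instead of raising."""
    try:
        do_work()
    except Exception:
        # Swallow the error so this task does not fail, but leave a marker for downstream tasks.
        ti.xcom_push(key='state', value='FAILED')
    else:
        ti.xcom_push(key='state', value='SUCCESS')


def broken_work():
    # Hypothetical workload that always fails, for demonstration.
    raise RuntimeError('simulated failure')


ti = FakeTaskInstance()
run_allowed_to_fail(ti, broken_work)
print(ti.xcoms['state'])  # prints FAILED
```

Because the exception is caught, the task itself finishes successfully, and only the downstream task that pulls 'FAILED' turns the failure into a failed run.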
Another solution can be to add a final PythonOperator that checks the status of all tasks in this run:
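    from airflow.operators.python_operator import PythonOperator  # Airflow 1.x path; airflow.operators.python on 2.x
    from airflow.utils.state import State
    from airflow.utils.trigger_rule import TriggerRule

    def check_final_status(**kwargs):
        # Fail this task (and therefore the DAG run) if any other task did not succeed.
        for task_instance in kwargs['dag_run'].get_task_instances():
            if task_instance.current_state() != State.SUCCESS and \
                    task_instance.task_id != kwargs['task_instance'].task_id:
                raise Exception("Task {} failed. Failing this DAG run".format(task_instance.task_id))

    final_status = PythonOperator(
        task_id='final_status',
        provide_context=True,  # Needed on Airflow 1.x; the context is passed automatically on 2.x
        python_callable=check_final_status,  # Defined above, and named differently from the operator variable
        trigger_rule=TriggerRule.ALL_DONE,  # Ensures this task runs even if upstream fails
        dag=dag,
    )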
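One thing to watch: comparing against State.SUCCESS also treats skipped tasks as failures. If that is not what you want, the check can be narrowed to the failure states. A small sketch of that filtering logic in plain Python, assuming the state strings 'failed' and 'upstream_failed' (the values behind State.FAILED and State.UPSTREAM_FAILED). `failed_task_ids` is a hypothetical helper, and the dict stands in for the task instances of a DAG run.

```python
# State strings assumed to match Airflow's State.FAILED and State.UPSTREAM_FAILED.
FAILURE_STATES = {'failed', 'upstream_failed'}


def failed_task_ids(states, own_task_id):
    """Return the sorted ids of tasks in a failure state, ignoring the checking task itself."""
    return sorted(
        task_id
        for task_id, state in states.items()
        if state in FAILURE_STATES and task_id != own_task_id
    )


# Hypothetical snapshot of a DAG run: task_id -> current state.
run_states = {
    'extract': 'success',
    'optional_step': 'skipped',
    'load': 'failed',
    'final_status': 'running',
}
print(failed_task_ids(run_states, own_task_id='final_status'))  # prints ['load']
```

In the real callable, the same condition would be applied to `task_instance.current_state()` for each task instance in the run.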