How to skip tasks on Airflow?
You can incorporate the SkipMixin that the ShortCircuitOperator uses under the hood to skip downstream tasks.
from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults
class mySkippingOperator(BaseOperator, SkipMixin)
@apply_defaults
def __init__(self,
condition,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.condition = condition
def execute(self, context):
if self.condition:
self.log.info('Proceeding with downstream tasks...')
return
self.log.info('Skipping downstream tasks...')
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
self.log.debug("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
self.log.info("Done.")
Yes, you just click on task 3. Toggle the check boxes to the right of the run button to ignore dependencies, then click run.
yes, you can do this by another ad-hoc basis. Found it somehow!!
You need to raise AirflowSkipException
from airflow.exceptions import AirflowSkipException
def execute():
if condition:
raise AirflowSkipException
task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)