Macros in the Airflow Python operator

In my opinion, a more native Airflow approach is to use the built-in PythonOperator with the provide_context=True parameter, like so:

from airflow.operators.python_operator import PythonOperator  # Airflow 1.x path

t1 = PythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    provide_context=True,
    dag=dag)

Now you have access to all of the macros, Airflow metadata, and task parameters in the kwargs of your callable:

def temp_def(**kwargs):
    # The context values arrive as keyword arguments.
    print('ds={}, execution_date={}'.format(kwargs['ds'], kwargs['execution_date']))

If the task has custom params defined, you can access those as well via kwargs['params'].
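
One way to check a callable like this without a running scheduler is to call it directly with a hand-built dictionary standing in for the context. This is only a minimal sketch, assuming the context carries ds, execution_date, and params as described above. The function name describe_run, the fake_context values, and the 'source' param are made up for illustration.

```python
from datetime import datetime


def describe_run(**kwargs):
    # With provide_context=True the context arrives as keyword arguments.
    ds = kwargs['ds']
    execution_date = kwargs['execution_date']
    # Custom params set on the task arrive under the 'params' key.
    params = kwargs.get('params', {})
    message = 'ds={}, execution_date={}, source={}'.format(
        ds, execution_date, params.get('source'))
    print(message)
    return message


# Hand-built stand-in for the context; the values are illustrative only.
fake_context = {
    'ds': '2017-01-01',
    'execution_date': datetime(2017, 1, 1),
    'params': {'source': 'demo'},
}
result = describe_run(**fake_context)
```

Returning the message as well as printing it makes the callable easy to assert on in a plain unit test.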


Macros are only processed in templated fields. To get Jinja to process op_args, subclass PythonOperator and extend its template_fields.

class MyPythonOperator(PythonOperator):
    # Render op_args with Jinja in addition to templates_dict.
    template_fields = ('templates_dict', 'op_args')

I included 'templates_dict' in template_fields because PythonOperator itself already has this field templated.
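
The rule that only fields named in template_fields get rendered can be illustrated without Airflow at all. The following is a toy model, not Airflow's implementation: ToyOperator, MyToyOperator, render_value, and the regex substitution are hypothetical stand-ins for the operator classes and for Jinja.

```python
import re


def render_value(value, context):
    """Replace '{{ name }}' in strings, recursing into lists, tuples and dicts."""
    if isinstance(value, str):
        return re.sub(r'\{\{\s*(\w+)\s*\}\}',
                      lambda m: str(context[m.group(1)]), value)
    if isinstance(value, (list, tuple)):
        return type(value)(render_value(v, context) for v in value)
    if isinstance(value, dict):
        return {k: render_value(v, context) for k, v in value.items()}
    return value


class ToyOperator(object):
    # Only attributes named here are rendered.
    template_fields = ('templates_dict',)

    def __init__(self, op_args=None, templates_dict=None):
        self.op_args = op_args or []
        self.templates_dict = templates_dict or {}

    def render(self, context):
        for name in self.template_fields:
            setattr(self, name, render_value(getattr(self, name), context))


class MyToyOperator(ToyOperator):
    # Listing 'op_args' opts that field into rendering as well.
    template_fields = ('templates_dict', 'op_args')


context = {'ds': '2017-01-01'}

plain = ToyOperator(op_args=['{{ ds }}'], templates_dict={'day': '{{ ds }}'})
plain.render(context)

extended = MyToyOperator(op_args=['{{ ds }}'], templates_dict={'day': '{{ ds }}'})
extended.render(context)

print(plain.op_args)     # still the raw template string
print(extended.op_args)  # rendered from the context
```

In the toy base class op_args is passed through untouched, which mirrors why the macro strings would otherwise reach the callable unrendered.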

Now you should be able to use a macro within that field:

ds = '{{ ds }}'
mm = '{{ execution_date }}'

t1 = MyPythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    op_args=[mm, ds],
    provide_context=False,
    dag=dag)
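
With op_args, the values reach the callable as positional arguments in list order, so for op_args=[mm, ds] the first parameter is the execution date and the second is ds. A minimal sketch of a matching callable follows. It is called directly here with made-up sample strings in place of the values Jinja would render.

```python
def temp_def(execution_date, ds):
    # Parameters follow the order of op_args=[mm, ds].
    message = 'ds={}, execution_date={}'.format(ds, execution_date)
    print(message)
    return message


# Stand-ins for the rendered '{{ execution_date }}' and '{{ ds }}' strings.
output = temp_def('2017-01-01T00:00:00', '2017-01-01')
```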

Tags:

Jinja2

Airflow