Macros in the Airflow Python operator
In my opinion, a more native Airflow approach is to use the built-in PythonOperator with the provide_context=True parameter, like so:
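# Airflow 1.x import path for the built-in operator
from airflow.operators.python_operator import PythonOperator

t1 = PythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    provide_context=True,  # pass the task context to the callable as kwargs
    dag=dag)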
Now you have access to all of the macros, Airflow metadata, and task parameters in the kwargs of your callable:
def temp_def(**kwargs):
    # ds and execution_date come from the context passed in by Airflow
    print('ds={}, execution_date={}'.format(kwargs['ds'], kwargs['execution_date']))
If you had custom params defined on the task, you could access those as well via kwargs['params'].
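As a minimal sketch of the params access described above, the callable below reads a value from kwargs['params']. Airflow is not imported here; the context dict is a hand-built stand-in for what Airflow would pass, and the names fake_context and 'threshold' are hypothetical.

```python
def temp_def(**kwargs):
    # 'params' holds the user-defined params associated with the task.
    params = kwargs.get('params', {})
    # 'threshold' is a hypothetical param name used only for illustration.
    return 'ds={}, threshold={}'.format(kwargs['ds'], params.get('threshold'))


# Hand-built stand-in for the context Airflow passes to the callable.
fake_context = {'ds': '2017-01-01', 'params': {'threshold': 10}}
result = temp_def(**fake_context)
print(result)
```

Using kwargs.get('params', {}) rather than kwargs['params'] is just a defensive choice for the sketch, so the callable still runs when no params were set.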
Macros are only processed for templated fields. To get Jinja to process a field such as op_args, extend PythonOperator with your own subclass:
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x path

class MyPythonOperator(PythonOperator):
    template_fields = ('templates_dict', 'op_args')
I added 'templates_dict' to template_fields because PythonOperator itself already has this field templated.
Now you should be able to use a macro within that field:
ds = '{{ ds }}'
mm = '{{ execution_date }}'

t1 = MyPythonOperator(
    task_id='temp_task',
    python_callable=temp_def,
    op_args=[mm, ds],
    provide_context=False,
    dag=dag)
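To illustrate why a field must appear in template_fields before its macros are rendered, here is a toy sketch of the rendering step. It is not Airflow's implementation: FakeOperator, render, render_templates, and the regex-based substitution are hypothetical stand-ins for the operator and for Jinja, and the context values are made up. It also shows that with op_args the callable receives the rendered values as positional arguments, so it needs a matching signature.

```python
import re


def render(value, context):
    # Toy substitute for Jinja: replace '{{ name }}' with context[name].
    if isinstance(value, str):
        return re.sub(r'\{\{\s*(\w+)\s*\}\}',
                      lambda m: str(context[m.group(1)]), value)
    if isinstance(value, (list, tuple)):
        return type(value)(render(v, context) for v in value)
    return value


class FakeOperator(object):
    # Only attributes named here are rendered, mirroring template_fields.
    template_fields = ('op_args',)

    def __init__(self, python_callable, op_args, note):
        self.python_callable = python_callable
        self.op_args = op_args
        self.note = note  # not listed in template_fields, so left untouched

    def render_templates(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))

    def execute(self, context):
        self.render_templates(context)
        # op_args are passed positionally, in the order they were given.
        return self.python_callable(*self.op_args)


def temp_def(execution_date, ds):
    return 'ds={}, execution_date={}'.format(ds, execution_date)


task = FakeOperator(temp_def, ['{{ execution_date }}', '{{ ds }}'], note='{{ ds }}')
output = task.execute({'ds': '2017-01-01',
                       'execution_date': '2017-01-01T00:00:00'})
print(output)
```

In this sketch the note attribute keeps its literal '{{ ds }}' text after execute, which is the behaviour you would see for any operator argument that is not a templated field.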