Airflow - How to pass xcom variable into Python function
Upvoted both the question and the answer, but I think this can be made a little clearer for users who just want to pass small data objects between PythonOperator tasks in their DAGs. Referencing this question and this XCom example got me to the following solution. Super simple:
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

# Named `dag` (not `DAG`) so the instance does not shadow the imported class.
dag = DAG(
    dag_id='example_dag',
    start_date=datetime.now(),
    schedule_interval='@once'
)


def push_function(**kwargs):
    ls = ['a', 'b', 'c']
    # The return value of the callable is pushed to XCom automatically.
    return ls


push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    provide_context=True,
    dag=dag)


def pull_function(**kwargs):
    # `ti` is the TaskInstance of the running task, taken from the context.
    ti = kwargs['ti']
    ls = ti.xcom_pull(task_ids='push_task')
    print(ls)


pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag)

push_task >> pull_task
I'm not sure why this works, but it does. A few questions for the community:
- What's happening with ti here? How is that built in to **kwargs?
- Is provide_context=True necessary for both functions?
Any edits to make this answer clearer are very welcome!
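To make the two questions above a bit more concrete, here is a toy model in plain Python (no Airflow needed). It is only a sketch of the idea, not Airflow's real implementation: ToyTaskInstance and run_python_task are names I made up for illustration. As far as I understand it, the operator builds a context dict that contains the task instance under 'ti', passes that dict to the callable as keyword arguments, and stores the callable's return value as an XCom under the key 'return_value'. In Airflow 1.x the context is only passed when provide_context=True, so it matters for pull_function (which reads kwargs['ti']) but not for push_function, which ignores its kwargs. In Airflow 2 the context is passed automatically and the flag is deprecated.

```python
# Toy model only: a simplified illustration, NOT Airflow's actual code.
class ToyTaskInstance:
    """Hypothetical stand-in for a TaskInstance; XComs live in a shared dict."""

    def __init__(self, task_id, xcom_store):
        self.task_id = task_id
        self._xcom_store = xcom_store

    def xcom_push(self, key, value):
        self._xcom_store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        return self._xcom_store.get((task_ids, key))


def run_python_task(task_id, python_callable, xcom_store):
    """Mimic one PythonOperator run: build context, call, push the result."""
    ti = ToyTaskInstance(task_id, xcom_store)
    context = {"ti": ti, "task_instance": ti}
    # The context dict is unpacked, which is why 'ti' shows up in **kwargs.
    result = python_callable(**context)
    if result is not None:
        ti.xcom_push("return_value", result)
    return result


def push_function(**kwargs):
    return ["a", "b", "c"]


def pull_function(**kwargs):
    ti = kwargs["ti"]
    return ti.xcom_pull(task_ids="push_task")


store = {}
run_python_task("push_task", push_function, store)
pulled = run_python_task("pull_task", pull_function, store)
print(pulled)
```

The real context holds many more entries (execution date, DAG, and so on); the toy only keeps the two that matter for this example.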
Templates like {{ ti.xcom_pull(...) }} can only be used inside of parameters that support templates or they won't be rendered prior to execution. See the template_fields, template_fields_renderers and template_ext attributes of the PythonOperator and BashOperator.
So templates_dict is what you use to pass templates to your python operator:
def func_archive_s3_file(**context):
    archive(context['templates_dict']['s3_path_filename'])

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,  # must pass this because templates_dict gets passed via context
    templates_dict={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}"})
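If it helps to see why the template only works in certain parameters, here is a rough plain-Python sketch of the idea. Everything in it is hypothetical: ToyOperator, its note attribute, the tiny render helper (a regex stand-in for Jinja) and the example path are made up for illustration and are not Airflow APIs. The point is just that only the attributes listed in template_fields get rendered before execution; anything else is left as the literal string.

```python
import re


def render(text, context):
    """Tiny stand-in for Jinja: replace {{ name }} with context[name]."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        text,
    )


class ToyOperator:
    # Only attributes named here are rendered (toy version of template_fields).
    template_fields = ("templates_dict",)

    def __init__(self, templates_dict, note):
        self.templates_dict = templates_dict
        self.note = note  # hypothetical attribute, NOT in template_fields

    def render_templates(self, context):
        for name in self.template_fields:
            value = getattr(self, name)
            rendered = {k: render(v, context) for k, v in value.items()}
            setattr(self, name, rendered)


op = ToyOperator(
    templates_dict={"s3_path_filename": "{{ s3_path }}"},
    note="{{ s3_path }}",
)
# Placeholder value, assumed only for this example.
op.render_templates({"s3_path": "s3://example-bucket/file.csv"})

print(op.templates_dict)  # rendered
print(op.note)            # still the raw template string
```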
However, in the case of fetching an XCom value, another alternative is just using the TaskInstance object made available to you via context:
def func_archive_s3_file(**context):
    archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
    dag=dag,
    python_callable=obj.func_archive_s3_file,
    provide_context=True,