How to delete XCOM objects once the DAG finishes its run in Airflow
Below is the code that worked for me. It deletes the XComs of all tasks in the DAG (add a task_id condition to the SQL if only a specific task's XComs need to be deleted):
Note that the dag_id is inserted dynamically, and the date expressions should follow the SQL syntax of your database.
from airflow.operators.postgres_operator import PostgresOperator
# Task that deletes the XCom rows of this DAG for the current execution date.
# The dag_id is concatenated into the statement when the file is parsed; the
# {{ ds }} placeholder stays in the string (presumably rendered by Airflow's
# templating when the task runs).
delete_xcom_task_inst = PostgresOperator(
    postgres_conn_id='your_conn_id',
    task_id='delete_xcom',
    sql=(
        "delete from xcom where dag_id= '" + dag.dag_id + "'"
        " and date(execution_date)=date('{{ ds }}')"
    ),
)
My solution to this problem is:
from airflow.utils.db import provide_session
from airflow.models import XCom
dag = DAG(...)
@provide_session
def cleanup_xcom(session=None, **context):
    """Delete every XCom row that belongs to the DAG of the running task.

    Args:
        session: SQLAlchemy session used for the delete. Callers normally
            leave it unset so that ``provide_session`` supplies one.
        **context: Task context passed in by the operator; only
            ``context["dag"]`` is read.
    """
    # ``session`` is an explicit parameter so the decorator can find it,
    # instead of fishing it out of the context kwargs.
    # Read the public ``dag_id`` attribute rather than the private ``_dag_id``.
    dag_id = context["dag"].dag_id
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()
# Task that invokes cleanup_xcom as part of the DAG.
clean_xcom = PythonOperator(
    dag=dag,
    task_id="clean_xcom",
    provide_context=True,
    python_callable=cleanup_xcom,
)

# Bare reference to the task, kept from the original snippet (it has no effect).
clean_xcom
In Airflow 2.1.x, the code below does not seem to work:
from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
# Callback that deletes all XCom rows whose dag_id matches the one read
# from the callback context.
# NOTE(review): context["ti"] is presumably a TaskInstance object, which would
# not support ["dag_id"] subscripting -- likely the reason this version fails;
# confirm against the Airflow version in use.
@provide_session
def cleanup_xcom(context, session=None):
dag_id = context["ti"]["dag_id"]
session.query(XCom).filter(XCom.dag_id == dag_id).delete()
dag = DAG( ...
on_success_callback=cleanup_xcom,
)
so change to
from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
with DAG(dag_id="cleanup_xcom_demo", schedule_interval=None, start_date=days_ago(2)) as dag:

    @provide_session
    def cleanup_xcom(session=None, **context):
        """Delete all XCom rows recorded for the DAG of the running task.

        Args:
            session: SQLAlchemy session used for the delete. Left unset by
                the operator so that ``provide_session`` supplies one.
            **context: Task context; only ``context["dag"]`` is read.
        """
        # Read the public ``dag_id`` attribute rather than the private
        # ``_dag_id``, and avoid shadowing the enclosing ``dag`` variable.
        dag_id = context["dag"].dag_id
        # The filter is on dag_id only, so XComs of every run of this DAG
        # are removed, not just those of the current run.
        session.query(XCom).filter(XCom.dag_id == dag_id).delete()

    # ``provide_context`` is not passed: it is deprecated in Airflow 2, where
    # the context is handed to the callable automatically. The task is
    # attached to the DAG by the enclosing ``with`` block.
    clean_xcom = PythonOperator(
        task_id="clean_xcom",
        python_callable=cleanup_xcom,
    )

    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end", trigger_rule="none_failed")

    start >> clean_xcom >> end
You have to add a task, specific to your metadata database (SQLite, PostgreSQL, MySQL, ...), that deletes the XComs once the DAG run is finished.
# Task that deletes the XCom rows written by one task of this DAG for the
# current execution date.
delete_xcom_task = PostgresOperator(
    task_id='delete-xcom-task',
    postgres_conn_id='airflow_db',
    # The statement is assembled from adjacent literals so that it is a valid
    # single Python string. The dag_id is filled in through the template
    # context (a bare ``dag.dag_id`` inside the SQL text is never substituted),
    # and both templated values are quoted so the database receives string
    # literals rather than an arithmetic expression such as 2019-06-01.
    sql=(
        "delete from xcom "
        "where dag_id = '{{ dag.dag_id }}' "
        "and task_id = 'your_task_id' "
        "and date(execution_date) = date('{{ ds }}')"
    ),
    dag=dag,
)
You can verify your query before you run the dag.
Data Profiling -> Ad Hoc Query -> airflow_db -> query -> Run!
You can perform the cleanup programmatically through sqlalchemy so your solution won't break if the database structure changes:
from airflow.utils.db import provide_session
from airflow.models import XCom
@provide_session
def cleanup_xcom(session=None):
    """Delete every XCom row whose dag_id equals "your dag id".

    Args:
        session: SQLAlchemy session used to run the delete.
    """
    xcom_query = session.query(XCom)
    rows_for_dag = xcom_query.filter(XCom.dag_id == "your dag id")
    rows_for_dag.delete()
You can also purge old XCom data:
from airflow.utils.db import provide_session
from airflow.models import XCom
from sqlalchemy import func
@provide_session
def cleanup_xcom(session=None):
    """Delete XCom rows whose execution_date is <= date('2019-06-01').

    Args:
        session: SQLAlchemy session used to run the delete.
    """
    xcom_query = session.query(XCom)
    cutoff = func.date('2019-06-01')
    stale_rows = xcom_query.filter(XCom.execution_date <= cutoff)
    stale_rows.delete()
If you want to purge the XCom once the dag is finished I think the cleanest solution is to use the "on_success_callback" property of the DAG model class:
from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
@provide_session
def cleanup_xcom(context, session=None):
    """Delete all XCom rows of the DAG the callback was fired for.

    Args:
        context: Callback context dictionary; only ``context["ti"]`` is read.
        session: SQLAlchemy session used for the delete. Left unset by the
            caller so that ``provide_session`` supplies one.
    """
    # ``ti`` is a task-instance object, not a mapping, so the dag_id is read
    # as an attribute; subscripting it with ["dag_id"] raises a TypeError.
    dag_id = context["ti"].dag_id
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()
dag = DAG( ...
on_success_callback=cleanup_xcom,
)