-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Description
Apache Airflow version
Other Airflow 2 version (please specify below): 2.5.3
What happened
When deleting XComs with the Delete button in the UI, or when running `airflow db clean` on the XCom table, the `delete` method of a custom XCom backend does not seem to be called.
What you think should happen instead
I expect the backend's `delete` method to be called, so that the custom backend can remove the linked file.
How to reproduce
Use the following custom backend:
import warnings
from typing import Any, Iterable, TYPE_CHECKING
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models.xcom import BaseXCom
import json
import os
from airflow.utils.helpers import exactly_one
from airflow.utils.json import XComDecoder, XComEncoder
from airflow.utils.session import NEW_SESSION, provide_session
from sqlalchemy.orm import Session
class CustomXComBackendEFS(BaseXCom):
    """XCom backend that stores each XCom value as a JSON file under ``DATA_FILE_PATH``.

    The database row only holds a reference string ``efs://<path>``; the
    payload itself lives in the referenced file. ``delete`` and ``clear``
    remove the referenced files together with the rows.

    NOTE(review): the issue this snippet comes from reports that the UI
    "Delete" action and ``airflow db clean`` do not go through ``delete``,
    so files can still be orphaned by those code paths -- confirm against
    the Airflow version in use.
    """

    # Root directory for payload files, laid out as
    # <DATA_FILE_PATH>/<dag_id>/<task_id>/xcom_<run_id>__<map_index>__<key>.json
    DATA_FILE_PATH = '/opt/airflow/efs/data_files/xcoms'
    # Marker that distinguishes file references from ordinary inline values.
    PREFIX = 'efs://'

    @staticmethod
    def _path_from_reference(value: Any) -> str | None:
        """Return the payload file path encoded in *value*, or ``None``.

        *value* is an already-decoded XCom value. Only strings of the form
        ``efs://<path>`` whose normalized path lies strictly inside
        ``DATA_FILE_PATH`` are accepted. Anything else yields ``None``:
        ordinary values, sibling directories such as ``..._xcoms2``, and
        ``..`` escapes. As a result, callers never read or unlink files
        outside the XCom directory.
        """
        prefix = CustomXComBackendEFS.PREFIX
        if not isinstance(value, str) or not value.startswith(prefix):
            return None
        # Strip only the leading marker (str.replace would strip every occurrence).
        path = os.path.normpath(value[len(prefix):])
        root = os.path.normpath(CustomXComBackendEFS.DATA_FILE_PATH)
        if not path.startswith(root + os.sep):
            return None
        return path

    @classmethod
    def _referenced_files(cls, xcoms) -> set:
        """Return the set of payload file paths referenced by the given XCom rows."""
        paths = set()
        for xcom in xcoms:
            path = cls._path_from_reference(BaseXCom.deserialize_value(result=xcom))
            if path is not None:
                paths.add(path)
        return paths

    @staticmethod
    def _unlink_all(file_paths) -> None:
        """Remove every file in *file_paths*; files that are already gone are skipped."""
        for file_path in file_paths:
            try:
                os.unlink(file_path)
            except FileNotFoundError:
                # The goal is "file no longer exists"; a missing file already satisfies it.
                pass

    @staticmethod
    def serialize_value(
        value: Any,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> Any:
        """Write *value* to a JSON file and return the serialized ``efs://`` reference to it.

        One file is written per (dag_id, task_id, run_id, map_index, key).
        Different keys and mapped instances of the same task run therefore
        never overwrite each other's payload.

        :raises ValueError: if ``dag_id``, ``task_id`` or ``run_id`` is missing,
            or if the resulting path would fall outside ``DATA_FILE_PATH``.
        """
        from urllib.parse import quote

        if dag_id is None or task_id is None or run_id is None:
            raise ValueError(
                "serialize_value() needs dag_id, task_id and run_id to build the XCom file path"
            )
        directory = f"{CustomXComBackendEFS.DATA_FILE_PATH}/{dag_id}/{task_id}"
        map_part = -1 if map_index is None else map_index
        # quote(..., safe='') turns '/', ':', '+' etc. into %XX, so the name is
        # filesystem-safe and distinct run_ids/keys never collide.
        file_name = f"xcom_{quote(run_id, safe='')}__{map_part}__{quote(key or '', safe='')}.json"
        reference = f"{CustomXComBackendEFS.PREFIX}{directory}/{file_name}"
        # Never write a reference that deserialize_value would refuse to read back.
        if CustomXComBackendEFS._path_from_reference(reference) is None:
            raise ValueError(
                f"Refusing to write XCom outside {CustomXComBackendEFS.DATA_FILE_PATH}: {reference}"
            )
        # Encode before opening the file, so an unserializable value does not
        # truncate or create a file.
        payload = json.dumps(value, cls=XComEncoder)
        os.makedirs(directory, exist_ok=True)
        with open(f"{directory}/{file_name}", 'w', encoding='utf-8') as handle:
            handle.write(payload)
        return BaseXCom.serialize_value(value=reference)

    # noinspection PyUnusedLocal
    @staticmethod
    def deserialize_value(result) -> Any:
        """Return the value stored in XCom row *result*.

        If the row holds an ``efs://`` reference inside ``DATA_FILE_PATH``,
        the referenced JSON file is read and decoded. Otherwise the value
        decoded by ``BaseXCom`` is returned unchanged (not the raw row).
        """
        value = BaseXCom.deserialize_value(result=result)
        file_path = CustomXComBackendEFS._path_from_reference(value)
        if file_path is None:
            return value
        with open(file_path, 'r', encoding='utf-8') as handle:
            return json.load(handle, cls=XComDecoder)

    @classmethod
    @provide_session
    def delete(cls, xcoms, session: Session = NEW_SESSION) -> None:
        """Delete one or more XCom rows and the payload files they reference.

        :param xcoms: a single XCom row or an iterable of rows.
        :param session: SQLAlchemy session (supplied by ``provide_session``
            when not passed).

        The rows are handed to ``BaseXCom.delete`` before any file is removed.
        If that call raises, no row is left pointing at a deleted file.
        """
        if isinstance(xcoms, BaseXCom):
            xcoms = [xcoms]
        else:
            # The rows are walked twice (files, then DB delete); a generator
            # would otherwise reach the base implementation already exhausted.
            xcoms = list(xcoms)
        file_paths = cls._referenced_files(xcoms)
        super().delete(xcoms, session=session)
        cls._unlink_all(file_paths)

    @classmethod
    @provide_session
    def clear(
        cls,
        execution_date=None,
        dag_id=None,
        task_id=None,
        session=NEW_SESSION,
        *,
        run_id=None,
        map_index=None,
    ) -> None:
        """Delete the XCom rows of one task run (optionally one map index) and their files.

        :param execution_date: deprecated alternative to ``run_id``.
        :param dag_id: DAG of the task instance (required).
        :param task_id: task of the task instance (required).
        :param session: SQLAlchemy session (supplied by ``provide_session``
            when not passed).
        :param run_id: run of the task instance; exactly one of ``run_id`` and
            ``execution_date`` must be given.
        :param map_index: when given, only rows with this map index are cleared.
        :raises TypeError: if ``dag_id`` or ``task_id`` is missing.
        :raises ValueError: unless exactly one of ``run_id``/``execution_date`` is given.
        """
        from airflow.models import DagRun

        if dag_id is None:
            raise TypeError("clear() missing required argument: dag_id")
        if task_id is None:
            raise TypeError("clear() missing required argument: task_id")
        if not exactly_one(execution_date is not None, run_id is not None):
            raise ValueError(
                f"Exactly one of run_id or execution_date must be passed. "
                f"Passed execution_date={execution_date}, run_id={run_id}"
            )
        if execution_date is not None:
            message = "Passing 'execution_date' to 'XCom.clear()' is deprecated. Use 'run_id' instead."
            warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)
            # May be None when no DagRun matches; the query below then finds no rows.
            run_id = (
                session.query(DagRun.run_id)
                .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
                .scalar()
            )
        query = session.query(cls).filter_by(dag_id=dag_id, task_id=task_id, run_id=run_id)
        if map_index is not None:
            query = query.filter_by(map_index=map_index)
        # Take the file paths from the rows being cleared instead of recomputing
        # a file name. This honours map_index and also finds files written
        # under an older naming scheme.
        file_paths = cls._referenced_files(query)
        query.delete()
        # NOTE(review): this runs before the surrounding transaction commits; if
        # that transaction is rolled back, the rows survive without their files.
        cls._unlink_all(file_paths)


# Reported behaviour: the delete method never seems to be called. The clear
# method works, as I can see the XCom disappear and reappear.
Operating System
Linux 9e28031d19f0 5.15.49-linuxkit #1 SMP PREEMPT Tue Sep 13 07:51:32 UTC 2022 x86_64 GNU/Linux
Versions of Apache Airflow Providers
apache-airflow 2.5.3
apache-airflow-providers-amazon 8.0.0
apache-airflow-providers-asana 2.1.0
apache-airflow-providers-celery 3.1.0
apache-airflow-providers-cncf-kubernetes 6.1.0
apache-airflow-providers-common-sql 1.4.0
apache-airflow-providers-databricks 4.1.0
apache-airflow-providers-ftp 3.3.1
apache-airflow-providers-google 10.0.0
apache-airflow-providers-http 4.3.0
apache-airflow-providers-imap 3.1.1
apache-airflow-providers-jdbc 3.3.0
apache-airflow-providers-mysql 5.0.0
apache-airflow-providers-postgres 5.4.0
apache-airflow-providers-redis 3.1.0
apache-airflow-providers-salesforce 5.3.0
apache-airflow-providers-slack 7.2.0
apache-airflow-providers-snowflake 4.0.5
apache-airflow-providers-sqlite 3.3.2
apache-airflow-providers-ssh 3.6.0
google-cloud-orchestration-airflow 1.4.1
Deployment
Official Apache Airflow Helm Chart
Deployment details
I'm testing this in Docker using a custom image that we deploy to a Kubernetes cluster. The backend is designed to store XComs on an EFS volume attached to the container. Locally in Docker, that location is a volume mapped to my local disk, so I can see the files.
Anything else
This happens every time.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct