Skip to content

XCom delete in ui and db clean do not trigger delete for custom backend #31774

@ronald-fenner

Description

@ronald-fenner

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

When using the delete button in the UI, or when running `airflow db clean` on the XCom table, the `delete` method of a custom XCom backend does not seem to be triggered.

What you think should happen instead

Expect the delete method to be called so the custom backend can remove the linked file.

How to reproduce

Using this custom backend

import warnings
from typing import Any, Iterable, TYPE_CHECKING

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models.xcom import BaseXCom
import json
import os

from airflow.utils.helpers import exactly_one
from airflow.utils.json import XComDecoder, XComEncoder
from airflow.utils.session import NEW_SESSION, provide_session
from sqlalchemy.orm import Session


class CustomXComBackendEFS(BaseXCom):
    """XCom backend that keeps payloads as JSON files on a mounted volume.

    Each value is written to a file beneath ``DATA_FILE_PATH``; only a short
    reference string (``PREFIX`` followed by the file path) is handed to
    ``BaseXCom`` for storage in the XCom table.

    NOTE(review): the file name is derived from ``dag_id``, ``task_id`` and
    ``run_id`` only, so different keys or map indexes of the same task run
    share one file -- confirm this is intended.
    """

    # Root directory under which all payload files are written.
    DATA_FILE_PATH = '/opt/airflow/efs/data_files/xcoms'
    # Marker put in front of the file path in the value stored in the database.
    PREFIX = 'efs://'

    @staticmethod
    def _payload_path(dag_id, task_id, run_id) -> str:
        """Return the payload file path for one task run."""
        # ':' and '+' occur in timestamp-based run ids; swap them for '_'.
        filename = "xcom_" + run_id.replace(':', '_').replace('+', '_') + ".json"
        return f"{CustomXComBackendEFS.DATA_FILE_PATH}/{dag_id}/{task_id}/{filename}"

    @staticmethod
    def _referenced_path(stored) -> "str | None":
        """Return the file path encoded in *stored*, or ``None``.

        ``None`` is returned when *stored* is not a string, does not start
        with ``PREFIX``, or points outside ``DATA_FILE_PATH``.
        """
        prefix = CustomXComBackendEFS.PREFIX
        if not isinstance(stored, str) or not stored.startswith(prefix):
            return None
        # Strip only the leading marker; str.replace would also remove any
        # later occurrence of the prefix inside the path.
        path = stored[len(prefix):]
        if not path.startswith(CustomXComBackendEFS.DATA_FILE_PATH):
            return None
        return path

    @staticmethod
    def serialize_value(
            value: Any,
            *,
            key: str | None = None,
            task_id: str | None = None,
            dag_id: str | None = None,
            run_id: str | None = None,
            map_index: int | None = None,
    ) -> Any:
        """Write *value* to its payload file and return the serialized reference.

        When ``dag_id``, ``task_id`` or ``run_id`` is missing, no file location
        can be derived, so the value is serialized by ``BaseXCom`` directly
        instead of failing or writing under a ``None`` directory.
        """
        if dag_id is None or task_id is None or run_id is None:
            return BaseXCom.serialize_value(value=value)

        target = CustomXComBackendEFS._payload_path(dag_id, task_id, run_id)
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, 'w', encoding='utf-8') as handle:
            handle.write(json.dumps(value, cls=XComEncoder))

        return BaseXCom.serialize_value(value=f'{CustomXComBackendEFS.PREFIX}{target}')

    # noinspection PyUnusedLocal
    @staticmethod
    def deserialize_value(result) -> Any:
        """Return the value for the XCom row *result*.

        If the stored value is a reference to a payload file, the file content
        is loaded and decoded; otherwise the stored value itself is returned.
        """
        stored = BaseXCom.deserialize_value(result=result)
        path = CustomXComBackendEFS._referenced_path(stored)
        if path is None:
            # Not one of our references: hand back the decoded value rather
            # than the raw row object.
            return stored
        with open(path, 'r', encoding='utf-8') as handle:
            return json.loads(handle.read(), cls=XComDecoder)

    @classmethod
    @provide_session
    def delete(cls, xcoms, session: Session) -> None:
        """Delete the given XCom row(s) and the payload files they reference.

        :param xcoms: a single XCom row or an iterable of rows.
        :param session: database session passed on to ``BaseXCom.delete``.
        """
        # BaseXCom is the name imported in this module; the resolved ``XCom``
        # class is not.
        if isinstance(xcoms, BaseXCom):
            xcoms = [xcoms]
        else:
            # Materialize once: the rows are walked here and again by
            # BaseXCom.delete, which would see an exhausted generator.
            xcoms = list(xcoms)

        # Resolve the paths before the rows are removed from the database.
        paths = []
        for xcom in xcoms:
            path = cls._referenced_path(BaseXCom.deserialize_value(result=xcom))
            if path is not None:
                paths.append(path)

        BaseXCom.delete(xcoms, session)

        # Remove files only after the rows are gone, so a failed database
        # delete does not leave rows pointing at missing files.
        for path in paths:
            try:
                os.unlink(path)
            except FileNotFoundError:
                # Already removed; nothing left to clean up.
                pass

    @classmethod
    @provide_session
    def clear(
            cls,
            execution_date=None,
            dag_id=None,
            task_id=None,
            session=NEW_SESSION,
            *,
            run_id=None,
            map_index=None,
    ) -> None:
        """Remove the XCom rows of one task run together with its payload file.

        :param execution_date: deprecated alternative to ``run_id``.
        :param dag_id: required DAG id.
        :param task_id: required task id.
        :param session: database session.
        :param run_id: run whose XComs are cleared.
        :param map_index: if given, only rows with this map index are deleted.
        :raises TypeError: if ``dag_id`` or ``task_id`` is missing.
        :raises ValueError: unless exactly one of ``run_id`` and
            ``execution_date`` is passed.
        """
        from airflow.models import DagRun

        if dag_id is None:
            raise TypeError("clear() missing required argument: dag_id")
        if task_id is None:
            raise TypeError("clear() missing required argument: task_id")

        if not exactly_one(execution_date is not None, run_id is not None):
            raise ValueError(
                f"Exactly one of run_id or execution_date must be passed. "
                f"Passed execution_date={execution_date}, run_id={run_id}"
            )

        if execution_date is not None:
            message = "Passing 'execution_date' to 'XCom.clear()' is deprecated. Use 'run_id' instead."
            warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)
            run_id = (
                session.query(DagRun.run_id)
                .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
                .scalar()
            )

        query = session.query(cls).filter_by(dag_id=dag_id, task_id=task_id, run_id=run_id)
        if map_index is not None:
            query = query.filter_by(map_index=map_index)
        query.delete()

        # The execution_date lookup may find no run; then there is no file
        # name to derive and nothing on disk to remove.
        if run_id is not None:
            try:
                os.unlink(cls._payload_path(dag_id, task_id, run_id))
            except FileNotFoundError:
                pass

The `delete` method never seems to be called. The `clear` method works, as I can see the XCom disappear and reappear.

Operating System

Linux 9e28031d19f0 5.15.49-linuxkit #1 SMP PREEMPT Tue Sep 13 07:51:32 UTC 2022 x86_64 GNU/Linux

Versions of Apache Airflow Providers

apache-airflow 2.5.3
apache-airflow-providers-amazon 8.0.0
apache-airflow-providers-asana 2.1.0
apache-airflow-providers-celery 3.1.0
apache-airflow-providers-cncf-kubernetes 6.1.0
apache-airflow-providers-common-sql 1.4.0
apache-airflow-providers-databricks 4.1.0
apache-airflow-providers-ftp 3.3.1
apache-airflow-providers-google 10.0.0
apache-airflow-providers-http 4.3.0
apache-airflow-providers-imap 3.1.1
apache-airflow-providers-jdbc 3.3.0
apache-airflow-providers-mysql 5.0.0
apache-airflow-providers-postgres 5.4.0
apache-airflow-providers-redis 3.1.0
apache-airflow-providers-salesforce 5.3.0
apache-airflow-providers-slack 7.2.0
apache-airflow-providers-snowflake 4.0.5
apache-airflow-providers-sqlite 3.3.2
apache-airflow-providers-ssh 3.6.0
google-cloud-orchestration-airflow 1.4.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

I'm testing this in Docker using a custom image that we deploy onto a Kubernetes cluster. It is designed to store the XComs on an efs volume attached to it. Locally in docker, the location is a mapped volume onto my local HD so I can see the files.

Anything else

every time

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Labels

affected_version:main_branch (Issues Reported for main branch) · area:UI (Related to UI/UX. For Frontend Developers.) · kind:bug (This is clearly a bug)

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions