
DockerSwarmOperator retrieve_output XCom not working #41445

@rgriffier

Description


Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.10.0rc1

What happened?

When using a DockerSwarmOperator with retrieve_output=True and retrieve_output_path='/path/to/pickle/file', the contents of the file at /path/to/pickle/file are not pushed to XCom before the container is removed.

This functionality is described in the DockerOperator documentation. Since DockerSwarmOperator inherits from DockerOperator (cf. source), I expected the retrieve_output feature to work with DockerSwarmOperator as well.
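For context, here is a minimal sketch of the retrieval step, assuming it works roughly as in DockerOperator. The Docker SDK's `APIClient.get_archive(container, path)` returns the requested file as a streamed tar archive together with a `stat` dict, and the pickled value is read out of that archive. Because the call targets a specific container ID, it has to happen before the container is removed. The helpers `load_pickle_from_archive` and `make_fake_archive` are hypothetical names, and the archive is faked in memory so the sketch runs without a Docker daemon.

```python
import io
import pickle
import tarfile
from typing import Any, Dict, Iterable, List, Optional, Tuple


def load_pickle_from_archive(chunks: Iterable[bytes], stat: Dict[str, Any]) -> Optional[Any]:
    """Unpickle the single file contained in a tar stream.

    `chunks` and `stat` are assumed to have the shape returned by docker-py's
    APIClient.get_archive(container, path): an iterable of byte chunks forming
    a tar archive, plus a dict describing the file ("name", "size", ...).
    """
    # A 0-byte file cannot hold a pickled value.
    if stat["size"] == 0:
        return None
    # Reassemble the streamed tar archive in memory and read its one member.
    with io.BytesIO(b"".join(chunks)) as buffer, tarfile.open(fileobj=buffer) as tar:
        member = tar.extractfile(stat["name"])
        if member is None:  # the path was not a regular file
            return None
        return pickle.load(member)


def make_fake_archive(name: str, value: Any) -> Tuple[List[bytes], Dict[str, Any]]:
    """Hypothetical test helper: build what get_archive might return for a pickle file."""
    payload = pickle.dumps(value)
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as tar:
        info = tarfile.TarInfo(name=name)
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
    data = buffer.getvalue()
    # Split into 512-byte chunks to mimic a streamed HTTP response.
    chunks = [data[i:i + 512] for i in range(0, len(data), 512)]
    return chunks, {"name": name, "size": len(payload)}


capitals = {"Canada": "Ottawa", "England": "London", "France": "Paris", "Germany": "Berlin"}
chunks, stat = make_fake_archive("variable.pickle", capitals)
print(load_pickle_from_archive(chunks, stat))
# prints {'Canada': 'Ottawa', 'England': 'London', 'France': 'Paris', 'Germany': 'Berlin'}
```

The unpickled dictionary is what the task is expected to expose as its "return_value" XCom.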

What you think should happen instead?

  • The same behavior as with DockerOperator: when retrieve_output=True and retrieve_output_path='/path/to/pickle/file', the contents of the file at /path/to/pickle/file are pushed to XCom under the "return_value" key.

How to reproduce

Below is an example of how to reproduce the issue.
The docker_url argument may need to be adapted to the deployment context (the example below relies on its default value).

The DAG consists of two tasks that run the same script, one using a DockerOperator (write_xcom_docker) and the other a DockerSwarmOperator (write_xcom_docker_swarm). In both tasks:

  • A Python script writes a dictionary to a Pickle file (/tmp/variable.pickle).
  • The path to this Pickle file is passed as the retrieve_output_path argument of the Docker(Swarm)Operator.
from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta

from docker import types
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator

params = {
    'dag_id': 'test_xcom_docker_docker_swarm',
    'catchup': False,
    'max_active_runs': 1,
    'default_args': {
        'owner': 'airflow',
        'start_date': days_ago(1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
}

with DAG(**params) as dag:
    write_xcom_docker = DockerOperator(
        task_id='write_xcom_docker',
        image='python:latest',
        api_version='auto',
        command="""python -c '

import os
import pickle

capitals = {
  "Canada": "Ottawa", 
  "England": "London",
  "France": "Paris",
  "Germany": "Berlin", 
}

file_path = "/tmp/variable.pickle"
with open(file_path, "wb+") as file:
    pickle.dump(capitals, file)

    '
    """,
        retrieve_output=True,
        retrieve_output_path='/tmp/variable.pickle')

    write_xcom_docker_swarm = DockerSwarmOperator(
        task_id='write_xcom_docker_swarm',
        image='python:latest',
        api_version='auto',
        command="""python -c '

import os
import pickle

capitals = {
  "Canada": "Ottawa", 
  "England": "London",
  "France": "Paris",
  "Germany": "Berlin", 
}

file_path = "/tmp/variable.pickle"
with open(file_path, "wb+") as file:
    pickle.dump(capitals, file)

    '
    """,
        retrieve_output=True,
        retrieve_output_path='/tmp/variable.pickle',
        mode=types.ServiceMode(mode="replicated", replicas=2))

    write_xcom_docker >> write_xcom_docker_swarm
  • For the task using a DockerOperator, the content of the Pickle file is set as XCom as expected.


  • For the task using a DockerSwarmOperator, nothing is set as XCom.

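One difference from DockerOperator that may matter here: DockerSwarmOperator does not run a single container directly but creates a Swarm service. With mode=types.ServiceMode(mode="replicated", replicas=2), Swarm schedules two tasks, each presumably running its own container with its own /tmp/variable.pickle. As an assumption about what a fix would need (not a description of the provider's current code), the output can only be read once the service's tasks are mapped to container IDs. The sketch below performs that mapping on plain dicts. `completed_task_containers` is a hypothetical name, and the dict shape is assumed to follow the Docker Engine API's Task objects (for example, what docker-py's `APIClient.tasks(filters={"service": service_id})` returns).

```python
from typing import Any, Dict, List


def completed_task_containers(tasks: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Map finished Swarm tasks to {"node": ..., "container": ...} entries.

    `tasks` is assumed to be shaped like Docker Engine API Task objects:
    {"NodeID": ..., "Status": {"State": ..., "ContainerStatus": {"ContainerID": ...}}}.
    """
    containers = []
    for task in tasks:
        status = task.get("Status", {})
        container_status = status.get("ContainerStatus", {})
        container_id = container_status.get("ContainerID")
        # Only tasks that ran to completion can have written the output file.
        if status.get("State") == "complete" and container_id:
            containers.append({"node": task.get("NodeID", ""), "container": container_id})
    return containers


# Hypothetical task list for a service created with replicas=2.
tasks = [
    {"NodeID": "node-a", "Status": {"State": "complete", "ContainerStatus": {"ContainerID": "c1", "ExitCode": 0}}},
    {"NodeID": "node-b", "Status": {"State": "complete", "ContainerStatus": {"ContainerID": "c2", "ExitCode": 0}}},
]
print(completed_task_containers(tasks))
```

Even with this mapping, reading each file with get_archive would presumably require reaching the Docker daemon on the task's node. With two replicas, a fix would also have to decide whether a single value or a list of values becomes the "return_value" XCom.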

Operating System

Docker host: Debian GNU/Linux 11 (bullseye)
Container OS: Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

apache-airflow-providers-docker==3.12.3

Deployment

Other Docker-based deployment

Deployment details

Airflow was deployed using Docker Swarm, based on Docker version 26.1.4 (build 5650f9b)

Anything else?

The issue has also been tested in version 2.9.2 (using apache-airflow-providers-docker==3.12.0), with the same result.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
