Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airflow/decorators/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ class TaskDecoratorCollection:
skip_on_exit_code: int | Container[int] | None = None,
port_bindings: dict | None = None,
ulimits: list[dict] | None = None,
labels: dict[str, str] | list[str] | None = None,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a Docker task.
Expand Down Expand Up @@ -508,6 +509,8 @@ class TaskDecoratorCollection:
Incompatible with ``"host"`` in ``network_mode``.
:param ulimits: List of ulimit options to set for the container. Each item should
be a :py:class:`docker.types.Ulimit` instance.
:param labels: A dictionary of name-value labels (e.g. ``{"label1": "value1", "label2": "value2"}``)
or a list of names of labels to set with empty values (e.g. ``["label1", "label2"]``)
"""
# [END decorator_signature]
def kubernetes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ class DockerOperator(BaseOperator):
Incompatible with ``"host"`` in ``network_mode``.
:param ulimits: List of ulimit options to set for the container. Each item should
be a :py:class:`docker.types.Ulimit` instance.
:param labels: A dictionary of name-value labels (e.g. ``{"label1": "value1", "label2": "value2"}``)
or a list of names of labels to set with empty values (e.g. ``["label1", "label2"]``)
"""

# !!! Changes in DockerOperator's arguments should be also reflected in !!!
Expand Down Expand Up @@ -255,6 +257,7 @@ def __init__(
skip_on_exit_code: int | Container[int] | None = None,
port_bindings: dict | None = None,
ulimits: list[Ulimit] | None = None,
labels: dict[str, str] | list[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand Down Expand Up @@ -301,6 +304,7 @@ def __init__(
self.cap_add = cap_add
self.extra_hosts = extra_hosts
self.ulimits = ulimits or []
self.labels = labels

self.container: dict = None # type: ignore[assignment]
self.retrieve_output = retrieve_output
Expand Down Expand Up @@ -407,6 +411,7 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[
working_dir=self.working_dir,
tty=self.tty,
hostname=self.hostname,
labels=self.labels,
)
log_stream = self.cli.attach(container=self.container["Id"], stdout=True, stderr=True, stream=True)
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ def test_execute(self):
tty=True,
hostname=TEST_CONTAINER_HOSTNAME,
ports=[],
labels=None,
)
self.client_mock.create_host_config.assert_called_once_with(
mounts=[
Expand Down Expand Up @@ -299,6 +300,7 @@ def test_execute_no_temp_dir(self):
tty=True,
hostname=TEST_CONTAINER_HOSTNAME,
ports=[],
labels=None,
)
self.client_mock.create_host_config.assert_called_once_with(
mounts=[
Expand Down Expand Up @@ -392,6 +394,7 @@ def test_execute_fallback_temp_dir(self, caplog):
tty=True,
hostname=None,
ports=[],
labels=None,
),
call(
command="env",
Expand All @@ -405,6 +408,7 @@ def test_execute_fallback_temp_dir(self, caplog):
tty=True,
hostname=None,
ports=[],
labels=None,
),
]
)
Expand Down Expand Up @@ -508,6 +512,7 @@ def test_environment_overrides_env_file(self, stringio_mock):
tty=True,
hostname=None,
ports=[],
labels=None,
)
stringio_mock.assert_called_once_with("UNIT=FILE\nPRIVATE=FILE\nVAR=VALUE")
self.dotenv_mock.assert_called_once_with(stream="UNIT=FILE\nPRIVATE=FILE\nVAR=VALUE")
Expand Down Expand Up @@ -781,3 +786,11 @@ def test_docker_host_env_unset(self, monkeypatch):
def test_fetch_logs(self, logger_mock, log_lines, expected_lines):
fetch_logs(log_lines, logger_mock)
assert logger_mock.info.call_args_list == [call("%s", line) for line in expected_lines]

@pytest.mark.parametrize("labels", ({"key": "value"}, ["key=value"]))
def test_labels(self, labels: dict[str, str] | list[str]):
operator = DockerOperator(task_id="test", image="test", labels=labels)
operator.execute(None)
self.client_mock.create_container.assert_called_once()
assert "labels" in self.client_mock.create_container.call_args.kwargs
assert labels == self.client_mock.create_container.call_args.kwargs["labels"]
Loading