From e36f703a51577bcd54b62438271d04707babcb5a Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sun, 11 Jul 2021 21:09:24 +0200 Subject: [PATCH] Adds option to disable mounting temporary folder in DockerOperator The DockerOperator by default mounts a temporary folder inside the container in order to allow storing files bigger than the default disk size of the container; however, this did not work when a remote Docker engine or a Docker-in-Docker solution was used. This worked before the #15843 change because the /tmp mount was ignored; however, after the change to "Mounts", the "/tmp" mount fails when using a remote Docker engine. This PR adds a parameter that allows disabling this temporary directory mounting (and adds a note that it can be replaced with mounting existing volumes). It also prints a warning if the directory cannot be mounted and retries the failed attempt without mounting the temporary directory, which brings back backwards-compatible behaviour for remote engines and docker-in-docker. Fixes: #16803 Fixes: #16806 --- airflow/providers/docker/operators/docker.py | 153 +++++++++------- .../providers/docker/operators/test_docker.py | 168 ++++++++++++++++++ 2 files changed, 260 insertions(+), 61 deletions(-) diff --git a/airflow/providers/docker/operators/docker.py b/airflow/providers/docker/operators/docker.py index 7b5d8c664d840..8769682aeede1 100644 --- a/airflow/providers/docker/operators/docker.py +++ b/airflow/providers/docker/operators/docker.py @@ -21,6 +21,7 @@ from typing import Dict, Iterable, List, Optional, Union from docker import APIClient, tls +from docker.errors import APIError from docker.types import Mount from airflow.exceptions import AirflowException @@ -32,12 +33,23 @@ class DockerOperator(BaseOperator): """ Execute a command inside a docker container. 
- A temporary directory is created on the host and - mounted into a container to allow storing files + By default, a temporary directory is + created on the host and mounted into a container to allow storing files that together exceed the default disk size of 10GB in a container. - The path to the mounted directory can be accessed + In this case the path to the mounted directory can be accessed via the environment variable ``AIRFLOW_TMP_DIR``. + If the volume cannot be mounted, a warning is printed and an attempt is made to execute the docker + command without the temporary folder mounted. This is to make it work by default with a remote docker + engine or when you run a docker-in-docker solution and the temporary directory is not shared with the + docker engine. A warning is printed in the logs in this case. + + If you know you run DockerOperator with a remote engine or via docker-in-docker, + you should set the ``mount_tmp_dir`` parameter to False. In this case, you can still use the + ``mounts`` parameter to mount already existing named volumes in your Docker Engine + to achieve a similar capability where you can store files exceeding the default disk size + of the container. + If a login to a private registry is required prior to pulling the image, a Docker connection needs to be configured in Airflow and the connection ID be provided with the parameter ``docker_conn_id``. @@ -88,6 +100,9 @@ class DockerOperator(BaseOperator): :type tls_hostname: str or bool :param tls_ssl_version: Version of SSL to use when communicating with docker daemon. :type tls_ssl_version: str + :param mount_tmp_dir: Specify whether the temporary directory should be bind-mounted + from the host to the container + :type mount_tmp_dir: bool :param tmp_dir: Mount point inside the container to a temporary directory created on the host by the operator. 
The path is also made available via the environment variable @@ -154,6 +169,7 @@ def __init__( tls_client_key: Optional[str] = None, tls_hostname: Optional[Union[str, bool]] = None, tls_ssl_version: Optional[str] = None, + mount_tmp_dir: bool = True, tmp_dir: str = '/tmp/airflow', user: Optional[Union[str, int]] = None, mounts: Optional[List[Mount]] = None, @@ -193,6 +209,7 @@ def __init__( self.tls_client_key = tls_client_key self.tls_hostname = tls_hostname self.tls_ssl_version = tls_ssl_version + self.mount_tmp_dir = mount_tmp_dir self.tmp_dir = tmp_dir self.user = user self.mounts = mounts or [] @@ -227,66 +244,82 @@ def get_hook(self) -> DockerHook: def _run_image(self) -> Optional[str]: """Run a Docker container with the provided image""" self.log.info('Starting docker container from image %s', self.image) + if not self.cli: + raise Exception("The 'cli' should be initialized before!") + if self.mount_tmp_dir: + with TemporaryDirectory(prefix='airflowtmp', dir=self.host_tmp_dir) as host_tmp_dir: + tmp_mount = Mount(self.tmp_dir, host_tmp_dir, "bind") + try: + return self._run_image_with_mounts(self.mounts + [tmp_mount], add_tmp_variable=True) + except APIError as e: + if self.host_tmp_dir in str(e): + self.log.warning( + "Using remote engine or docker-in-docker and mounting temporary " + "volume from host is not supported. Falling back to " + "`mount_tmp_dir=False` mode. 
You can set `mount_tmp_dir` parameter" + " to False to disable mounting and remove the warning" + ) + return self._run_image_with_mounts(self.mounts, add_tmp_variable=False) + else: + raise Exception(str(e)) + raise + else: + return self._run_image_with_mounts(self.mounts, add_tmp_variable=False) - with TemporaryDirectory(prefix='airflowtmp', dir=self.host_tmp_dir) as host_tmp_dir: - if not self.cli: - raise Exception("The 'cli' should be initialized before!") - tmp_mount = Mount(self.tmp_dir, host_tmp_dir, "bind") - self.container = self.cli.create_container( - command=self.format_command(self.command), - name=self.container_name, - environment={**self.environment, **self._private_environment}, - host_config=self.cli.create_host_config( - auto_remove=False, - mounts=self.mounts + [tmp_mount], - network_mode=self.network_mode, - shm_size=self.shm_size, - dns=self.dns, - dns_search=self.dns_search, - cpu_shares=int(round(self.cpus * 1024)), - mem_limit=self.mem_limit, - cap_add=self.cap_add, - extra_hosts=self.extra_hosts, - privileged=self.privileged, - ), - image=self.image, - user=self.user, - entrypoint=self.format_command(self.entrypoint), - working_dir=self.working_dir, - tty=self.tty, - ) - - lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True) - - try: - self.cli.start(self.container['Id']) + def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optional[str]: + if add_tmp_variable: + self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir + else: + self.environment.pop('AIRFLOW_TMP_DIR', None) + self.container = self.cli.create_container( + command=self.format_command(self.command), + name=self.container_name, + environment={**self.environment, **self._private_environment}, + host_config=self.cli.create_host_config( + auto_remove=False, + mounts=target_mounts, + network_mode=self.network_mode, + shm_size=self.shm_size, + dns=self.dns, + dns_search=self.dns_search, + cpu_shares=int(round(self.cpus * 
1024)), + mem_limit=self.mem_limit, + cap_add=self.cap_add, + extra_hosts=self.extra_hosts, + privileged=self.privileged, + ), + image=self.image, + user=self.user, + entrypoint=self.format_command(self.entrypoint), + working_dir=self.working_dir, + tty=self.tty, + ) + lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True) + try: + self.cli.start(self.container['Id']) - line = '' - res_lines = [] - for line in lines: - line = line.strip() - if hasattr(line, 'decode'): - # Note that lines returned can also be byte sequences so we have to handle decode here - line = line.decode('utf-8') - res_lines.append(line) - self.log.info(line) + line = '' + res_lines = [] + for line in lines: + line = line.strip() + if hasattr(line, 'decode'): + # Note that lines returned can also be byte sequences so we have to handle decode here + line = line.decode('utf-8') + res_lines.append(line) + self.log.info(line) - result = self.cli.wait(self.container['Id']) - if result['StatusCode'] != 0: - res_lines = "\n".join(res_lines) - raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}") + result = self.cli.wait(self.container['Id']) + if result['StatusCode'] != 0: + res_lines = "\n".join(res_lines) + raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}") - ret = None - if self.do_xcom_push: - ret = ( - self.cli.logs(container=self.container['Id']) - if self.xcom_all - else line.encode('utf-8') - ) - return ret - finally: - if self.auto_remove: - self.cli.remove_container(self.container['Id']) + ret = None + if self.do_xcom_push: + ret = self.cli.logs(container=self.container['Id']) if self.xcom_all else line.encode('utf-8') + return ret + finally: + if self.auto_remove: + self.cli.remove_container(self.container['Id']) def execute(self, context) -> Optional[str]: self.cli = self._get_cli() @@ -312,8 +345,6 @@ def execute(self, context) -> Optional[str]: if 
latest_status.get(output_id) != output_status: self.log.info("%s: %s", output_id, output_status) latest_status[output_id] = output_status - - self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir return self._run_image() def _get_cli(self) -> APIClient: diff --git a/tests/providers/docker/operators/test_docker.py b/tests/providers/docker/operators/test_docker.py index 84dd9c1ca3818..bb4c107e93720 100644 --- a/tests/providers/docker/operators/test_docker.py +++ b/tests/providers/docker/operators/test_docker.py @@ -18,8 +18,10 @@ import logging import unittest from unittest import mock +from unittest.mock import call import pytest +from docker.errors import APIError from airflow.exceptions import AirflowException @@ -119,6 +121,172 @@ def test_execute(self): operator.cli.pull('ubuntu:latest', stream=True, decode=True) == self.client_mock.pull.return_value ) + def test_execute_no_temp_dir(self): + operator = DockerOperator( + api_version='1.19', + command='env', + environment={'UNIT': 'TEST'}, + private_environment={'PRIVATE': 'MESSAGE'}, + image='ubuntu:latest', + network_mode='bridge', + owner='unittest', + task_id='unittest', + mounts=[Mount(source='/host/path', target='/container/path', type='bind')], + mount_tmp_dir=False, + entrypoint='["sh", "-c"]', + working_dir='/container/path', + shm_size=1000, + host_tmp_dir='/host/airflow', + container_name='test_container', + tty=True, + ) + operator.execute(None) + + self.client_class_mock.assert_called_once_with( + base_url='unix://var/run/docker.sock', tls=None, version='1.19' + ) + + self.client_mock.create_container.assert_called_once_with( + command='env', + name='test_container', + environment={'UNIT': 'TEST', 'PRIVATE': 'MESSAGE'}, + host_config=self.client_mock.create_host_config.return_value, + image='ubuntu:latest', + user=None, + entrypoint=['sh', '-c'], + working_dir='/container/path', + tty=True, + ) + self.client_mock.create_host_config.assert_called_once_with( + mounts=[ + Mount(source='/host/path', 
target='/container/path', type='bind'), + ], + network_mode='bridge', + shm_size=1000, + cpu_shares=1024, + mem_limit=None, + auto_remove=False, + dns=None, + dns_search=None, + cap_add=None, + extra_hosts=None, + privileged=False, + ) + self.tempdir_mock.assert_not_called() + self.client_mock.images.assert_called_once_with(name='ubuntu:latest') + self.client_mock.attach.assert_called_once_with( + container='some_id', stdout=True, stderr=True, stream=True + ) + self.client_mock.pull.assert_called_once_with('ubuntu:latest', stream=True, decode=True) + self.client_mock.wait.assert_called_once_with('some_id') + assert ( + operator.cli.pull('ubuntu:latest', stream=True, decode=True) == self.client_mock.pull.return_value + ) + + def test_execute_fallback_temp_dir(self): + self.client_mock.create_container.side_effect = [ + APIError(message="wrong path: " + "/host/airflow"), + {'Id': 'some_id'}, + ] + operator = DockerOperator( + api_version='1.19', + command='env', + environment={'UNIT': 'TEST'}, + private_environment={'PRIVATE': 'MESSAGE'}, + image='ubuntu:latest', + network_mode='bridge', + owner='unittest', + task_id='unittest', + mounts=[Mount(source='/host/path', target='/container/path', type='bind')], + mount_tmp_dir=True, + entrypoint='["sh", "-c"]', + working_dir='/container/path', + shm_size=1000, + host_tmp_dir='/host/airflow', + container_name='test_container', + tty=True, + ) + with self.assertLogs(operator.log, level=logging.WARNING) as captured: + operator.execute(None) + assert ( + "WARNING:airflow.task.operators:Using remote engine or docker-in-docker " + "and mounting temporary volume from host is not supported" in captured.output[0] + ) + self.client_class_mock.assert_called_once_with( + base_url='unix://var/run/docker.sock', tls=None, version='1.19' + ) + self.client_mock.create_container.assert_has_calls( + [ + call( + command='env', + name='test_container', + environment={'AIRFLOW_TMP_DIR': '/tmp/airflow', 'UNIT': 'TEST', 'PRIVATE': 'MESSAGE'}, + 
host_config=self.client_mock.create_host_config.return_value, + image='ubuntu:latest', + user=None, + entrypoint=['sh', '-c'], + working_dir='/container/path', + tty=True, + ), + call( + command='env', + name='test_container', + environment={'UNIT': 'TEST', 'PRIVATE': 'MESSAGE'}, + host_config=self.client_mock.create_host_config.return_value, + image='ubuntu:latest', + user=None, + entrypoint=['sh', '-c'], + working_dir='/container/path', + tty=True, + ), + ] + ) + self.client_mock.create_host_config.assert_has_calls( + [ + call( + mounts=[ + Mount(source='/host/path', target='/container/path', type='bind'), + Mount(source='/mkdtemp', target='/tmp/airflow', type='bind'), + ], + network_mode='bridge', + shm_size=1000, + cpu_shares=1024, + mem_limit=None, + auto_remove=False, + dns=None, + dns_search=None, + cap_add=None, + extra_hosts=None, + privileged=False, + ), + call( + mounts=[ + Mount(source='/host/path', target='/container/path', type='bind'), + ], + network_mode='bridge', + shm_size=1000, + cpu_shares=1024, + mem_limit=None, + auto_remove=False, + dns=None, + dns_search=None, + cap_add=None, + extra_hosts=None, + privileged=False, + ), + ] + ) + self.tempdir_mock.assert_called_once_with(dir='/host/airflow', prefix='airflowtmp') + self.client_mock.images.assert_called_once_with(name='ubuntu:latest') + self.client_mock.attach.assert_called_once_with( + container='some_id', stdout=True, stderr=True, stream=True + ) + self.client_mock.pull.assert_called_once_with('ubuntu:latest', stream=True, decode=True) + self.client_mock.wait.assert_called_once_with('some_id') + assert ( + operator.cli.pull('ubuntu:latest', stream=True, decode=True) == self.client_mock.pull.return_value + ) + def test_private_environment_is_private(self): operator = DockerOperator( private_environment={'PRIVATE': 'MESSAGE'}, image='ubuntu:latest', task_id='unittest'