Airflow operator #543

Open
@PrateekGupta1819

Description

How do I use the Airflow DockerOperator?

I can install the docker library through requirements.txt, but the DAG fails when the DockerOperator task runs. My guess is that the Docker daemon is not running. Can somebody help?

$ cat requirements.txt
docker

$ docker run -d -p 8080:8080 -v $PWD/dags:/usr/local/airflow/dags -v $PWD/requirements.txt:/requirements.txt puckel/docker-airflow webserver

DAG details (dags/test.py):

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.docker_operator import DockerOperator

default_args = {
        'owner'                 : 'airflow',
        'description'           : 'Use of the DockerOperator',
        'depends_on_past'       : False,
        'start_date'            : datetime(2018, 1, 3),
        'email_on_failure'      : False,
        'email_on_retry'        : False,
        'retries'               : 1,
        'retry_delay'           : timedelta(minutes=5)
}

with DAG('docker_dag', default_args=default_args, schedule_interval="5 * * * *", catchup=False) as dag:
        t1 = BashOperator(
                task_id='print_current_date',
                bash_command='date'
        )

        t2 = DockerOperator(
                task_id='docker_command',
                image='centos:latest',
                api_version='auto',
                auto_remove=True,
                command="/bin/sleep 30",
                docker_url="unix://var/run/docker.sock",
                network_mode="bridge"
        )

        t3 = BashOperator(
                task_id='print_hello',
                bash_command='echo "hello world"'
        )

        t1 >> t2 >> t3

Error:

*** Reading local file: /usr/local/airflow/logs/docker_dag/docker_command/2020-04-20T11:54:30.996363+00:00/1.log
[2020-04-20 11:54:49,337] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: docker_dag.docker_command 2020-04-20T11:54:30.996363+00:00 [queued]>
[2020-04-20 11:54:49,345] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: docker_dag.docker_command 2020-04-20T11:54:30.996363+00:00 [queued]>
[2020-04-20 11:54:49,345] {{taskinstance.py:866}} INFO - 
--------------------------------------------------------------------------------
[2020-04-20 11:54:49,345] {{taskinstance.py:867}} INFO - Starting attempt 1 of 2
[2020-04-20 11:54:49,345] {{taskinstance.py:868}} INFO - 
--------------------------------------------------------------------------------
[2020-04-20 11:54:49,352] {{taskinstance.py:887}} INFO - Executing <Task(DockerOperator): docker_command> on 2020-04-20T11:54:30.996363+00:00
[2020-04-20 11:54:49,354] {{standard_task_runner.py:53}} INFO - Started process 1170 to run task
[2020-04-20 11:54:49,394] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: docker_dag.docker_command 2020-04-20T11:54:30.996363+00:00 [running]> 24b22e2b7d4c
[2020-04-20 11:54:49,408] {{taskinstance.py:1128}} ERROR - Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 387, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.7/http/client.py", line 966, in send
    self.connect()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
FileNotFoundError: [Errno 2] No such file or directory

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 720, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 400, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.7/site-packages/urllib3/packages/six.py", line 734, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 387, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.7/http/client.py", line 966, in send
    self.connect()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
urllib3.exceptions.ProtocolError: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 207, in _retrieve_server_version
    return self.version(api_version=False)["ApiVersion"]
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/daemon.py", line 181, in version
    return self._result(self._get(url), json=True)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 46, in inner
    return f(self, *args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 230, in _get
    return self.get(url, **self._set_request_timeout(kwargs))
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 546, in get
    return self.request('GET', url, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/docker_operator.py", line 262, in execute
    tls=tls_config
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 190, in __init__
    self._version = self._retrieve_server_version()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 215, in _retrieve_server_version
    'Error while fetching server API version: {0}'.format(e)
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
[2020-04-20 11:54:49,410] {{taskinstance.py:1151}} INFO - Marking task as UP_FOR_RETRY
[2020-04-20 11:54:59,319] {{logging_mixin.py:112}} INFO - [2020-04-20 11:54:59,318] {{local_task_job.py:103}} INFO - Task exited with return code 1
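
The innermost frame of the traceback is sock.connect(self.unix_socket) raising FileNotFoundError, which suggests the socket path from docker_url does not exist inside the Airflow container (the docker run command above mounts only the dags folder and requirements.txt). A minimal sketch to check this from inside the container follows. The helper name describe_docker_socket is hypothetical and not part of Airflow or the docker library, and the default path /var/run/docker.sock is an assumption based on the docker_url in the DAG.

```python
import os
import stat


def describe_docker_socket(path="/var/run/docker.sock"):
    """Return a short status string for the Unix socket expected at `path`.

    Hypothetical diagnostic helper; the default path is an assumption
    taken from the docker_url used in the DAG.
    """
    try:
        mode = os.stat(path).st_mode
    except FileNotFoundError:
        # Same condition that sock.connect() hits in the traceback.
        return "missing"
    except PermissionError:
        # A parent directory is not searchable by the current user.
        return "unreadable"
    # The path exists; report whether it is actually a Unix socket.
    return "socket" if stat.S_ISSOCK(mode) else "not-a-socket"


if __name__ == "__main__":
    print(describe_docker_socket())
```

If this prints "missing" when run inside the container, the Docker client has no socket to connect to, regardless of whether the daemon is running on the host. One common approach, untested here, is to also bind-mount the host socket when starting the container, for example by adding -v /var/run/docker.sock:/var/run/docker.sock to the docker run command.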
