
Conversation


@ashb ashb commented Mar 31, 2021

Closes #7935, #15037 (I hope!)

Thanks to @uranusjr for making the draft PR that pointed me to this solution in the first place.

There have been long-standing issues where the scheduler would "stop
responding" that we haven't been able to track down.

Someone was able to catch the scheduler in this state in 2.0.1 and inspect
it with py-spy (thanks, MatthewRBruce!)

The stack traces (slightly shortened) were:

```
Process 6: /usr/local/bin/python /usr/local/bin/airflow scheduler
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
    _send (multiprocessing/connection.py:368)
    _send_bytes (multiprocessing/connection.py:411)
    send (multiprocessing/connection.py:206)
    send_callback_to_execute (airflow/utils/dag_processing.py:283)
    _send_dag_callbacks_to_processor (airflow/jobs/scheduler_job.py:1795)
    _schedule_dag_run (airflow/jobs/scheduler_job.py:1762)

Process 77: airflow scheduler -- DagFileProcessorManager
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
    _send (multiprocessing/connection.py:368)
    _send_bytes (multiprocessing/connection.py:405)
    send (multiprocessing/connection.py:206)
    _run_parsing_loop (airflow/utils/dag_processing.py:698)
    start (airflow/utils/dag_processing.py:596)
```

What this shows is that both processes are stuck trying to send data to
each other, but neither can proceed because both buffers are full: since
both are trying to send, neither side is going to read and make more space
in the buffer. A classic deadlock!
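
For illustration only (this is not Airflow code): a minimal sketch of the same failure mode, where two processes share a duplex `multiprocessing.Pipe` and each blocks inside `Connection.send` because the other side never reads. Running it will hang by design.

```python
# Minimal reproduction of the two-sided send deadlock (illustrative only).
import multiprocessing


def chatty(conn, payload):
    # Send forever without ever reading. Once the OS buffer for this
    # direction is full, send() blocks until the peer reads -- but the
    # peer is doing exactly the same thing in the other direction.
    while True:
        conn.send(payload)


if __name__ == "__main__":
    scheduler_end, manager_end = multiprocessing.Pipe(duplex=True)
    payload = b"x" * (64 * 1024)  # big enough to fill the buffers quickly

    manager = multiprocessing.Process(target=chatty, args=(manager_end, payload))
    manager.start()

    # The "scheduler" does the same in the opposite direction; both
    # processes end up parked in Connection.send(). A classic deadlock.
    chatty(scheduler_end, payload)
```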

The fix for this is twofold:

  1. Enable non-blocking IO on the DagFileProcessorManager side.

    The only thing the Manager sends back up the pipe is (now, as of 2.0)
    the DagParsingStat object, and the scheduler will happily continue
    without receiving these, so in the case of a blocking error it is
    simply better to ignore the error, continue the loop, and try sending
    again later (see the first sketch after this list).

  2. Reduce the size of DagParsingStat.

    With a large number of dag files, we included the full path of each
    and every one in each parsing stat. The scheduler did nothing with
    this field, so it only made the object larger than it needed to be,
    and such a large object increases the likelihood of hitting this
    send-buffer-full deadlock (see the second sketch after this list).
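
A minimal sketch of the non-blocking send pattern in point 1. The `try`/`except BlockingIOError` shape matches the diff hunks quoted in the review below; putting the connection into non-blocking mode with `os.set_blocking` and the helper names used here are assumptions for illustration, not the exact PR code.

```python
# Sketch only -- assumed helper names, not the real DagFileProcessorManager methods.
import os
from multiprocessing.connection import Connection


def make_non_blocking(signal_conn: Connection) -> None:
    # One way to enable non-blocking IO on the manager's end of the pipe:
    # a full buffer then raises BlockingIOError instead of blocking forever.
    os.set_blocking(signal_conn.fileno(), False)


def try_send_parsing_stat(signal_conn: Connection, dag_parsing_stat) -> None:
    try:
        signal_conn.send(dag_parsing_stat)
    except BlockingIOError:
        # The scheduler isn't draining the pipe right now. The stats are
        # advisory, so drop this one and retry on the next loop iteration
        # instead of deadlocking.
        pass
```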

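And a before/after sketch for point 2. The field names here are illustrative assumptions rather than the exact Airflow definition; the point is simply that a list of every DAG file path grows with the number of files, while a count does not.

```python
# Illustrative only -- not the actual Airflow DagParsingStat definition.
from typing import List, NamedTuple


class DagParsingStatBefore(NamedTuple):
    # Every full DAG file path was serialised into every stat message,
    # so each message grew with the number of DAG files.
    file_paths: List[str]
    done: bool
    all_files_processed: bool


class DagParsingStatAfter(NamedTuple):
    # The scheduler never used the paths, so the message can stay tiny.
    num_file_paths: int
    done: bool
    all_files_processed: bool
```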


@ashb ashb added this to the Airflow 2.0.2 milestone Mar 31, 2021
@ashb ashb commented Mar 31, 2021

/cc @uranusjr

@ashb ashb requested a review from kaxil March 31, 2021 13:41
@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Mar 31, 2021
@ashb ashb marked this pull request as ready for review March 31, 2021 15:46
@ashb ashb requested review from XD-DENG and turbaszek as code owners March 31, 2021 15:46
@ashb ashb force-pushed the schedule-deadlock-fix branch from d57ce92 to cfec563 Compare March 31, 2021 15:46
```diff
             )
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(dag_parsing_stat)
```
@XD-DENG XD-DENG (Member) commented Mar 31, 2021:
I'm not sure: does applying a timeout context here make sense?
The reason is that it may try to send and get stuck without failing explicitly, so making it fail after a specific timeout might help.

May be a dumb question.

ashb (Member Author) replied:

Since these are local sockets, with the change to non-blocking mode a send will either succeed or fail entirely, "instantly", so we don't need a timeout, no.

Another member commented:

It wouldn't be a bad idea to add a timeout, but that's extremely rarely an issue for local sockets (things need to go pretty badly wrong for that to happen at all). This can always be revisited if someone ever complains.

ashb (Member Author) replied:

On Linux these are AF_UNIX sockets too, so it's even harder to get anything other than an instant fail/success in non-blocking mode, I think.
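
A small self-contained demonstration of that behaviour (not Airflow code): on a local socket pair in non-blocking mode, a send either completes immediately or raises `BlockingIOError` immediately once the kernel buffer is full; it never hangs.

```python
# Non-blocking sends on a local socket pair succeed or fail instantly.
import socket

a, b = socket.socketpair()  # AF_UNIX on Linux
a.setblocking(False)

chunk = b"x" * 65536
sent = 0
try:
    while True:
        sent += a.send(chunk)  # succeeds instantly while the buffer has room
except BlockingIOError:
    # ...and fails instantly, without blocking, once the buffer is full.
    print(f"kernel buffer filled after {sent} bytes")
```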

```diff
-            self._signal_conn.send(dag_parsing_stat)
+            try:
+                self._signal_conn.send(dag_parsing_stat)
+            except BlockingIOError:
```
@XD-DENG XD-DENG (Member) commented Mar 31, 2021:

Any possibility of other error types that we may want to ignore? Again, maybe a dumb question.

ashb (Member Author) replied:

I don't think so, no. Anything else is an "error" condition, and we want to die (and the scheduler will notice and restart the manager process).

@github-actions

The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

@ashb ashb merged commit b0e68eb into apache:master Mar 31, 2021
@ashb ashb deleted the schedule-deadlock-fix branch March 31, 2021 22:18
ashb added a commit that referenced this pull request Apr 1, 2021
(cherry picked from commit b0e68eb)
kaxil pushed a commit to astronomer/airflow that referenced this pull request Apr 12, 2021
(cherry picked from commit b0e68eb)
ashb added a commit that referenced this pull request Apr 15, 2021
(cherry picked from commit b0e68eb)
kaxil added a commit to astronomer/airflow that referenced this pull request Apr 24, 2021

Labels

area:Scheduler including HA (high availability) scheduler


Development

Successfully merging this pull request may close these issues.

scheduler gets stuck without a trace

4 participants