Fix processor cleanup on DagFileProcessorManager (#22685)
* Fix processor cleanup

  References to processors weren't being cleaned up after killing them in the event of a timeout. This led to a crash caused by an unhandled exception when trying to read from a closed end of a pipe.

* Reap the zombie when killing the processor

  When calling `_kill_process()` we were generating zombies which weren't being `wait()`ed for. This led to a process leak, which we fix by calling `waitpid()` on the appropriate PIDs.

* Reap resulting zombies in a safe way

  According to @potiuk's and @malthe's input, the way we were reaping the zombies could cause racy and unwanted situations. As seen in the discussion over at `https://bugs.python.org/issue42558`, the changes we have introduced reap the spawned zombies safely.

* Explain why we are actively waiting

  As suggested by @potiuk, explaining why we chose to actively wait in a scenario such as this one can indeed be useful for anybody taking a look at the code some time from now...

  Co-authored-by: Jarek Potiuk <jarek@potiuk.com>

* Fix small typo and trailing whitespace

  After accepting the changes proposed on the PR we found a small typo (we make those on a daily basis) and a trailing whitespace we thought was worth deleting. Hope we made the right choice!

* Fix call to `poll()`

  We were calling `poll()` through the `_process` attribute and, as shown by the static checks triggered on GitHub, it's not defined for the `BaseProcess` class. We instead have to call `poll()` through `BaseProcess`'s `_popen` attribute.
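The kill-then-reap pattern described in the bullets above can be sketched outside Airflow. This is a minimal illustration assuming a POSIX system, not the actual `DagFileProcessorManager` code; `kill_and_reap` is a name invented here, and the active-wait loop mirrors the commit's use of the private `_popen` helper (which is why the commit also needs a `# type: ignore`).

```python
import multiprocessing
import time


def kill_and_reap(proc: multiprocessing.Process) -> int:
    """Kill a timed-out child, then actively wait so its zombie is reaped."""
    proc.kill()  # SIGKILL on POSIX; the child is a zombie until waited on
    # BaseProcess has no public poll(), so (as in the commit) we go through
    # the private `_popen` helper; MyPy can't see it, hence `# type: ignore`.
    while proc._popen.poll() is None:  # type: ignore
        time.sleep(0.001)
    return proc.exitcode


if __name__ == "__main__":
    # Stand-in for a DAG file processor that exceeded its timeout.
    child = multiprocessing.Process(target=time.sleep, args=(60,))
    child.start()
    print(kill_and_reap(child))  # negative on POSIX: terminated by a signal
```

Without the wait loop (or an equivalent `waitpid()`/`join()`), the killed child stays in the process table as a zombie until the parent exits, which is exactly the leak the commit fixes.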
* Prevent static check from failing

  After reading through `multiprocessing`'s implementation we really didn't know why the static check on line `239` was failing: the process should contain a `_popen` attribute... That's when we found line `223` and discovered its trailing `# type: ignore` comment. After reading up on it, we found that it instructs *MyPy* not to statically check that very line. Given we're having trouble with the exact same attribute, we decided to include the same directive for the static checker. Hope we made the right call!

* Fix test for `_kill_timed_out_processors()`

  We hadn't updated the tests for the method whose body we've altered. This caused the tests to fail when trying to retrieve a processor's *waitable*, a property similar to a *file descriptor* in UNIX-like systems.
  We have added a mock property to the `processor` and we've also updated the `manager`'s attributes so as to faithfully recreate the state of the data structures at the moment a `processor` is to be terminated. Please note the `assertions` at the end are meant to check that we reach the `manager`'s expected state. We have chosen to check the number of processors against an explicit value because we're defining `manager._processors` explicitly within the test. On the other hand, `manager.waitables` can have a different length depending on the call to `DagFileProcessorManager`'s `__init__()`. In this test the expected initial length is `1`, given we're passing `MagicMock()` as the `signal_conn` when instantiating the manager. However, if this were to change, the tests would 'inexplicably' fail. Instead of checking `manager.waitables`' length against a hardcoded value, we compare it to its initial length, to emphasize that we're interested in the change in length, not its absolute value.

* Fix `black` checks and `mock` decorators

  One of the methods we are to mock required a rather long `@mock.patch` decorator which didn't pass the checks made by `black` in the pre-commit hooks. On top of that, we messed up the ordering of the `@mock.patch` decorators, which meant we didn't set them up properly. This manifested as a `KeyError` in the method we're currently testing. O_o

Co-authored-by: Jarek Potiuk <jarek@potiuk.com>

GitOrigin-RevId: 4a06f895bb2982ba9698b9f0cfeb26d1ff307fc3
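The decorator-ordering mistake mentioned above is easy to reproduce. The sketch below uses a hypothetical `Manager` class (not Airflow's) to show that stacked `@mock.patch` decorators are applied bottom-up, so the *bottom-most* patch supplies the *first* mock argument; reversing them hands each assertion the wrong mock.

```python
from unittest import mock


class Manager:
    # Hypothetical stand-ins for the real methods patched in the test.
    def kill(self):
        return "killed"

    def wait(self):
        return "waited"


# Decorators apply bottom-up: the bottom patch ("kill") is innermost and
# therefore maps to the first parameter. Swapping the parameters (as the
# commit initially did) would make these assertions fail.
@mock.patch.object(Manager, "wait")
@mock.patch.object(Manager, "kill")
def check(mock_kill, mock_wait):
    Manager().kill()
    assert mock_kill.called
    assert not mock_wait.called


check()  # passes; patches are undone once check() returns
```

Keeping the parameter list in the reverse order of the decorator stack is the convention `unittest.mock` documents, and it is what the final version of the test follows.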