
fix: Fix deadlock in transport due to GC running #814

Merged: 8 commits merged into master from fix/worker-deadlock-gc on Sep 14, 2020

Conversation

untitaker (Member) commented on Sep 7, 2020

Fix #803

The workaround is the same one sqlalchemy uses. We also didn't check in tests for this, as they would take too long: a test that creates 1M items takes 10 seconds to terminate, and going much lower than 10M makes the test flaky (in the sense that the bug is not hit every time).

The concern about RLock-vs-signals is not important, as it only applies to the pure-Python implementation of RLock, and we have not been able to reproduce any of this with signals either, so I suppose it's harder to hit.

An alternate proposal would be to use gc.callbacks to figure out when GC is running, and to disable event submission in those situations. Armin's concern was that gc.callbacks may run too often, and that hooking into it is likely a performance issue.

sqlalchemy's workaround has been working forever, which speaks in its favor.
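
For illustration, a minimal sketch of what the gc.callbacks alternative could look like (the names and the drop-on-GC policy are assumptions, not code from this PR):

    import gc
    import threading
    from queue import Full

    # Hypothetical sketch of the rejected gc.callbacks approach: track
    # whether a collection is in progress and skip event submission
    # while it runs, so a finalizer executing inside GC never touches
    # the queue's lock.
    _gc_running = threading.Event()

    def _on_gc(phase, info):
        # CPython invokes each entry in gc.callbacks with phase "start"
        # or "stop" around every collection.
        if phase == "start":
            _gc_running.set()
        else:
            _gc_running.clear()

    gc.callbacks.append(_on_gc)

    def submit_event(queue, event):
        if _gc_running.is_set():
            return False  # drop while GC is running
        try:
            queue.put_nowait(event)
        except Full:
            return False
        return True

Note that the flag is process-global, so submissions on unrelated threads would also be skipped while a collection runs; that, plus the callback overhead on every collection cycle, is the performance concern mentioned above.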

@untitaker force-pushed the fix/worker-deadlock-gc branch from 82417ec to 073e8bf on September 7, 2020 12:36
Comment on lines +80 to +86:

    with self.all_tasks_done:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError("task_done() called too many times")
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished

self.all_tasks_done is now re-entrant. This doesn't use atomic operations at all, so say GC kicks in at line 82 and executes the same code: the results are wrong. I think I now understand why CPython doesn't want to do this...

I tried to see how to do this more atomically, but it's hard: I always end up with some store operation long after the load operation. Anyway, we care about put(), not this one, so let's consider this academic.
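
For context, a minimal sketch of the sqlalchemy-style workaround under discussion, assuming the fix vendors Queue and swaps its plain mutex for a re-entrant lock (the class shape mirrors CPython's queue.Queue; the details are illustrative):

    import collections
    import threading

    # Sketch: replace the queue's plain Lock with an RLock so that a
    # GC-triggered finalizer re-entering put() on the same thread
    # re-acquires the lock instead of deadlocking.
    class Queue:
        def __init__(self, maxsize=0):
            self.maxsize = maxsize
            self.queue = collections.deque()
            self.mutex = threading.RLock()  # was threading.Lock()
            self.not_empty = threading.Condition(self.mutex)
            self.not_full = threading.Condition(self.mutex)
            self.all_tasks_done = threading.Condition(self.mutex)
            self.unfinished_tasks = 0

The trade-off is exactly what this review thread is about: with an RLock, the critical sections are no longer exclusive against re-entrant calls on the same thread, so counter updates like the one above can race.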

Comment on the tail of put():

                        raise Full
                    self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1

This self.unfinished_tasks += 1 ends up looking something like:

In [17]: class A:
    ...:     def __init__(self):
    ...:         self.unfinished_tasks = 0
    ...:     def put(self):
    ...:         self.unfinished_tasks += 1
    ...:

In [18]: dis.dis(A.put)
  5           0 LOAD_FAST                0 (self)
              2 DUP_TOP
              4 LOAD_ATTR                0 (unfinished_tasks)
              6 LOAD_CONST               1 (1)
              8 INPLACE_ADD
             10 ROT_TWO
             12 STORE_ATTR               0 (unfinished_tasks)
             14 LOAD_CONST               0 (None)
             16 RETURN_VALUE

Importantly, INPLACE_ADD at offset 8 ends up invoking the __iadd__ protocol and thus expands to many operations; for integers this should hopefully be implemented atomically in C. However, the result only gets stored back at offset 12. So if GC runs a put concurrently at that point, we could end up with an unfinished task when we think they're all finished.

Is that acceptable for us?

untitaker (Member, Author) replied:

I don't think they'd notice, but I think we should figure out how to get rid of this race.

A member replied:

You could potentially use an itertools.count() for those.

untitaker (Member, Author) replied:

How do you decrement? You could use two counters, I guess, and the value would be the difference of the two. I am no longer of the opinion that we need to fix this, though.
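
A sketch of the two-counter idea (illustrative only; the thread ultimately drops it): itertools.count's __next__ runs in C, so the increment itself cannot be interleaved by a re-entrant call, although the cached snapshots below are still plain attribute stores.

    import itertools

    # Each next() call is a single C-level operation, so an increment
    # can never be torn apart the way ``self.unfinished_tasks += 1``
    # is in the bytecode shown above.
    class TaskCounter:
        def __init__(self):
            self._started = itertools.count(1)
            self._done = itertools.count(1)
            self._last_started = 0
            self._last_done = 0

        def task_started(self):
            # The count itself never loses an increment; only this
            # cached snapshot is a non-atomic attribute store.
            self._last_started = next(self._started)

        def task_done(self):
            self._last_done = next(self._done)

        @property
        def unfinished(self):
            return self._last_started - self._last_done

Since an itertools.count can only be read by consuming it, the snapshots remain necessary, which limits how much atomicity this actually buys.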

Comment on the definition of put():

        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):

We use a bounded queue, but do we invoke it with block or timeout? I think this might be important because of how the self.not_full.wait() calls end up behaving with an RLock.

untitaker (Member, Author) replied:

We only use non-blocking put.
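
For reference, a small sketch of what non-blocking use of a bounded queue looks like; because block=False never reaches the self.not_full.wait() path, the RLock concern above does not apply (submit_envelope and the queue size are hypothetical):

    from queue import Full, Queue

    q = Queue(maxsize=100)  # bounded; the size here is arbitrary

    def submit_envelope(envelope):
        # put_nowait() is put(item, block=False): it raises Full
        # immediately instead of sleeping on self.not_full.wait().
        try:
            q.put_nowait(envelope)
        except Full:
            return False  # drop rather than block the caller
        return True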

flub commented on Sep 8, 2020

I was trying to figure out if finer-grained locking would work, but it's really hard to always wake up the .get() with a re-entrant lock. Maybe the only way to fix this is to write a C extension, but I haven't fully thought that through.

untitaker (Member, Author) commented:

We talked about this extensively offline and concluded that this or disabling GC is the best way forward for now. Disabling GC seems relatively intrusive to me, while the downsides of this PR are basically limited to the SDK deadlocking internally.

@untitaker merged commit cd6ef0c into master on Sep 14, 2020
@untitaker deleted the fix/worker-deadlock-gc branch on September 14, 2020 08:19

Linked issue (may be closed by this PR): Deadlock (Hangs) during Garbage Collection because of logging in destructor and lock on Queue.