fix: Fix deadlock in transport due to GC running #814
Conversation
Force-pushed from 82417ec to 073e8bf
with self.all_tasks_done:
    unfinished = self.unfinished_tasks - 1
    if unfinished <= 0:
        if unfinished < 0:
            raise ValueError("task_done() called too many times")
        self.all_tasks_done.notify_all()
    self.unfinished_tasks = unfinished
self.all_tasks_done is now re-entrant. This doesn't use atomic operations at all, so if GC kicks in at line 82 and executes the same code, the results are wrong. I think I now understand why CPython doesn't want to do this...
I tried to see how to do this more atomically, but it's hard: I always end up with some store operation long after the load operation. Anyway, we care about put, not this one, so let's consider this academic.
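To make the race concrete, a hypothetical interleaving (my sketch, not from the thread; the inner call stands in for a GC-triggered finalizer re-entering task_done() through the re-entrant lock):

# Suppose self.unfinished_tasks == 2 and two task_done() calls overlap:
#
#   outer: unfinished = self.unfinished_tasks - 1   # reads 2, computes 1
#   ... GC fires here, a finalizer re-enters task_done() ...
#   inner: unfinished = self.unfinished_tasks - 1   # also reads 2, computes 1
#   inner: self.unfinished_tasks = unfinished       # stores 1
#   outer: self.unfinished_tasks = unfinished       # also stores 1
#
# Two task_done() calls ran, but the counter only dropped from 2 to 1,
# so notify_all() never fires and join() waits forever.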
        raise Full
    self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
This self.unfinished_tasks += 1 ends up looking something like:
In [17]: class A:
    ...:     def __init__(self):
    ...:         self.unfinished_tasks = 0
    ...:     def put(self):
    ...:         self.unfinished_tasks += 1
    ...:

In [18]: dis.dis(A.put)
  5           0 LOAD_FAST                0 (self)
              2 DUP_TOP
              4 LOAD_ATTR                0 (unfinished_tasks)
              6 LOAD_CONST               1 (1)
              8 INPLACE_ADD
             10 ROT_TWO
             12 STORE_ATTR               0 (unfinished_tasks)
             14 LOAD_CONST               0 (None)
             16 RETURN_VALUE
Importantly, INPLACE_ADD at offset 8 ends up invoking the __iadd__ protocol and thus expands to many bytecodes, but for integers this should hopefully be implemented atomically in C. However, the result only gets stored again at offset 12. So if GC runs a put here concurrently, we could end up with an unfinished task when we think they're all finished.
Is that acceptable for us?
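For what it's worth, the same load/store window can be shown with plain threads instead of GC (a hypothetical stress sketch, not from the PR; whether updates are actually lost depends on the CPython version):

import sys
import threading

class A:
    def __init__(self):
        self.unfinished_tasks = 0

    def put(self):
        self.unfinished_tasks += 1  # LOAD_ATTR ... INPLACE_ADD ... STORE_ATTR

def hammer(a, n):
    for _ in range(n):
        a.put()

sys.setswitchinterval(1e-6)  # switch threads as often as possible
a = A()
threads = [threading.Thread(target=hammer, args=(a, 100_000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Often prints less than 400000, because an increment can be lost
# between the LOAD_ATTR and the STORE_ATTR.
print(a.unfinished_tasks)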
I don't think they'd notice, but I think we should figure out how to get rid of this race.
You can use an itertools.count() for those, potentially.
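My reading of why itertools.count() helps (not from the thread): the increment happens inside a single call into C, so there is no separate load/store pair for a GC callback to land between.

import itertools

counter = itertools.count()

def put():
    # next() increments and returns in one C-level call; there is no
    # LOAD_ATTR ... STORE_ATTR window for a re-entrant call to split.
    next(counter)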
How do you decrement? Could use two counters, I guess, and the value is the subtraction of both. I am no longer of the opinion that we need to fix this, though.
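A hypothetical sketch of that two-counter idea (names are mine; note that reading the two counters is still not a single atomic snapshot):

import copy
import itertools

class TwoCounterTally:
    def __init__(self):
        self._puts = itertools.count()   # bumped once per put()
        self._dones = itertools.count()  # bumped once per task_done()

    def task_added(self):
        next(self._puts)

    def task_done(self):
        next(self._dones)

    def unfinished(self):
        # count objects support __reduce__, so copy.copy() snapshots
        # one; next() on the copy reads the current value without
        # advancing the original.
        return next(copy.copy(self._puts)) - next(copy.copy(self._dones))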
    with self.mutex:
        return 0 < self.maxsize <= self._qsize()

def put(self, item, block=True, timeout=None):
We use a bounded queue, but do we invoke it with block or timeout? I think this might be important because of how the self.not_full.wait() calls end up behaving with an RLock.
we only use non-blocking put
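For reference, the non-blocking pattern described here (a minimal sketch; the queue bound and names are hypothetical) never enters self.not_full.wait() at all:

import queue

q = queue.Queue(maxsize=100)  # hypothetical bound

try:
    q.put("event", block=False)  # equivalent to q.put_nowait("event")
except queue.Full:
    pass  # drop the event instead of blocking the caller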
I was trying to figure out if finer-grained locking would work, but it's really hard to always wake up the …
We talked about this extensively offline and concluded that this or disabling GC is the best way forward for now. Disabling GC seems relatively intrusive to me, while the downsides of this PR are basically limited to the SDK deadlocking internally.
Fix #803
The workaround is the same one sqlalchemy uses. We also didn't check in tests for this, as they would take too long: a test that creates 1m items takes 10 seconds to terminate, and going much lower than 10m makes the test flaky (in the sense that the bug is not hit every time).
The concern about RLock-vs-signals is not important, as it only happens with the Python implementation of RLock, and we have not been able to repro any of this with signals either, so I suppose it's harder to hit.
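As I understand the sqlalchemy-style workaround, the queue's plain Lock is swapped for an RLock, so a GC-triggered finalizer that re-enters the queue on the same thread can re-acquire the mutex instead of deadlocking. A minimal sketch (class name is mine):

import queue
import threading

class GCSafeQueue(queue.Queue):
    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        # queue.Queue keeps one mutex shared by three Conditions;
        # rebind all of them around a re-entrant lock.
        self.mutex = threading.RLock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)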
An alternate proposal would be to use gc.callbacks to figure out when GC is running, and disable event submission in those situations. Armin's concern was that gc.callbacks may run too often, and that us hooking into it is likely a performance issue. sqlalchemy's workaround has been working since forever, so that speaks in its favor.
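For completeness, the rejected gc.callbacks alternative might have looked something like this (hypothetical names; gc calls each registered callback with a phase of "start" or "stop"):

import gc

_gc_in_progress = False

def _gc_callback(phase, info):
    global _gc_in_progress
    _gc_in_progress = (phase == "start")

gc.callbacks.append(_gc_callback)

def submit_event(event):
    if _gc_in_progress:
        return  # drop/defer submission while a collection runs
    # ... normal submission path ...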