-
Notifications
You must be signed in to change notification settings - Fork 19.6k
Fixing finite generator race condition #8162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -654,7 +654,7 @@ def get(self): | |
yield inputs | ||
else: | ||
all_finished = all([not thread.is_alive() for thread in self._threads]) | ||
if all_finished: | ||
if all_finished and self.queue.empty(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By checking for the second time if the queue is empty we can fix the race condition problem. If all the threads are inactive and the queue is empty, no further updates will happen on the queue. Thus it is safe to raise a StopIteration() exception. Nevertheless if all the threads are inactive and the last thread managed to update the queue, we will sleep and loop again until we exhaust the items of the queue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. empty is not thread_safe. Please use self.queue.join() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I check that all threads are not alive before that. I should be covered right? |
||
raise StopIteration() | ||
else: | ||
time.sleep(self.wait_time) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -264,7 +264,7 @@ def test_finite_generator_enqueuer_threads(): | |
acc = [] | ||
for output in gen_output: | ||
acc.append(int(output[0, 0, 0, 0])) | ||
assert len(set(acc) - set(range(100))) == 0, "Output is not the same" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic is copied from the original I understand the reason why the original test asserts this way. Due to multi-threading, the final elements of the iterator (numbers 95-99) might not appear in the first 100 results. It is possible that the cyclic iterator will put in the queue first the initial items (numbers 0-5) followed by the last items (numbers 95-99). Hence checking the equality of the sets is not possible and the next best thing we can test for is the above. |
||
assert set(acc) == set(range(100)), "Output is not the same" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the case of finite iterator though, all numbers should be returned; there is no cyclic iterator in place to produce the above effect. As a result in the test of the finite iterator we should assert the equality of the sets. |
||
enqueuer.stop() | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Between the validating that the queue is empty and checking that all threads are inactive, the final running thread might update the queue. The conditions for this to happen must be just right and it's difficult to reproduce. By placing sleep() calls in strategic places one can reproduce the bug.