Fixing finite generator race condition #8162

Merged: 2 commits, Oct 19, 2017

datumbox
Contributor

I spotted a race condition in my previous PR #8104. The problem affects only finite generators and has no effect on infinite ones. This PR fixes the bug and updates the test to catch similar problems.

Please check the GitHub comments on the code, where I explain what the problem was and how I fixed it. Feedback is always welcome! :)

Check again for empty queue after detecting that all threads have finished.
Contributor Author

datumbox left a comment

I've put some comments below to explain the changes. This was a sneaky bug that was quite hard to reproduce. The PR is ready to merge from my side, unless someone spots anything fishy, in which case I'm happy to update it.

@@ -654,7 +654,7 @@ def get(self):
                     yield inputs
             else:
                 all_finished = all([not thread.is_alive() for thread in self._threads])
-                if all_finished:
Contributor Author

Between validating that the queue is empty and checking that all threads are inactive, the final running thread might update the queue. The conditions for this have to be just right, so it is difficult to reproduce, but placing sleep() calls in strategic places reproduces the bug.
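The interleaving described above can be forced deterministically. The following is a minimal sketch, not the Keras code: it uses a `threading.Event` in place of the sleep() calls so the timing does not depend on the scheduler, and the names `run_buggy_check`, `producer`, and `go` are hypothetical.

```python
import queue
import threading


def run_buggy_check():
    """Replay the pre-fix check order with the unlucky interleaving forced."""
    q = queue.Queue()
    go = threading.Event()  # lets the consumer decide when the producer runs

    def producer():
        go.wait()           # hold the last item until the consumer saw an empty queue
        q.put("last item")  # then enqueue it and exit immediately

    worker = threading.Thread(target=producer)
    worker.start()

    saw_empty = q.empty()   # step 1: the consumer finds the queue empty
    go.set()                # race window: the producer enqueues ...
    worker.join()           # ... and finishes before step 2 runs
    all_finished = not worker.is_alive()  # step 2: no live threads remain

    # The pre-fix logic stops iterating when both observations are true.
    would_stop = saw_empty and all_finished
    return would_stop, q.qsize()


would_stop, left_behind = run_buggy_check()
print(would_stop, left_behind)
```

Under this forced ordering the pre-fix logic decides to stop while one item is still sitting in the queue, which is the lost-item symptom.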

@@ -654,7 +654,7 @@ def get(self):
                     yield inputs
             else:
                 all_finished = all([not thread.is_alive() for thread in self._threads])
-                if all_finished:
+                if all_finished and self.queue.empty():
Contributor Author

Checking a second time that the queue is empty fixes the race condition. If all the threads are inactive and the queue is empty, no further updates can reach the queue, so it is safe to raise a StopIteration() exception. If, however, all the threads are inactive but the last thread managed to update the queue, we sleep and loop again until the items in the queue are exhausted.
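For illustration, here is a simplified stand-in for the fixed loop, assuming a single consumer and plain `threading.Thread` producers. It is not the actual get() method: `drain`, `produce`, and the `wait_time` default are hypothetical names and values. The sketch returns instead of raising StopIteration(), because under PEP 479 (the default from Python 3.7) a StopIteration raised inside a generator surfaces as a RuntimeError.

```python
import queue
import threading
import time


def drain(q, threads, wait_time=0.01):
    """Yield queued items until every producer is dead AND the queue is empty."""
    while True:
        if not q.empty():
            yield q.get()
        else:
            all_finished = all(not t.is_alive() for t in threads)
            # Re-check the queue: the last producer may have enqueued an item
            # between the emptiness test above and the liveness test.
            if all_finished and q.empty():
                return  # no producer left and nothing queued: safe to stop
            time.sleep(wait_time)


def produce(q, start, stop):
    """Hypothetical finite producer: enqueue the integers in [start, stop)."""
    for i in range(start, stop):
        q.put(i)


q = queue.Queue()
threads = [
    threading.Thread(target=produce, args=(q, k * 25, (k + 1) * 25))
    for k in range(4)
]
for t in threads:
    t.start()

results = sorted(drain(q, threads))
print(len(results))
```

Once every producer has exited, nothing can call put() any more, so the second empty() observation cannot be invalidated afterwards.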

Contributor

empty() is not thread-safe. Please use self.queue.join().

Contributor Author

I think I check that all threads are not alive before that, so I should be covered, right?

@@ -264,7 +264,7 @@ def test_finite_generator_enqueuer_threads():
     acc = []
     for output in gen_output:
         acc.append(int(output[0, 0, 0, 0]))
-    assert len(set(acc) - set(range(100))) == 0, "Output is not the same"
Contributor Author

This logic is copied from the original test_generator_enqueuer_threads() test. It is not a great assertion, because it holds even when nothing at all is returned:
len(set() - set(range(100))) == 0

I understand why the original test asserts this way. Due to multi-threading, the final elements of the iterator (numbers 95-99) might not appear in the first 100 results. The cyclic iterator may put the initial items (numbers 0-5) in the queue ahead of the last items (numbers 95-99). Hence checking the equality of the sets is not possible, and the next best thing we can test for is the above.

@@ -264,7 +264,7 @@ def test_finite_generator_enqueuer_threads():
     acc = []
     for output in gen_output:
         acc.append(int(output[0, 0, 0, 0]))
-    assert len(set(acc) - set(range(100))) == 0, "Output is not the same"
+    assert set(acc) == set(range(100)), "Output is not the same"
Contributor Author

In the case of a finite iterator, though, all numbers should be returned; there is no cyclic iterator in place to produce the above effect. As a result, the test of the finite iterator should assert the equality of the sets.
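The difference between the two assertions can be seen on a few hand-made outputs. This is only an illustrative sketch: the helper names `subset_check` and `equality_check` and the sample lists are made up for the comparison and do not appear in the test suite.

```python
expected = set(range(100))


def subset_check(acc):
    """Old assertion: only verifies that no unexpected value was produced."""
    return len(set(acc) - expected) == 0


def equality_check(acc):
    """New assertion: verifies that every expected value was produced."""
    return set(acc) == expected


empty = []                   # nothing returned at all
truncated = list(range(95))  # the last five items were lost
complete = list(range(100))  # every item returned once

for name, acc in [("empty", empty), ("truncated", truncated), ("complete", complete)]:
    print(name, subset_check(acc), equality_check(acc))
```

The subset form accepts the empty and truncated outputs, so it would not have flagged items dropped by the race; the equality form accepts only the complete output.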

Member

fchollet left a comment

LGTM, thanks

fchollet merged commit 9c79e01 into keras-team:master on Oct 19, 2017