Fixing finite generator race condition #8162

Merged Oct 19, 2017 (2 commits)
2 changes: 1 addition & 1 deletion keras/utils/data_utils.py
@@ -654,7 +654,7 @@ def get(self):
yield inputs
else:
all_finished = all([not thread.is_alive() for thread in self._threads])
- if all_finished:
Contributor Author

Between the check that the queue is empty and the check that all threads are inactive, the last running thread might still add an item to the queue. The timing has to be just right, so the bug is difficult to reproduce, but it can be triggered by placing sleep() calls in strategic places.
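
The interleaving described above can be forced deterministically. The following is only a minimal sketch, not the Keras code: it replaces the sleep() calls with a `threading.Event`, and the names `last_producer`, `checked_empty`, and `old_would_stop` are hypothetical.

```python
import queue
import threading

q = queue.Queue()
checked_empty = threading.Event()  # set once the consumer has seen an empty queue


def last_producer():
    # Hypothetical final worker: it waits until the consumer has already
    # observed an empty queue, then enqueues one more item and exits.
    checked_empty.wait()
    q.put("last item")


worker = threading.Thread(target=last_producer)
worker.start()

saw_empty = q.empty()                 # step 1: the queue looks empty
checked_empty.set()
worker.join()                         # step 2: the worker enqueues and exits
all_finished = not worker.is_alive()  # step 3: all threads look finished

# The original check would stop here even though one item is still queued.
old_would_stop = saw_empty and all_finished
items_left_behind = q.qsize()
```

With this ordering `old_would_stop` is true while `items_left_behind` is 1, which is the lost-item case the comment describes.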

+ if all_finished and self.queue.empty():
Contributor Author

Checking a second time that the queue is empty fixes the race condition. If all threads are inactive and the queue is empty, nothing can add to the queue anymore, so it is safe to raise StopIteration(). If instead all threads are inactive but the last one managed to add an item before exiting, we sleep and loop again until the remaining items in the queue are exhausted.
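
A standalone sketch of the same loop, under assumptions: `drain` and `produce` are hypothetical helpers, not the Keras `get()` method, and there is a single consumer. It ends the generator with `return` instead of `raise StopIteration()`, because under PEP 479 (Python 3.7+) a StopIteration raised inside a generator becomes a RuntimeError.

```python
import queue
import threading
import time


def drain(q, threads, wait_time=0.01):
    """Yield items until every producer thread is dead and the queue is empty."""
    while True:
        if not q.empty():
            # Single consumer, so a non-empty queue cannot be emptied by anyone else.
            yield q.get()
        else:
            all_finished = all(not t.is_alive() for t in threads)
            # Re-check the queue: a thread may have enqueued an item and
            # exited after the emptiness test above.
            if all_finished and q.empty():
                return
            time.sleep(wait_time)


def produce(q, items):
    for item in items:
        q.put(item)


q = queue.Queue()
threads = [
    threading.Thread(target=produce, args=(q, range(i * 25, (i + 1) * 25)))
    for i in range(4)
]
for t in threads:
    t.start()

results = sorted(drain(q, threads))
```

Once every producer is dead, nothing can call `put()` anymore, so the second `q.empty()` result cannot be invalidated by another thread in this setup.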

Contributor

empty() is not thread-safe. Please use self.queue.join().

Contributor Author

I check that no thread is alive before calling empty(), so I think I should be covered, right?

raise StopIteration()
else:
time.sleep(self.wait_time)
2 changes: 1 addition & 1 deletion tests/keras/utils/data_utils_test.py
@@ -264,7 +264,7 @@ def test_finite_generator_enqueuer_threads():
acc = []
for output in gen_output:
acc.append(int(output[0, 0, 0, 0]))
- assert len(set(acc) - set(range(100))) == 0, "Output is not the same"
Contributor Author

This logic was copied from the original test_generator_enqueuer_threads() test. It is not a great assertion, because it also holds for an empty result:
len(set() - set(range(100))) == 0

I understand why the original test asserts this way. Due to multi-threading, the final elements of the iterator (numbers 95-99) might not appear in the first 100 results: the cyclic iterator may enqueue the initial items of the next cycle (numbers 0-5) before the last items of the current one (numbers 95-99). Hence checking the equality of the sets is not possible there, and the subset check above is the next best thing to test for.
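
The difference between the two assertions can be seen on plain lists. This is an illustrative sketch only; `subset_check`, `equality_check`, and the three sample runs are hypothetical and not part of the test suite.

```python
expected = set(range(100))


def subset_check(acc):
    # Old assertion: nothing outside 0..99 was produced.
    return len(set(acc) - expected) == 0


def equality_check(acc):
    # New assertion: exactly the numbers 0..99 were produced.
    return set(acc) == expected


empty_run = []                                  # nothing was produced at all
wrapped_run = list(range(95)) + list(range(5))  # 0-4 repeated in place of 95-99
complete_run = list(range(100))                 # every number exactly once

checks = {
    name: (subset_check(run), equality_check(run))
    for name, run in [
        ("empty", empty_run),
        ("wrapped", wrapped_run),
        ("complete", complete_run),
    ]
}
```

The subset check accepts all three runs, while the equality check accepts only the complete one.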

+ assert set(acc) == set(range(100)), "Output is not the same"
Contributor Author

With a finite iterator, though, all numbers should be returned, since there is no cyclic iterator in place to produce the effect above. The finite-iterator test should therefore assert that the sets are equal.

enqueuer.stop()

