Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extending GeneratorEnqueuer to handle finite generators. #8104

Merged
merged 7 commits into from
Oct 16, 2017

Conversation

datumbox
Copy link
Contributor

The GeneratorEnqueuer class is used by Keras to generate batches of data. Even though it's an internal class, it can be used for building fast training and prediction pipelines that do not use the *_generator() methods. One core assumption of the class is that the underlying generator must be able to produce infinite amount of data. This is useful when we perform training but can be limiting when we want to make predictions (especially when you combine Spark + Keras).

This PR extends GeneratorEnqueuer to handle finite generators and it was tested both with threads and processes. The main idea is that we catch the StopIteration which is raised when the underlying generator is exhausted and gracefully exit the thread. When we call get() on the enqueuer instead of sleeping directly when the queue is empty, we check if any of the threads are active. If all of them are inactive we raise a StopIteration exception, else if at least one thread is still running we sleep.

The changes should have no effect if the underlying iterator is infinite. When it is finite, we raise a StopIteration exception once the underlying generator is exhausted to inform the user of the class that the data have been processed.

Copy link
Member

@fchollet fchollet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will require a unit test.

@@ -536,11 +536,12 @@ def stop(self, timeout=None):

class GeneratorEnqueuer(SequenceEnqueuer):
"""Builds a queue out of a data generator.
The provided generator can be finite in which case the class will throw a StopIteration exception.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduce a line break after the first docstring line, and format the line you added into 2 shorter lines. Add code delimiters aroundStopIteration`.

@datumbox
Copy link
Contributor Author

@fchollet Roger. I'll send an update next week. :)

@datumbox
Copy link
Contributor Author

@fchollet I think it's ready from my part. Please have a look and let me know.

I added two additional tests that cover the case of the finite generator (one with threads and one with processes). I also noticed that the PR is partially covered by the two test_generator_enqueuer_fail_* tests. Apologies for the multiple commits; I had a hard time with PEP8 policies as I don't have it installed on my system. I can squash the commits if you want, but I think you typically do this on merge.



def test_finite_generator_enqueuer_processes():
enqueuer = GeneratorEnqueuer(create_finite_generator_from_sequence_pcs(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Dref360 Is not this tested directly in the for loop? The StopIterator causes the for loop to exit. Do you mean place a next() call after the end of the loop to show it throws an exception?

Copy link
Member

@fchollet fchollet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks

@fchollet fchollet merged commit f940d22 into keras-team:master Oct 16, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants