Process cross index queries in parallel #854
Conversation
gensim/similarities/docsim.py
Outdated
futures = []
process_chunk = functools.partial(_query_chunk, index=self)

chunks = list(self.iter_chunks())
Better to avoid loading the entire index into memory, by loading each chunk inside the executor.
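A sketch of the reviewer's suggestion: submit lightweight chunk identifiers and load each chunk inside the worker, instead of materializing `list(self.iter_chunks())` up front. Here `load_chunk` and `num_chunks` are hypothetical names for illustration (not gensim API), and `_query_chunk_gen` is a stand-in for the PR's helper:

```python
import concurrent.futures


def _query_chunk_gen(chunk, index):
    # Stand-in for the PR's existing helper: yields similarities for one chunk.
    for sim in index[chunk]:
        yield sim


def _query_chunk_by_id(chunk_id, index):
    # Load the chunk inside the worker process, so the parent process
    # never holds every chunk in memory at once.
    chunk = index.load_chunk(chunk_id)           # hypothetical lazy loader
    return list(_query_chunk_gen(chunk, index))  # materialize for pickling


def iter_similarities(index):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Submit cheap chunk ids, not the chunk data itself.
        futures = [
            executor.submit(_query_chunk_by_id, chunk_id, index)
            for chunk_id in range(index.num_chunks)  # hypothetical attribute
        ]
        for future in futures:
            for sim in future.result():
                yield sim
```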
gensim/similarities/docsim.py
Outdated
def _query_chunk(chunk, index):
    """Allow pickling."""
    return list(_query_chunk_gen(chunk, index))
What is an example use of this method?
Workers can't return generators, since generators can't be pickled. This method's sole purpose in life is to allow returning results from a worker thread/process.
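A two-line demonstration of the constraint being described (plain Python fact, not PR code): pickling a generator fails, while the materialized list pickles fine.

```python
import pickle

def gen():
    yield 1

try:
    pickle.dumps(gen())            # generators cannot be pickled
except TypeError as err:
    print(err)                     # e.g. "cannot pickle 'generator' object"

data = pickle.dumps(list(gen()))   # materialized results pickle fine
```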
gensim/similarities/docsim.py
Outdated
for chunk in self.iter_chunks():
    if chunk.shape[0] > 1:
        for sim in self[chunk]:
with concurrent.futures.ProcessPoolExecutor() as executor:
How big is the speed-up from this parallelization?
Depends on the size of the machine; for me it brought a task down from 10 hrs to ~2 hrs.
What is the standard way to measure perf?
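There is no single standard, but a minimal wall-clock measurement of the full query loop looks like this (a generic sketch, not this project's benchmark; gensim's own benchmark, simspeed2.py, comes up later in this thread):

```python
import time

start = time.time()
num_docs = 0
for sims in index:  # `index` is the Similarity instance under test
    num_docs += 1
elapsed = time.time() - start
print("queried %d documents in %.1fs (%.1f docs/s)"
      % (num_docs, elapsed, num_docs / elapsed))
```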
That sounds like a great speed-up. Could you add a description of your machine, data size and speed-up to the PR description at the top?
Please avoid loading the entire index into memory.
Thanks for the PR!

Could be useful for other types of indexes too. We cannot use py3k-only features like concurrent.futures, but that shouldn't be hard to replace/backport.
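On the py3k concern: `concurrent.futures` (used in the diff above) is stdlib only on Python 3.2+, but the `futures` package on PyPI backports it to Python 2 under the same import path, so a guard like this (a sketch) keeps the dependency explicit:

```python
try:
    from concurrent import futures  # stdlib on Python 3.2+
except ImportError:                 # Python 2: pip install futures
    raise ImportError("install the 'futures' backport to use parallel queries")
```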
Force-pushed from d51c5bc to 4fa50f5:
* Iterate through the chunks instead of loading in memory
* Make work an inner method in worker
Force-pushed from 4fa50f5 to a095312.
@tmylk Made the changes.
Thanks for the changes. Before merging we will need to see the benchmark results. Ideally this parallelisation should be in …
The feature is nice in principle, but the implementation has to be carefully tested on large data. A naive multiprocessing approach will blow up memory on large indexes (see the comment on queue limits below).
            yield sim
    else:
        yield self[chunk]
import multiprocessing
Imports at the top of the file please.
        yield self[chunk]
import multiprocessing
import functools
pool = multiprocessing.Pool()
How many workers / processes?
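For context, a general multiprocessing fact rather than anything in this PR: `multiprocessing.Pool()` with no argument spawns one worker per CPU, so passing an explicit count keeps it configurable:

```python
import multiprocessing

pool = multiprocessing.Pool()             # defaults to os.cpu_count() workers
pool = multiprocessing.Pool(processes=4)  # explicit, easier to tune
```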
import functools
pool = multiprocessing.Pool()
worker = functools.partial(_query_chunk_worker, index=self)
for result in pool.map(worker, self.iter_chunks()):
This will be problematic: multiprocessing has no queue limits; it keeps feeding the input (and output) queue constantly, while it can. This will blow up the memory (queue up all chunks) for a large enough index, for slow enough workers, or for slow enough result consumption after yield.

In other words, we need to tell the process that feeds the input queue to block if there are too many tasks pending already. Likewise if there are too many results in the output queue, waiting to be yielded.
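One way to implement the blocking the reviewer asks for (a sketch under the assumption that chunks and the worker function are picklable; `bounded_imap` and `max_pending` are hypothetical names, not gensim API):

```python
import collections
import itertools
import multiprocessing


def bounded_imap(func, iterable, processes=None, max_pending=8):
    """Yield func(item) for each item, keeping at most `max_pending`
    tasks in flight, so neither the input nor the output queue can
    grow without bound. Results are yielded in input order."""
    pool = multiprocessing.Pool(processes=processes)
    try:
        items = iter(iterable)
        # Prime the pipeline with up to `max_pending` tasks.
        pending = collections.deque(
            pool.apply_async(func, (item,))
            for item in itertools.islice(items, max_pending)
        )
        while pending:
            # Block on the oldest task; only after its result is consumed
            # do we submit one more, keeping memory use bounded.
            yield pending.popleft().get()
            for item in itertools.islice(items, 1):
                pending.append(pool.apply_async(func, (item,)))
    finally:
        pool.terminate()
        pool.join()
```

In the PR's terms, `for result in bounded_imap(worker, self.iter_chunks()):` would then replace the unbounded `pool.map` call.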
I am examining the requested changes and would like to check whether I understand a few things correctly, and to ask for advice on the expected behaviour.

* Benchmark: I suppose the benchmark is simspeed2.py in the gensim/test directory. It fails for me on the second test. Is it supposed to fail; is this the bug I should correct? (It fails in the "correct" place, where the blocking should be implemented.)
* Blocking: if I understand it right, the pool.map(worker, self.iter_chunks()) in the __iter__ method (docsim.py) loads all the data, which is then processed slowly, and so it fills up the memory. I was reading about the Queue class in multiprocessing. Intuitively, iter_chunks should feed the queue, and a chunk should only be passed to a worker (which computes the result) when needed. Is that intuition correct?
* Number of workers: this seems easy, using the PARALLEL_SHARDS declared at the beginning and passing it to the Pool constructor. However, I do not understand why it is commented out. How should the pool behave if multiprocessing is disabled?

Reference from the beginning of docsim.py:
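(The referenced declaration sits at the top of gensim/similarities/docsim.py and looks roughly like this; paraphrased from that era of the codebase, so treat it as approximate.)

```python
PARALLEL_SHARDS = False
try:
    import multiprocessing
    # by default, don't parallelize queries; uncomment the following line
    # if you want one worker process per CPU:
    # PARALLEL_SHARDS = multiprocessing.cpu_count()
except ImportError:
    pass
```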
@ludakas does the benchmark fail for you without these changes too? For queue limits, see the example in MulticoreLDA. The …
@tmylk yes. Thanks for the queue limits example; however, I encountered another problem even before I got to the queues, at the top of the … I made another observation: when I keep the PARALLEL_SHARDS commented out (implying PARALLEL_SHARDS = False), …
@ludakas Thanks for running the benchmark. …
My 2 cents: that …
It seems that the parallelisation doesn't pass the benchmark with …
This reduced the processing time from 10 hrs to less than 2 hrs on a machine with 24 cores and 100 GB RAM, on a dataset of ~635,000 documents.