
Process cross index queries in parallel #854

Closed

Conversation

@ylogx commented Sep 12, 2016

This reduced the processing time from 10 hrs to less than 2 hrs on a machine with 24 cores and 100 GB RAM, on a dataset of size ~635,000.

[screenshot: screen shot 2017-03-07 at 1 38 29 am]

futures = []
process_chunk = functools.partial(_query_chunk, index=self)

chunks = list(self.iter_chunks())
Contributor: Better to avoid loading the entire index into memory; load the chunk inside the executor instead.


def _query_chunk(chunk, index):
""" Allow pickling """
return list(_query_chunk_gen(chunk, index))
Contributor: What is an example use of this method?

Author: Workers can't return generators, since a generator can't be pickled. This method's sole purpose in life is to allow returning results from a worker thread/process.
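
For illustration, a minimal sketch of that constraint (not code from this PR):

    import pickle

    def numbers():
        yield 1
        yield 2

    try:
        pickle.dumps(numbers())        # generators cannot be pickled
    except TypeError as err:
        print(err)                     # e.g. "cannot pickle 'generator' object"

    pickle.dumps(list(numbers()))      # materialized as a list, it pickles fine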

for chunk in self.iter_chunks():
    if chunk.shape[0] > 1:
        for sim in self[chunk]:

with concurrent.futures.ProcessPoolExecutor() as executor:
Contributor: How big is the speed-up from this parallelization?

Author: It depends on the size of the machine; for me it took a task down from 10 hrs to ~2 hrs.
What is the standard way to measure perf?

Contributor: That sounds like a great speed-up. Could you add a description of your machine, data size, and speed-up to the PR description at the top?

@tmylk (Contributor) left a review comment:

Please avoid loading the entire index into memory.

@tmylk commented Sep 15, 2016

Thanks for the PR!
Please add a CHANGELOG entry and state how much improvement this parallelization gives.

@tmylk added the difficulty easy (Easy issue: required small fix) label on Sep 24, 2016
@piskvorky (Owner) commented:
For MatrixSimilarity, each chunk is already processed using multiple threads (if BLAS is configured to use threads). So more parallelization likely won't help, and may hurt (more context switching, memory bus contention).

Could be useful for other types of indexes though (SparseMatrixSimilarity, Annoy etc).

We cannot use py3k-only features, but that shouldn't be hard to replace/backport.
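
For context, what "BLAS configured to use threads" means in practice — a minimal sketch (illustration only; the exact environment variable depends on the BLAS build):

    import os
    # Must be set before numpy is imported; OpenBLAS also honours
    # OPENBLAS_NUM_THREADS and MKL honours MKL_NUM_THREADS.
    os.environ["OMP_NUM_THREADS"] = "4"

    import numpy as np

    a = np.random.rand(2000, 2000)
    b = np.random.rand(2000, 2000)
    # With a threaded BLAS, this single call already uses multiple cores,
    # so wrapping it in extra processes adds contention rather than speed.
    a.dot(b)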

@tmylk added the wishlist (Feature request) label on Jan 25, 2017
@ylogx force-pushed the feature/parallel_cross_similarity branch from d51c5bc to 4fa50f5 on March 6, 2017 19:48
@ylogx added 2 commits March 7, 2017 01:22:
* Iterate through the chunks instead of loading in memory
* Make work an inner method in worker
@ylogx force-pushed the feature/parallel_cross_similarity branch from 4fa50f5 to a095312 on March 6, 2017 19:52
@ylogx commented Mar 6, 2017

@tmylk Made the changes.

@tmylk commented Mar 6, 2017

Thanks for the changes. Before merging, we will need to see gensim/test/simspeed2.py benchmark results from before and after the parallelisation.

Ideally this parallelisation should be in SimilarityABC so that SparseMatrixSimilarity and WMDSimilarity could take advantage of it as well, but not MatrixSimilarity as it uses numpy.dot which is parallelised for dense matrices. I will create another improvement issue for this, unless you wish to make it as a part of this PR. It requires changes to MatrixSimilarity to not use parallelisation and to Similarity to use iter code from SimilarityABC.

@piskvorky (Owner) commented:
The feature is nice in principle, but the implementation has to be carefully tested on large data. A naive multiprocessing.map will take up all memory; I'm afraid we'll need proper input/output queueing.

            yield sim
    else:
        yield self[chunk]
import multiprocessing
Owner: Imports at the top of the file, please.

        yield self[chunk]
import multiprocessing
import functools
pool = multiprocessing.Pool()
Owner: How many workers / processes?

import functools
pool = multiprocessing.Pool()
worker = functools.partial(_query_chunk_worker, index=self)
for result in pool.map(worker, self.iter_chunks()):
@piskvorky (Owner) commented Mar 6, 2017:
This will be problematic: multiprocessing has no queue limits; it keeps feeding the input (and output) queue constantly, while it can. This will blow up the memory (queueing up all chunks) for a large enough index, slow enough workers, or slow enough consumption of results after yield.

In other words, we need to tell the process that feeds the input queue to block if there are too many tasks pending already, and likewise if there are too many results in the output queue waiting to be yielded.
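
One common way to get the blocking behaviour described here is to bound the number of in-flight tasks; a minimal sketch (illustration only, assuming a picklable top-level process_chunk function — this is not the PR's code):

    import collections
    import concurrent.futures

    def bounded_parallel_map(func, iterable, max_pending=8):
        """Yield func(item) for each item, keeping at most max_pending tasks queued."""
        pending = collections.deque()
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for item in iterable:
                pending.append(executor.submit(func, item))
                if len(pending) >= max_pending:
                    # Block on the oldest task before submitting more; this
                    # bounds both the input and the output side.
                    yield pending.popleft().result()
            while pending:
                yield pending.popleft().result()

Consumed as `for result in bounded_parallel_map(process_chunk, self.iter_chunks()): ...`, the producer stalls whenever the consumer falls behind, instead of queueing up all chunks.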

@piskvorky added the difficulty medium (Medium issue: required good gensim understanding & python skills) label and removed the difficulty easy (Easy issue: required small fix) label on Mar 18, 2017
@ludakas commented Mar 23, 2017

I am examining the requested changes and would like to check that I understand a few things correctly, and to ask for advice on the expected behaviour.

I suppose the benchmark is simspeed2.py in the gensim/test directory. It fails for me on the second test. Is it supposed to fail, or is this the bug I should correct? (It fails in the "correct" place, where the blocking should be implemented.)
Here is the simspeed2.py benchmark output for reference:
https://gist.github.com/ludakas/31e78c9ed8adeacfdf6228052e5a2b7a

Blocking: if I understand it right, the pool.map(worker, self.iter_chunks()) call in the __iter__ function (docsim.py) loads all the data, which is then processed slowly, and this fills the memory. I was reading about the Queue class in multiprocessing. Intuitively, the chunks from iter_chunks should enter the queue and be handed to a worker, which computes the result, only when needed. Is that intuition correct?
I am not sure how to translate this into code with pool.map; the examples I found were using Process.

Number of workers: this seems easy using PARALLEL_SHARDS, which is declared at the top of the file, and passing it to the Pool constructor. However, I do not understand why it is commented out. How should the pool behave if multiprocessing is disabled?

Reference from the top of the file:

try:
    import multiprocessing
    # by default, don't parallelize queries. uncomment the following line if you want that.
    # PARALLEL_SHARDS = multiprocessing.cpu_count() # use #parallel processes = #CPus
except ImportError:
    pass
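
One way to translate the queue intuition above into code, using Process and a bounded Queue instead of pool.map (a sketch only; process_chunk and the queue sizes are hypothetical, not this PR's code):

    import multiprocessing
    import threading

    def worker(in_queue, out_queue, func):
        # Pull items until the shutdown sentinel (None) arrives.
        for item in iter(in_queue.get, None):
            out_queue.put(func(item))

    def bounded_map(func, items, num_workers=4, queue_size=8):
        in_queue = multiprocessing.Queue(maxsize=queue_size)   # put() blocks when full
        out_queue = multiprocessing.Queue(maxsize=queue_size)
        workers = [
            multiprocessing.Process(target=worker, args=(in_queue, out_queue, func))
            for _ in range(num_workers)
        ]
        for w in workers:
            w.start()

        def feed():
            for item in items:
                in_queue.put(item)          # blocks when the queue is full: backpressure
            for _ in workers:
                in_queue.put(None)          # one sentinel per worker

        threading.Thread(target=feed, daemon=True).start()
        for _ in range(len(items)):         # items must be sized here, for simplicity
            yield out_queue.get()           # results arrive in completion order
        for w in workers:
            w.join()

Because both queues have a maxsize, the feeder blocks when workers are busy and the workers block when the consumer falls behind, which is the behaviour pool.map lacks.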

@tmylk commented Mar 27, 2017

@ludakas does simspeed2.py fail even in the develop branch outside of this PR?

For queue limits, see the example in MulticoreLDA.

num_workers should be a parameter passed in the constructor.

@ludakas commented Mar 29, 2017

@tmylk Yes, simspeed2.py fails on the second test even in the develop branch, same as in this PR.

Thanks for the queue limits example; however, I encountered another problem even before I got to the queues.

At the top of docsim.py, multiprocessing is imported in a try/except, with an option to uncomment a line that sets PARALLEL_SHARDS to the number of CPUs. When I uncomment it, the code fails with:
AssertionError: daemonic processes are not allowed to have children
That is, multiprocessing does not allow nested pools, which is what happens in this situation. I tried a dirty workaround of sub-classing multiprocessing.pool.Pool, as described in the Stack Overflow post below, but the code never finished (it ran for more than 30 minutes; normally simspeed2.py takes 4 minutes) — maybe my mistake.
http://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
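
The error is easy to reproduce in isolation (a minimal repro sketch, not gensim code):

    import multiprocessing

    def inner(x):
        return x * x

    def outer(xs):
        # Pool workers are daemonic processes, and daemonic processes are not
        # allowed to spawn children, so this nested Pool raises AssertionError.
        with multiprocessing.Pool(2) as pool:
            return pool.map(inner, xs)

    if __name__ == '__main__':
        with multiprocessing.Pool(2) as pool:
            pool.map(outer, [[1, 2], [3, 4]])   # AssertionError propagates here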

I made another observation: when I keep PARALLEL_SHARDS commented out (so query_shards is sequential, the default in this branch), remove the parallelism from __iter__ (where the queue should be implemented), and keep only the plain for loop shown below, simspeed2.py runs in just 40 seconds and passes all the tests, while the parallel pool version takes 4 minutes and fails the second test. I am not sure what I am doing wrong, or whether there is already a bug.

# modified, sequential, runs in 40 seconds
for result in self.iter_chunks():
    for sim in result:
        yield sim

# original, parallel, runs in 4 minutes
pool = multiprocessing.Pool()
worker = functools.partial(_query_chunk_worker, index=self)
for result in pool.map(worker, self.iter_chunks()):
    for sim in result:
        yield sim
pool.terminate()

@tmylk commented Mar 30, 2017

@ludakas Thanks for running the benchmark. A line_profiler run is required to find out in more detail, but it is not too surprising that adding parallelism speeds up some cases and slows down others.

@piskvorky (Owner) commented:
My 2 cents: the PARALLEL_SHARDS code that is commented out is ancient and should probably just be removed. Simply uncommenting it is certainly not expected to work, or to do anything meaningful.

@tmylk commented May 2, 2017

It seems that the parallelisation doesn't pass the benchmark with PARALLEL_SHARDS commented out: 40 seconds old vs. 240 seconds new.

@tmylk closed this on May 2, 2017
Labels: difficulty medium (Medium issue: required good gensim understanding & python skills) · wishlist (Feature request)
4 participants