Multithreaded custom grouped operations with single-row result #2588
Conversation
I agree that it is better to statically split work. In rare cases one thread will get much more work, but I think most of the time it is good. This is complex. Let me summarize the logic here:
So the invariant the threading code outside of the …
That's right. This also assumes that the order in which we call …
Yes - I assumed it is associative 😄.
Spawn one task per thread in `_combine_rows_with_first!` so that custom grouped operations that return a single row are run in parallel. This is optimal if operations take about the same time for all groups. Spawning one task per group could be faster if these times vary a lot, but the overhead would be larger: we could add this as an option later. The implementation is somewhat tricky as output columns need to be reallocated when a new return type is detected.
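The one-task-per-thread strategy described above can be sketched as follows. This is a hypothetical Python stand-in (not the actual DataFrames.jl code): the group range is split statically into at most as many chunks as there are workers, and each worker fills its slice of the shared results vector independently, producing one single-row result per group.

```python
# Hypothetical sketch of static work splitting with one task per worker.
# `combine_groups`, `op`, and `nworkers` are illustrative names, not
# DataFrames.jl API.
from concurrent.futures import ThreadPoolExecutor

def combine_groups(groups, op, nworkers=4):
    # Ceiling division so we create at most `nworkers` chunks.
    chunk = max(1, -(-len(groups) // nworkers))
    chunks = [groups[i:i + chunk] for i in range(0, len(groups), chunk)]
    results = [None] * len(groups)

    def work(start, part):
        # Each worker writes only its own disjoint slice, so no lock
        # is needed for the writes themselves.
        for j, g in enumerate(part):
            results[start + j] = op(g)

    with ThreadPoolExecutor(max_workers=nworkers) as ex:
        futures = [ex.submit(work, i * chunk, part)
                   for i, part in enumerate(chunks)]
        for f in futures:
            f.result()
    return results
```

As the PR description notes, this static split is optimal when every group costs about the same; one task per group would balance uneven costs better at the price of higher scheduling overhead.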
CI fails.
Do you have some comparable benchmarks of e.g.:
or
(and with more similar arguments) to see the performance? If not I can run some.
No, I haven't tested that yet. I can try later if you don't beat me to it.
A quick test shows that we get the benefits, but in normal situations everything is swamped by the compilation cost of a new anonymous function when working in interactive mode.
Can you please also add a manual section for threading (maybe even a separate page)? I mean that threading is such an important topic that it should be somewhere near the top of the manual, not buried deep in the description of grouping operations; potentially in the future we will have more things implemented there, e.g. …
# This has lower overhead than creating one task per group,
# but is optimal only if operations take roughly the same time for all groups
@static if VERSION >= v"1.4"
    basesize = max(1, (len - 1) ÷ Threads.nthreads())
This is incorrect: as `div` rounds down, we spawn one task too many. E.g.:
julia> max(1, (20 - 1) ÷ 4)
4
julia> collect(Iterators.partition(2:20, 4))
5-element Array{UnitRange{Int64},1}:
2:5
6:9
10:13
14:17
18:20
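The off-by-one can be checked with a quick Python stand-in for the arithmetic (Julia's `÷` is floor division, like Python's `//`; the fix is ceiling division, `cld` in Julia): with a floor-divided base size, partitioning 19 elements for 4 threads yields 5 chunks, while ceiling division yields exactly 4.

```python
# Illustration of the partition count for a given chunk size.
def nchunks(length, basesize):
    # Number of chunks Iterators.partition-style splitting produces,
    # i.e. ceil(length / basesize).
    return -(-length // basesize)

length = 19    # len - 1 = 20 - 1 from the example above
nthreads = 4

floor_basesize = max(1, length // nthreads)       # the buggy formula
ceil_basesize = max(1, -(-length // nthreads))    # cld-style fix

assert nchunks(length, floor_basesize) == 5   # one task too many
assert nchunks(length, ceil_basesize) == 4    # one task per thread
```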
Whoops. This probably slows things down significantly.
When you change this could you please report the benchmarks? Thank you.
else
    partitions = (2:len,)
end
widen_type_lock = ReentrantLock()
Why do you create a new lock and not use the GroupedDataFrame lock? Is the other lock used in the part of the code that does parallel processing of different transformations?
In theory, AFAICT, we don't have to take gd.lazy_lock, i.e. it should be possible to compute indices in parallel without conflicting with what we're doing here. But yeah, since the code here requires indices to have been computed, gd.lazy_lock will never be locked when we are here, so it doesn't make a difference and I can reuse gd.lazy_lock for simplicity.
After thinking about it, I actually think it is OK to use a separate lock. The reason is that if you run 2 transformations that each produce one row, then you want separate locks for both of them.
Still - as commented below - it would be good to have a benchmark of something like:
combine($gd, :y => (y -> sum(y)) => :y1, :y => (y -> sum(y)) => :y2);
(so that we can see what happens to the performance when we run in parallel two transformations that themselves get run in parallel)
Ah yes that's probably why I used separate locks. :-)
I'll run more benchmarks after fixing the PR.
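The reasoning above, that each transformation should own its own lock, can be sketched as follows. This is a hypothetical Python illustration (names like `Transformation` and `push_result` are invented, not DataFrames.jl API): two single-row transformations running in parallel each guard their own rare type-widening step with their own lock, so one transformation's widening never blocks the other's.

```python
# Hypothetical sketch: one widening lock per output column, so two
# transformations running in parallel do not serialize on a shared lock.
import threading

class Transformation:
    def __init__(self):
        self.widen_type_lock = threading.Lock()  # private to this output
        self.output = []

    def push_result(self, value):
        # Only the (rare) widening/reallocation step needs the lock in
        # the real implementation; here we lock every push for brevity.
        with self.widen_type_lock:
            self.output.append(value)

t1, t2 = Transformation(), Transformation()

def run(t, values):
    for v in values:
        t.push_result(v)

threads = [threading.Thread(target=run, args=(t1, range(100))),
           threading.Thread(target=run, args=(t2, range(100)))]
for th in threads:
    th.start()
for th in threads:
    th.join()

assert len(t1.output) == 100 and len(t2.output) == 100
```

With a single shared lock, the `:y1` and `:y2` transformations in the benchmark above would contend on every widening even though they touch disjoint columns.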
PR looks good - there is one problem I think (with the number of tasks spawned). The rest of the comments are minor.
I've updated the benchmarks. The PR seems consistently faster than main, except with 1000 rows and 10 groups, where some cases (but not all) regressed a lot in relative terms (but these operations are super fast so I'm not sure it matters). Even with only 1000 rows the PR is faster when the number of groups is large -- what's funny is that it's the case even with a single thread. Not sure what has been improved, maybe some type instability was fixed.
Looks good. Thank you
Just to be sure: apart from benchmarking, have you run more correctness tests? Also I think it would be good to add printing or something similar around https://github.com/JuliaData/DataFrames.jl/blob/main/test/runtests.jl#L11 (so that we do not only see a warning when threading is not tested, but also information, when threading actually is tested, about how many threads are used).
What is left for this PR? (except maybe the corner case of threading with CategoricalArrays.jl we discussed recently - but I guess you concluded we do not need to fix it - right?)
Actually I thought it was merged already. :-D I've added …

Regarding the CategoricalArrays issue, I had implemented a check to disable threading when input and output columns contained …

Of course this implementation doesn't actually fix the thread safety issue in general, since assigning a value other than …

What remains problematic is if …
OK - thank you. Will you add the lock you discuss in CategoricalArrays.jl? (probably we only need it if more than 1 thread is present) |
Actually I spoke too soon. These issues are really tricky. Even if we replace the pool, we have to recode existing refs if a level is added at the front of the pool, and while one thread is doing that, another one may continue to assign values based on the old pool. And even if it used the new pool, values may get recoded even if they were already correct. I've still not been able to trigger this kind of bug, but it's probably safer to disable multithreading when the output is …
I agree - let us disable multithreading in this case.
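The recoding hazard described above can be shown deterministically, without an actual race. In this Python stand-in (illustrative only; CategoricalArrays.jl stores integer "refs" indexing into a pool of levels), inserting a level at the front of the pool forces every existing ref to be recoded, and a writer still encoding against the old pool then stores a ref that decodes to the wrong level:

```python
# Deterministic illustration of the pool-recoding hazard.
old_pool = ["a", "b"]          # old coding: ref 0 -> "a", ref 1 -> "b"
new_pool = ["z", "a", "b"]     # "z" inserted at the front of the pool

refs = [0, 1]                  # values "a", "b" encoded against old_pool
recoded = [r + 1 for r in refs]  # recoding shifts every old ref by one

# A thread unaware of the pool swap writes "b" using the OLD coding:
recoded.append(1)              # 1 meant "b" in old_pool...

decoded = [new_pool[r] for r in recoded]
assert decoded == ["a", "b", "a"]  # ...but now decodes to "a": corrupted
```

This is why disabling multithreading for categorical outputs is the safe choice: the corruption is silent, so the bug is hard to trigger in tests yet real.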
This is a continuation of (though it's mostly orthogonal to) #2574.
Spawn one task per thread in _combine_rows_with_first! so that custom grouped operations that return a single row are run in parallel. This is optimal if operations take about the same time for all groups. Spawning one task per group could be faster if these times vary a lot, but the overhead would be larger: we could add this as an option later. The implementation is somewhat tricky as output columns need to be reallocated when a new return type is detected.
Benchmarks for a relatively fast operation such as (non-fast-path) sum (which should be the worst case for this reason) show nice speedups with multiple threads even for small numbers of rows, and a limited overhead when a single thread is available.
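The "reallocate on a new return type" idea from the PR description can be sketched like this. A hedged Python stand-in (the real code widens typed Julia vectors via promotion, and under threading does so behind the widening lock): results go into a narrow typed column, and when a group returns a value the current element type cannot hold, the column is reallocated with a wider type and already-computed entries are copied over.

```python
# Hypothetical sketch of widening an output column on a new return type.
# `combine_first_rows` is an illustrative name, not DataFrames.jl API.
import array

def combine_first_rows(groups, op):
    col = array.array("q")  # start with a narrow Int64 column
    for g in groups:
        val = op(g)
        try:
            col.append(val)
        except (TypeError, OverflowError):
            # Widen: reallocate as Float64 and copy existing results.
            col = array.array("d", col)
            col.append(val)
    return list(col)
```

For example, once one group returns a float, the integer results computed so far are copied into the new float column, so earlier work is preserved rather than recomputed.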