Faster joins meta issue #2340

Closed
bkamins opened this issue Jul 31, 2020 · 50 comments · Fixed by #2641

bkamins commented Jul 31, 2020

Here are the things I think joins can take advantage of to improve performance:

  • allow joins on GroupedDataFrame
  • taking into account the cardinality of rows of joined tables
  • being able to use the information that a data frame is already sorted on the on variable
  • being able to use the information that the on variable defines unique rows in one or both of the joined tables
  • take into account if the columns are "pooled" (this already happens)
  • maybe special-casing joins on a single column of bitstype
  • maybe specialized algorithms for specific join kinds (like innerjoin, which is probably the most common)

Please comment on what you think or have implemented to move forward with this issue (also keeping in mind that in the long term we might want multi-threading here). We should also think about the memory footprint of the algorithms we use.

bkamins commented Aug 1, 2020

Here is a basic benchmark I would start with (it can be extended easily):

julia> using DataFrames, Random

julia> n1 = 10^7 # nobs in df1
10000000

julia> n2 = 10^6 # nobs in df2
1000000

julia> ul1 = 10^7 # unique ids in df1
10000000

julia> ul2 = 10^6 # unique ids in df2
1000000

julia> issort = true # if the data is presorted
true

julia> @assert n1 / 2 < ul1 <= n1

julia> @assert n2 / 2 < ul2 <= n2

julia> df1 = DataFrame(id=[1:ul1; 1:(n1-ul1)], d1=1:n1);

julia> df2 = DataFrame(id=[1:ul2; 1:(n2-ul2)], d2=1:n2);

julia> @show n1, ul1, n2, ul2
(n1, ul1, n2, ul2) = (10000000, 10000000, 1000000, 1000000)
(10000000, 10000000, 1000000, 1000000)

julia> @info "sorted"
[ Info: sorted

julia> @time innerjoin(df1, df2, on=:id);
 17.976224 seconds (40.00 M allocations: 1.900 GiB, 1.90% gc time)

julia> @time leftjoin(df1, df2, on=:id);
 18.301490 seconds (40.00 M allocations: 2.684 GiB, 1.73% gc time)

julia> @time rightjoin(df1, df2, on=:id);
  1.138355 seconds (4.00 M allocations: 915.740 MiB, 3.42% gc time)

julia> @time outerjoin(df1, df2, on=:id);
 18.862897 seconds (40.00 M allocations: 2.704 GiB, 3.34% gc time)

julia> @time innerjoin(df2, df1, on=:id);
  1.109695 seconds (4.00 M allocations: 898.573 MiB, 2.15% gc time)

julia> @time leftjoin(df2, df1, on=:id);
  1.182023 seconds (4.00 M allocations: 914.785 MiB, 8.23% gc time)

julia> @time rightjoin(df2, df1, on=:id);
 18.198670 seconds (40.00 M allocations: 2.545 GiB, 2.05% gc time)

julia> @time outerjoin(df2, df1, on=:id);
  1.636731 seconds (4.00 M allocations: 1.608 GiB, 10.24% gc time)

julia> Random.seed!(1234);

julia> shuffle!(df1.id);

julia> shuffle!(df2.id);

julia> @info "shuffled"
[ Info: shuffled

julia> @time innerjoin(df1, df2, on=:id);
 19.280719 seconds (40.00 M allocations: 1.900 GiB, 2.55% gc time)

julia> @time leftjoin(df1, df2, on=:id);
 20.846783 seconds (40.00 M allocations: 2.684 GiB, 1.62% gc time)

julia> @time rightjoin(df1, df2, on=:id);
  1.809123 seconds (4.00 M allocations: 915.740 MiB, 5.66% gc time)

julia> @time outerjoin(df1, df2, on=:id);
 21.042610 seconds (40.00 M allocations: 2.704 GiB, 2.25% gc time)

julia> @time innerjoin(df2, df1, on=:id);
  1.697177 seconds (4.00 M allocations: 898.573 MiB, 0.94% gc time)

julia> @time leftjoin(df2, df1, on=:id);
  1.779558 seconds (4.00 M allocations: 914.785 MiB, 6.80% gc time)

julia> @time rightjoin(df2, df1, on=:id);
 19.613252 seconds (40.00 M allocations: 2.545 GiB, 2.52% gc time)

julia> @time outerjoin(df2, df1, on=:id);
  2.218243 seconds (4.00 M allocations: 1.608 GiB, 6.91% gc time)

It already shows one thing we know of: the joins are sensitive to the order of the arguments. Compare @time innerjoin(df1, df2, on=:id); with @time innerjoin(df2, df1, on=:id);. This could be fixed relatively easily, but the problem is that it would affect the row order in the result.

@shashi - I think this is a good benchmark to start from

For reference, R's data.table has the following example timings (using 1 thread):

sorted data:

> system.time(merge(df1, df2, all=FALSE)) # innerjoin
   user  system elapsed 
   0.28    0.01    0.30 
> system.time(merge(df1, df2, all=TRUE)) # outerjoin
   user  system elapsed 
   0.66    0.14    0.80 

unsorted data:

> system.time(merge(df1, df2, all=FALSE)) # innerjoin
   user  system elapsed 
   1.41    0.07    1.48 
> system.time(merge(df1, df2, all=TRUE)) # outerjoin
   user  system elapsed 
   4.33    0.35    4.67 

So clearly it takes advantage of the fact that the data is sorted. But for unsorted data it would seem that we should be almost as good as data.table if only we fixed the problem of ignoring the nrow of df1 and df2 in joins.

And in the H2O benchmarks we still have a problem with memory management on the 50GB data.

bkamins commented Aug 2, 2020

If we were to go for a shared package that would perform joins, then an API that would be nice from the DataFrames.jl perspective would look like:

getjoin(left::Tuple{Vararg{AbstractVector}}, right::Tuple{Vararg{AbstractVector}}, kind, leftunique, leftsorted, rightunique, rightsorted, dropmissing, threads)

where:

  • left would be a tuple of vectors from the left table to join on
  • right would be a tuple of vectors from the right table to join on (where columns would be matched positionally with left)
  • kind would be: inner, left, right, outer, not inner (we currently do not support not inner; you would have to do two joins to get it), semijoin, antijoin
  • leftunique: hint if left table has unique rows (we can drop it if the algorithm would not use it)
  • leftsorted: hint if left table is sorted (this for sure will be handy)
  • rightunique: hint if right table has unique rows (we can drop it if the algorithm would not use it)
  • rightsorted: hint if right table is sorted (this for sure will be handy)
  • dropmissing: whether rows with missing values should be dropped or matched (we currently match them and we do not have this option)
  • threads: some way of telling whether, or with how many threads, the operation should be multi-threaded

The return value should be a tuple of two vectors:

  • indexing vector for the left table
  • indexing vector for the right table (nothing for semijoin and antijoin)

so that after applying these vectors to the rows of the original tables we can hcat them to get the result.
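For illustration, here is a minimal sketch of how DataFrames.jl could consume such a return value (hypothetical code: getjoin does not exist yet, and the argument order is just the proposal above):

using DataFrames

# Hypothetical consumer of the proposed API (innerjoin case): apply the
# returned indexing vectors to both tables and hcat the pieces.
function join_from_indices(df_l::DataFrame, df_r::DataFrame, on::Symbol)
    lidx, ridx = getjoin((df_l[!, on],), (df_r[!, on],), :inner,
                         false, false, false, false, false, 1)
    left_part = df_l[lidx, :]
    right_part = df_r[ridx, Not(on)]    # drop the duplicated key column
    return hcat(left_part, right_part) # assumes no other name collisions
end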


In the future non-equi joins could also be supported (and because of such extensions I believe it would be better to have a separate package doing this, as none of it is DataFrames.jl-specific).

bkamins commented Aug 2, 2020

It was mentioned to check the SplitApplyCombine.jl innerjoin function, and it seems to be even slower than what we have now in DataFrames.jl (CC @andyferris as maybe I have made some gross mistake).

@andyferris

My recent testing (I think with TypedTables) showed the same thing, so probably not a mistake. (I did think it was relatively performant in some tests done in the past, though, so I was a bit confused, but I haven’t dug into it yet).

@andyferris

If you used it naively with a DataFrame though it would perform extremely poorly!

bkamins commented Aug 3, 2020

No - I used what you show in the manual:

julia> @time innerjoin(identity, identity, tuple, 1:10^7, 1:10^6); # this is already after compilation
  2.318715 seconds (15.00 M allocations: 616.420 MiB, 25.71% gc time)

julia> using DataFrames
WARNING: using DataFrames.innerjoin in module Main conflicts with an existing identifier.

julia> df1 = DataFrame(x=1:10^7); df2 = DataFrame(x=1:10^6);

julia> DataFrames.innerjoin(df1, df2, on=:x); # compile

julia> @time DataFrames.innerjoin(df1, df2, on=:x); # this we know is slow and is relatively easy to fix
 17.391257 seconds (40.00 M allocations: 1.878 GiB, 2.08% gc time)

julia> @time DataFrames.innerjoin(df2, df1, on=:x); # but here we are 2x faster already
  1.143421 seconds (4.00 M allocations: 875.685 MiB, 3.46% gc time)

julia> @time innerjoin(identity, identity, tuple, 1:10^6, 1:10^6); # another example now left and right are symmetric
  2.101644 seconds (15.00 M allocations: 631.420 MiB, 33.57% gc time)

julia> @time DataFrames.innerjoin(df2, df2, on=:x); # and we are faster again
  0.558084 seconds (4.00 M allocations: 275.033 MiB, 13.71% gc time)

@andyferris

OK - try the upcoming SplitApplyCombine 1.1.2. I fixed some type instabilities, which speeds up the first example by 3x.

andyferris commented Aug 4, 2020

OK here are some benchmarks I just ran in Julia 1.5:

julia> using SplitApplyCombine, DataFrames, BenchmarkTools

julia> @btime SplitApplyCombine.innerjoin(identity, identity, tuple, 1:10^7, 1:10^6);
  466.977 ms (2000074 allocations: 189.24 MiB)

julia> @btime SplitApplyCombine.innerjoin(identity, identity, tuple, 1:10^6, 1:10^7); # This is the slow way for SplitApplyCombine (like you say, easy to fix)
  4.172 s (20000092 allocations: 1.74 GiB)

julia> @btime SplitApplyCombine.innerjoin(identity, identity, tuple, 1:10^6, 1:10^6);
  187.345 ms (2000074 allocations: 189.24 MiB)

julia> df1 = DataFrame(x=1:10^7); df2 = DataFrame(x=1:10^6);

julia> @btime DataFrames.innerjoin(df1, df2, on=:x);
  9.057 s (19999687 allocations: 854.87 MiB)

julia> @btime DataFrames.innerjoin(df2, df1, on=:x);
  724.453 ms (1999687 allocations: 768.87 MiB)

julia> @btime DataFrames.innerjoin(df2, df2, on=:x);
  324.604 ms (1999687 allocations: 168.22 MiB)

ProfileView showed most of the SplitApplyCombine time was dealing with hashes, hashtables and pushing the data to the output array. Altering the approach may lead to some performance benefits. I didn't gain any particular insights from doing the same for DataFrames but @bkamins you might understand the bottlenecks better than me.

bkamins commented Aug 4, 2020

In DataFrames.jl this should be similar - hashing is the biggest cost. The timings you show look "good" in comparison to data.table.

So my conclusion is that using SplitApplyCombine.jl could be a good backend for joining in DataFrames.jl, provided that you would be OK with the following (of course I can join the effort, but I have learned that it is best to have a commitment from the core developer for large changes in a package):

  1. agree to work on multithreading support in the future.
  2. agree to add support for all kinds of joins
  3. agree to fix # This is the slow way for SplitApplyCombine (like you say, easy to fix)
  4. agree to add more algorithms for special cases (like issorted hinting, as this allows us to skip hashing and will be much faster - very often grouping columns are sorted in practice)

Sorry for the long list of requirements here, but I want the Julia data ecosystem to be best in class 😄 and I know how hard such things can turn out to be to maintain in the long run. And thank you for your fast response. I believe that with points 1-4 (which seem relatively easy to achieve with the infrastructure you have now) we can easily compete with data.table in terms of join performance.

If we agree to this, then on my side the first thing I will do is add SplitApplyCombine.jl as a dependency of DataFrames.jl and integrate your engine for innerjoin into it. A minor requirement would be if you could consider allowing a Tuple of Vectors rather than a Vector of Tuples to be returned from innerjoin (maybe as a Val argument to make the return type stable), as with this change it will be much faster later to compose an output DataFrame.

If anyone has some comment on this plan please chime in. And thank you @andyferris for your quick response - this is much appreciated. I believe that if you have some time to work on it, we can relatively quickly have something that makes the next release (probably in 1-2 months) look much better in the H2O benchmarks.

bkamins commented Aug 4, 2020

@andyferris - as a note to remember to check this - we should benchmark the performance for column/columns of: Vector{Int}, PooledVector, CategoricalVector, SentinelVector and Vector{String}.
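Something along these lines could be a starting point (a sketch only - the cardinalities are placeholders and SentinelVector is omitted for brevity):

using DataFrames, PooledArrays, CategoricalArrays, BenchmarkTools, Random

n = 10^6
key_columns = Dict(
    "Vector{Int}"       => rand(1:10^5, n),
    "PooledVector"      => PooledArray(rand(1:10^5, n)),
    "CategoricalVector" => categorical(rand(1:10^5, n)),
    "Vector{String}"    => [randstring(4) for _ in 1:n],
)

for (name, col) in key_columns
    df1 = DataFrame(id=col, d1=1:n)
    df2 = DataFrame(id=col[1:10^5], d2=1:10^5)
    println(name)
    @btime innerjoin($df1, $df2, on=:id)
end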

andyferris commented Aug 4, 2020

OK great - happy to help out, and contributions are more than welcome. I should point out that I've found it difficult to put significant time into open-source work over the last 18 months or so. I'm still active but not at the level I was.

Just as a preamble - originally everything that went into SplitApplyCombine was designed so that it could potentially be ported to Base later. (I think `only` is the only direct port to Base so far...). It primarily relies on the iteration and indexing APIs, and so on, and the semantics of the functions are defined this way (and everyone can add methods so long as they conform to the semantics). This changed slightly because I found Dict awkward with group (hence Dictionaries.jl), but that is still the idea. My personal point of view is that dealing with nested structured data and with relational algebra should be as well supported out-of-the-box in Julia as LinearAlgebra is.

multithreading support in the future.

Yes, this would be nice. The package generally follows the kinds of patterns I saw in Base in Julia 1.0 (see preamble). My understanding of the community's direction is that we'd be just as happy to have multithreading by default for Base.map and Base.filter as we are discussing for SplitApplyCombine.group and SplitApplyCombine.innerjoin.

add support for all kinds of joins

Yes, I think this is easier with leftjoin and so on becoming separate generic functions. (EDIT: also note that leftgroupjoin is available for use, potentially even returning a GroupedDataFrame or similar.)

agree to fix # This is the slow way for SplitApplyCombine (like you say, easy to fix)

Sure, I was just looking at that tonight. (I will need my beauty sleep though... 😴)

add more algorithms for special cases (like issorted hinting as this allows us to skip hashing and will be much faster, very often grouping columns are sorted in practice)

Yes - for sure. The algorithms can get cleverer and I think there are lots of good approaches here. I had good success making Dictionaries.distinct and Dictionaries.dictionary much faster for collections that were already sorted, without even needing issorted. Another way is to get packages like CSV.jl to add AcceleratedArray indices to columns that they detect as pre-sorted. Etc.

A minor requirement would be if you could consider allowing returning a Tuple of Vectors rather than a Vector of Tuples from innerjoin (maybe as a Val argument to make return type type stable), as with this change it will be much faster later to compose an output DataFrame.

I agree that we'll have to do something to make this wrangling easier. Like I said above, it would be good if the semantics of the generic function remained intact for all the methods... and I always intended to use something like SplitApplyCombine.invert to do this sort of transformation. Another approach is to overload _innerjoin or something like that, similar to what AcceleratedArrays does.

we should benchmark the performance for column/columns of: Vector{Int}, PooledVector, CategoricalVector, SentinelVector and Vector{String}.

For sure the Pooled and Categorical stuff will be slower than necessary, until they get specialized methods. I think I did one benchmark for grouping with strings that didn't seem spectacular. Similarly we need benchmarks for grouping by multiple columns - I think this was slower than I expected but that might be magically fixed in Julia 1.5? The other thing to be wary of is performance with Union{T, Missing}.

bkamins commented Aug 4, 2020

I always intended to use something like SplitApplyCombine.invert

This is problematic because it adds another pass (less of a problem, as this is cheap) and eats up memory (this is a real issue - we currently fail the 50GB H2O tests because our current join algorithm is too memory greedy).

A first set of steps could be:

  • fix # This is the slow way for SplitApplyCombine (like you say, easy to fix) (I understand you are looking into this)
  • improve handling of categorical-type collections (I can have a look at it if you want after the point above is done as this should be simple)
  • test integration of the code into DataFrames.jl for the cases we are sure it handles well, and fall back to the old approach otherwise (I will do this myself, as it is tricky: there is a lot of messy pre- and post-processing)

@andyferris

This is problematic because it adds another pass (less a problem as this is cheap) and eats up memory (this is a real issue - we currently fail 50GB H2O tests because our current join algorithm is too memory greedy).

Sorry I probably wasn't clear - by "something like" I meant a lazy version of invert; we definitely shouldn't add a second pass or create any copies.

(I understand you are looking into this)

Yes you nerd sniped me, I'm working on it now, nearly done :) (I'll sleep another day...)

improve handling of categorical-type collections (I can have a look at it if you want after the point above is done as this should be simple)

I imagine it should go very similarly to how AcceleratedArrays handles joins with HashIndex - there is a _innerjoin! method.

@andyferris

WIP:

julia> @btime SplitApplyCombine.innerjoin(identity, identity, tuple, 1:10^7, 1:10^6);
  521.482 ms (2000074 allocations: 189.24 MiB)

julia> @btime SplitApplyCombine.innerjoin(identity, identity, tuple, 1:10^6, 1:10^7);
  514.161 ms (2000074 allocations: 189.24 MiB)

andyferris commented Aug 4, 2020

OK, that's published. I wish I understood how to profile allocations. This (and also group) seems to produce lots of small allocations that I hoped would go away in Julia 1.5 but haven't. (EDIT: master is no better.)

bkamins commented Aug 4, 2020

@vtjnash - is there some best practice for tracking down the source of the allocations that @andyferris observes? Any hints would be helpful. Thank you!

bkamins commented Aug 4, 2020

I meant a lazy version of invert

still - I would check whether this would not be slower than two vectors, as it seems the two-vector approach should be more cache-friendly for later processing.

vtjnash commented Aug 4, 2020

I have no special insights that I think Andy doesn't already have. Huda talked about some of the tools recently (https://www.youtube.com/watch?v=S5R8zXJOsUQ) and others have been working on a memory profiler (JuliaLang/julia#33467).

shashi commented Aug 4, 2020

I did some benchmarks with IndexedTables; while the order of the tables does not affect performance there, it has similar or worse performance in the good cases.

Is the plan to use SplitApplyCombine.jl to do this, with SAC itself becoming aware of AcceleratedArrays (or just using the right Base API that AcceleratedArrays magically speeds up), so that Tables implementations like CSV can use that to attach indexing metadata? @andyferris Benchmarks are looking pretty good! But in this case an algorithm similar to merge sort could be faster than hashing, right?

bkamins commented Aug 4, 2020

So my plan (and question to the community) is that we decide on SplitApplyCombine.jl as the place to develop these algorithms, as I believe we should bet on one package and jointly contribute to improving it.

Then all the things you ask for (like being indexing-aware or using different algorithms in different cases) should just be gradually added to it by many contributors.

In short the question is - do we agree to choose SplitApplyCombine.jl as the place for such functionality? I vote yes!

andyferris commented Aug 4, 2020

SplitApplyCombine.jl to do this and SAC itself becomes aware of AcceleratedArrays (or just uses the right Base API that AcceleratedArrays magically speed up)

FYI AcceleratedArrays already accelerates SplitApplyCombine (group*, innerjoin and leftgroupjoin) and Base (filter, find*, in, count, unique) for certain "search predicates" (isequal, isless, in an interval, etc). It needs to be extended a bit (e.g. for predicates like == and < and a few more basic functions).

But in this case an algorithm similar to merge sort could be faster than hashing right?

Definitely - innerjoin between two arrays with SortIndex uses a sort-merge-join algorithm - it is particularly fast between two UniqueSortIndexes. I'm not sure where my benchmarking code got to, but I think putting these into CI or something could be a good next step for me.
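For reference, the core of a sort-merge inner join is short. This is just an illustration of the idea for two sorted vectors with unique keys (not the AcceleratedArrays code): walk both vectors once, O(n + m), no hashing.

function sortmerge_innerjoin(l::AbstractVector, r::AbstractVector)
    lidx, ridx = Int[], Int[]
    i, j = firstindex(l), firstindex(r)
    while i <= lastindex(l) && j <= lastindex(r)
        if isequal(l[i], r[j])      # match: record both indices
            push!(lidx, i); push!(ridx, j)
            i += 1; j += 1
        elseif isless(l[i], r[j])   # advance whichever side is behind
            i += 1
        else
            j += 1
        end
    end
    return lidx, ridx
end

sortmerge_innerjoin([1, 3, 5, 7], [2, 3, 5, 8])  # -> ([2, 3], [2, 3])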

bkamins commented Aug 9, 2020

I have worked with SplitApplyCombine.jl code, DataFrames.jl code and checked the H2O benchmarks (if someone wants - I have generated the 0.5GB test set they use).

The conclusions are:

  • in DataFrames.jl I need a joiner that is optimized for columnar storage (i.e. Tuple{Vararg{AbstractVector}} as left and right) - SplitApplyCombine.jl currently assumes row storage; this is a minor issue - I will see what I end up with (the joining code in the end turns out not that big, and most of the complexity will be in handling Tuple{Vararg{AbstractVector}} efficiently - probably we need @generated here)
  • the key performance bottleneck will be string comparisons. R interns strings so it is fast there. We have to think of some smarter alternative. If we have two PooledVector{String} vectors the challenge is to avoid having to do isequal. I do not have a general solution to this yet unfortunately, so comments are welcome.

shashi commented Aug 9, 2020

If we have two PooledVector{String} vectors the challenge is to avoid having to do isequal. I do not have a general solution to this yet unfortunately, so comments are welcome.

The obvious algorithm would be: combine the unique pools of both, sort them (optional, depends on whether you need isless as well) to assign a rank to each unique value, replace the refs with the rank, then it should be a matter of doing an integer comparison on the new ref vectors... (Of course at this point you can replace the PooledArray with just a vector of integers and carry on with the join). I guess the tricky part is how do you make this happen in a generically written join... But it shouldn't be impossible.
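A rough sketch of that idea for two pooled columns (illustrative only - it reaches into the refs/pool fields of PooledArray and ignores missing handling):

using PooledArrays

# Combine both pools, rank the merged values, and remap each refs vector so
# the join can proceed on plain integer keys.
function rank_refs(x::PooledArray, y::PooledArray)
    merged = sort!(unique(vcat(x.pool, y.pool)))        # combined ranked pool
    rank = Dict(v => i for (i, v) in enumerate(merged)) # value -> rank
    xr = [rank[x.pool[r]] for r in x.refs]              # integer keys for x
    yr = [rank[y.pool[r]] for r in y.refs]              # integer keys for y
    return xr, yr
end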

bkamins commented Aug 9, 2020

Two quick thoughts from experiments I have been doing today:

combine the unique pools of both

This is the expensive part. Combining the pools like this can be done efficiently only if one of the pools has low cardinality.

sort them

This is also out of the question in practice. In the H2O benchmarks data.table does a join of a 10^7 vs. 10^7 table in 2.3 seconds. Just doing the sorting would take 1.5 seconds. And this is the best case.

E.g. for a join of 10^7 vs 10^4 it takes 0.3s. So there sorting the 10^7 table is prohibitive.

In general have a look at https://h2oai.github.io/db-benchmark/ in the joins/0.5GB section for the timings (as I have said - I have these data sets so I can share them, but they are too big to attach here).

bkamins commented Aug 9, 2020

Here is a quick test (probably not optimal but showing the issue):

julia> using PooledArrays, Random

julia> x = PooledArray(rand(10^7));

julia> y = PooledArray(rand(10^7));

julia> @time append!(x, y); # this already is too slow vs the benchmark
  2.657543 seconds (36.06 k allocations: 455.983 MiB, 2.44% gc time)

julia> x = PooledArray([randstring(4) for i in 1:10^7]);

julia> y = PooledArray([randstring(4) for i in 1:10^7]);

julia> @time append!(x, y); # and this is just prohibitive - and shows in particular the overhead we have with isequal - so even if we fix things with merging pools we are slow
  5.731321 seconds (104.34 k allocations: 107.522 MiB)

@andyferris

a joiner that is optimized for columnar storage

Yeah agree. My attempt so far has been to stay within the collection-of-rows abstraction and use dispatch patterns to (a) fetch the one-or-few joining columns from each table and (b) choose the best algorithm. This is currently possible using TypedTables, SplitApplyCombine and AcceleratedArrays but that approach I believe could extend to DataFrames and PooledArrays without much trouble. This is the _innerjoin! function - the API and documentation for that could be cleaned up for sure.

the key performance bottleneck will be string comparisons

Also agree. I was looking at PooledArrays.jl, and I see it's generally incremental, inserting the elements one-by-one (a short glance indicated that append! will fall back to iterated setindex! via copyto!)... I believe with a bit of work it should be possible to come up with a faster, more "batch oriented" approach that performs isequal only once between each group with colliding (full) hashes and recomputes no hashes (using an implementation like a Dictionaries.jl Dictionary, which retains the hash).
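To illustrate the direction, here is a pseudocode-level sketch of a batch merge (real PooledArrays also maintains an invpool and other invariants that this ignores):

# Batch-merge new values into a (refs, pool) pair: build one lookup for the
# existing pool, resolve each incoming value with a single Dict access, and
# grow the pool only for genuinely new values.
function batch_append!(refs::Vector{UInt32}, pool::Vector{T},
                       newvals::AbstractVector{T}) where {T}
    lookup = Dict{T,UInt32}(v => UInt32(i) for (i, v) in enumerate(pool))
    for v in newvals
        r = get!(lookup, v) do
            push!(pool, v)
            UInt32(length(pool))
        end
        push!(refs, r)
    end
    return refs, pool
end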

bkamins commented Aug 9, 2020

As for integration with SplitApplyCombine.jl, here is a first pass: #2359. I just used broadcasted getindex to transform to the columnar approach for now, but it is not a bottleneck anyway. I get an improvement but it is not nearly enough. Judging by the H2O benchmark, SplitApplyCombine.jl has the biggest trouble with high-cardinality joins - 10^7 vs 10^7 rows producing a 9,000,000-row intersection (so almost perfect matching).

(in general innerjoin code in SplitApplyCombine.jl is well written, so it is clear what is going on there 👍)

andyferris commented Aug 10, 2020

Cool. I'm supposing that the high-cardinality arena is where you would see the row-column mangling be slowest anyway, so that's not too discouraging, right? Did you do any profiling to determine where the time is spent?

so it is clear what is going on there

Oh that's good to hear - I wasn't sure if people would appreciate the mapview / productview approach or not. :)

andyferris commented Aug 10, 2020

Yeah true - I need to use that in SplitApplyCombine for joins, group, etc. too. Hmm... thanks!

@andyferris

signals that the user says the keys are unique

You reminded me, I should finish implementing that for UniqueIndex in AcceleratedArrays.

andyferris commented Aug 10, 2020

So - how far away from "good" (e.g. data.table) performance are we?

I'm looking at grouping (it's a bit simpler) and using the general approach you outline I get roughly 2x improvement:

julia> a = rand(1:10^6, 10^7);

julia> @btime SplitApplyCombine.groupfind2(a);
  1.535 s (83 allocations: 219.31 MiB)

julia> @btime SplitApplyCombine.groupfind(a);
  2.819 s (3664416 allocations: 338.50 MiB)

(If anyone is curious the newer algorithm is here)
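(The linked code is not reproduced here; as a guess at its flavor, a counting-sort style groupfind for integer keys avoids the per-group vectors by allocating a handful of large arrays - the actual groupfind2 may well work differently.)

# Toy counting-sort groupfind for integer keys with a bounded range.
function groupfind_counting(a::AbstractVector{<:Integer})
    lo, hi = extrema(a)
    offsets = zeros(Int, hi - lo + 2)
    for x in a                      # histogram of key frequencies
        offsets[x - lo + 2] += 1
    end
    cumsum!(offsets, offsets)       # prefix sums give group start offsets
    perm = Vector{Int}(undef, length(a))
    for (i, x) in enumerate(a)      # scatter original indices by key
        j = offsets[x - lo + 1] + 1
        offsets[x - lo + 1] = j
        perm[j] = i
    end
    return perm, offsets            # perm grouped by key; offsets delimit groups
end

groupfind_counting([3, 1, 3])  # -> ([2, 1, 3], [1, 1, 3, 3])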

bkamins commented Aug 10, 2020

Nice generic code - you are a master :).

Now regarding performance vs. data.table: I have distilled out the key part. Here are the current (i.e. v1.1.3) timings:

julia> using SplitApplyCombine
julia> using Random
julia> x = shuffle(1:10^7);
julia> y = copy(x);
julia> y[1:10^6] .= 10^8 .+ (1:10^6);
julia> shuffle!(y);
julia> GC.gc(); @time SplitApplyCombine.innerjoin(identity, identity, tuple, x, y);
 14.313567 seconds (20.87 M allocations: 2.013 GiB, 43.01% gc time)
julia> GC.gc(); @time SplitApplyCombine.innerjoin(identity, identity, tuple, x, y);
 11.643759 seconds (20.00 M allocations: 1.972 GiB, 32.14% gc time)

Comments:

  1. we have a lot of allocations - probably an efficient "single pool" trick should alleviate that
  2. data.table does more than this code (this code does only a minimal operation, so some non-negligible overhead to construct tables is needed)

Now the same in R reconstructed:

> library(data.table)
data.table 1.13.0 using 4 threads (see ?getDTthreads).  Latest news: r-datatable.com
> setDTthreads(1) # we concentrate on the 1-threaded algorithm for now
> getDTthreads()
[1] 1
> x <- sample(1:10^7)
> y <- x
> y[1:10^6] <- 10^8 + (1:10^6)
> y <- sample(y)
> dt1 <- data.table(id3=x)
> dt2 <- data.table(id3=y)
> system.time(dt1[dt2, on='id3', nomatch=NULL])
   user  system elapsed 
  2.998   0.188   3.186 
> system.time(dt1[dt2, on='id3', nomatch=NULL])
   user  system elapsed 
  2.677   0.208   2.884 

So the gap is really wide - data.table is very good here. (and we are still working with Int, where we have a better situation than for factors)

bkamins commented Aug 10, 2020

What performs almost as well as data.table is:

julia> using Random

julia> x = shuffle(1:10^7);

julia> y = copy(x);

julia> y[1:10^6] .= 10^8 .+ (1:10^6);

julia> shuffle!(y);

julia> GC.gc(); @time SplitApplyCombine.innerjoin(identity, identity, tuple, x, y);
  3.705953 seconds (648.22 k allocations: 830.800 MiB, 2.39% gc time)

julia> GC.gc(); @time SplitApplyCombine.innerjoin(identity, identity, tuple, x, y);
  3.391843 seconds (121 allocations: 798.173 MiB, 0.16% gc time)

Now, how do I get it? The strategy is:

  1. Optimistically use dict = Dict{eltype(r), V}() instead of Dict{eltype(r), Vector{V}}()
  2. If you do not find any duplicates stay with it (and you are then fast)
  3. If you find first duplicate switch to Dict{eltype(r), Vector{V}}(), which is cheap, as all the work you have done up to this point can be just copied (keys are the same, only values need to be wrapped in a vector) and then use the slow path.

Of course, in point 3, if we can be faster with the strategy of a single vector giving the permutation for groups, then we can use it as well. The point is that uniqueness can be cheaply detected (i.e. the cost of the conversion, if it turns out that we have a non-unique collection, is quite low compared to the other costs we have to pay).
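A minimal sketch of the conversion in point 3, assuming the optimistic dict maps key => single index (this simple version rehashes the keys; the trick described above would reuse the hash table's slots directly):

function upgrade_to_multimap(dict::Dict{K,V}) where {K,V}
    multi = Dict{K,Vector{V}}()
    sizehint!(multi, length(dict))
    for (k, v) in dict
        multi[k] = [v]  # keys unchanged; each index wrapped in a vector
    end
    return multi
end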

I hope this helps 😄

@andyferris

Yes, that helps a lot.

I was already thinking that I can port that groupfind2 stuff to _innerjoin! and within the first loop track whether it is unique or not, branching to a faster algorithm after that if possible (e.g. the sortperm is unnecessary if the elements are unique, as is all the range-finding stuff).

@nalimilan

Sharing common join code in SplitApplyCombine seems like a good idea.

My two cents:

  • I think data.table uses radix sort, which should be very efficient when working on bitstype keys. Currently it's not as fast as it could be, but with some experimental changes I was able to make it closer to data.table (Make sortperm faster for large vectors JuliaCollections/SortingAlgorithms.jl#33). So it might be worth trying.
  • A lesson from the DataFrames grouping and joining implementation is that if you compute hashes for multiple columns, it's more efficient to do that one column at a time (probably because that optimizes memory accesses) than row by row - see the sketch below.
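A sketch of that pattern with a hypothetical helper (not the actual DataFrames.jl internals):

# Mix each column's hashes into one preallocated vector of row hashes,
# scanning every column contiguously instead of jumping across columns
# row by row.
function rowhashes(cols::Tuple{Vararg{AbstractVector}})
    n = length(first(cols))
    h = zeros(UInt, n)
    for col in cols               # one full pass per column
        @inbounds for i in 1:n
            h[i] = hash(col[i], h[i])
        end
    end
    return h
end

rowhashes((rand(1:10, 10^3), rand('a':'z', 10^3)));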

piever commented Aug 14, 2020

I just wanted to mention that StructArrays implements the sorting approach, computing a sortperm for both sets of keys and then an online mergesort (see here), so maybe that can be useful to compare. It seems to me that SplitApplyCombine is faster, though.

I would actually be pretty happy to get rid of that code, so I'd be very much in favor of having join functionality in a separate package.

bkamins commented Aug 14, 2020

I have checked it earlier:

julia> x = shuffle(1:10^7);

julia> sortperm(x);

julia> @time sortperm(x);
  2.523749 seconds (3 allocations: 76.294 MiB, 1.99% gc time)

So for two structures it is 5 seconds + the cost of merging.

My current implementation above does everything in 3.4 seconds, so the cost is a bit above the cost of a single sortperm.

EDIT

just note that this 3.4 seconds timing is not deployed anywhere yet - I put it up as a benchmark for @andyferris, as he is working on the SplitApplyCombine.jl joins update currently (of course I can make a PR with this change, but I feel it is cleaner if @andyferris makes a change that is consistent with his approach to how the exact design should be in SplitApplyCombine.jl).

@derekmahar

* `leftsorted`: hint if right table is sorted (this for sure will be handy)

I think you meant rightsorted.

bkamins commented Aug 14, 2020

Sure - copy/paste issue. I fixed it above.

bkamins commented Aug 14, 2020

Also - after discussion on Discourse it would be good to allow non-equi joins (even if they would not be super fast - in many cases people have relatively small tables, so even O(nrow(df1)*nrow(df2)) time complexity is acceptable).

bkamins commented Aug 17, 2020

@andyferris - what do you think the plan for the next steps should be here? We clearly have a path for the long term, but in the short term do you think that I should do some temporary solution in DataFrames.jl, or can updates to SplitApplyCombine.jl be expected soon enough that we should wait for them? (No rush of course - I am asking to plan the work in DataFrames.jl.)

@andyferris

@bkamins that’s a good question. Where is the “3.4 seconds” code, by the way? It would be good if I could compare it to my prototype for SplitApplyCombine (and how all the column-row-column handling stuff goes).

I suppose such an integration can be implemented slowly - there’s a few basic functions like mapview that I’ve tended to use internally (which should become Iterators.map eventually). Are we also interested in the group family of functions? The joins are probably the most complex, when you think about it.

bkamins commented Aug 18, 2020

I have not pushed it anywhere as it was dirty. Here is the code:

function _innerjoin!(out, l::AbstractArray, r::AbstractArray, v::AbstractArray, ::typeof(isequal),
                     keeporder::Bool=true)
    @boundscheck if keeporder && (axes(l)..., axes(r)...) != axes(v)
        throw(DimensionMismatch("innerjoin arrays do not have matching dimensions"))
    end

    keeporder && length(r) > length(l) && return _innerjoin!(out, r, l, v, isequal, false)

    rkeys = keys(r)
    V = eltype(rkeys)
    dict = Dict{eltype(r), V}()
    @inbounds for i_r ∈ rkeys
        r_value = r[i_r]
        dict_index = Base.ht_keyindex2!(dict, r_value)

        # this can be optimized in the future as what we already have in dict can be reused
        if dict_index > 0
            if keeporder
                return _innerjoin_dup!(out, l, r, v, isequal)
            else
                return _innerjoin_dup!(out, r, l, v, isequal)
            end
        end

        Base._setindex!(dict, i_r, r_value, -dict_index)
    end

    @inbounds for i_l in keys(l)
        l_value = l[i_l]
        dict_index = Base.ht_keyindex(dict, l_value)
        if dict_index > 0 # -1 if key not found
            i_r = dict.vals[dict_index]
            # check if compiler properly optimizes out this condition when doing constant propagation
            if keeporder
                push!(out, v[_tuple(i_l)..., _tuple(i_r)...])
            else
                push!(out, v[_tuple(i_r)..., _tuple(i_l)...])
            end
        end
    end

    return out
end

# we fall back to original code if we have duplicates
function _innerjoin_dup!(out, l::AbstractArray, r::AbstractArray, v::AbstractArray, ::typeof(isequal))
    @boundscheck if (axes(l)..., axes(r)...) != axes(v)
        throw(DimensionMismatch("innerjoin arrays do not have matching dimensions"))
    end

    @info "using slow branch"

    if length(r) <= length(l)
        rkeys = keys(r)
        V = eltype(rkeys)
        dict = Dict{eltype(r), Vector{V}}()
        @inbounds for i_r ∈ rkeys
            push!(get!(Vector{V}, dict, r[i_r]), i_r)
        end

        @inbounds for i_l in keys(l)
            l_value = l[i_l]
            dict_index = Base.ht_keyindex(dict, l_value)
            if dict_index > 0 # -1 if key not found
                for i_r ∈ dict.vals[dict_index]
                    push!(out, v[_tuple(i_l)..., _tuple(i_r)...])
                end
            end
        end
    else
        lkeys = keys(l)
        V = eltype(lkeys)
        dict = Dict{eltype(l), Vector{V}}()
        @inbounds for i_l ∈ lkeys
            push!(get!(Vector{V}, dict, l[i_l]), i_l)
        end

        @inbounds for i_r in keys(r)
            r_value = r[i_r]
            dict_index = Base.ht_keyindex(dict, r_value)
            if dict_index > 0 # -1 if key not found
                for i_l ∈ dict.vals[dict_index]
                    push!(out, v[_tuple(i_l)..., _tuple(i_r)...])
                end
            end
        end
    end

    return out
end

bkamins commented Aug 18, 2020

Are we also interested in the group family of functions?

We can check it in the future, but I believe it is less of a priority as we already have this done quite efficiently (of course maybe things can be improved also here)

bkamins commented Aug 18, 2020

It seems the compiler does not fully inline keeporder so this is faster:

function _innerjoin!(out, l::AbstractArray, r::AbstractArray, v::AbstractArray, ::typeof(isequal))
    @boundscheck if (axes(l)..., axes(r)...) != axes(v)
        throw(DimensionMismatch("innerjoin arrays do not have matching dimensions"))
    end

    if length(r) <= length(l)
        let # make sure the local variables do not leak out of branches
            rkeys = keys(r)
            V = eltype(rkeys)
            dict = Dict{eltype(r), V}()
            @inbounds for i_r ∈ rkeys
                r_value = r[i_r]
                dict_index = Base.ht_keyindex2!(dict, r_value)

                # this can be optimized in the future as dict can be reused
                if dict_index > 0
                    return _innerjoin_dup!(out, l, r, v, isequal)
                end

                Base._setindex!(dict, i_r, r_value, -dict_index)
            end

            @inbounds for i_l in keys(l)
                l_value = l[i_l]
                dict_index = Base.ht_keyindex(dict, l_value)
                if dict_index > 0 # -1 if key not found
                    i_r = dict.vals[dict_index]
                    push!(out, v[_tuple(i_l)..., _tuple(i_r)...])
                end
            end
        end
    else
        let
            lkeys = keys(l)
            V = eltype(lkeys)
            dict = Dict{eltype(l), V}()
            @inbounds for i_l ∈ lkeys
                l_value = l[i_l]
                dict_index = Base.ht_keyindex2!(dict, l_value)

                # this can be optimized in the future as dict can be reused
                if dict_index > 0
                    return _innerjoin_dup!(out, l, r, v, isequal)
                end

                Base._setindex!(dict, i_l, l_value, -dict_index)
            end

            @inbounds for i_r in keys(r)
            r_value = r[i_r]
                dict_index = Base.ht_keyindex(dict, r_value)
                if dict_index > 0 # -1 if key not found
                    i_l = dict.vals[dict_index]
                    push!(out, v[_tuple(i_l)..., _tuple(i_r)...])
                end
            end
        end
    end

    return out
end

@andyferris

It seems the compiler does not fully inline keeporder so this is faster:

You mean constant propagate? It is interesting to me that this stuff doesn't work well with default and keyword arguments (I'd expect the equivalent of @propagate_inbounds and full constant prop for the auto-generated intermediate functions).

In general... I'm curious how important the order out of innerjoin is? The functions in SplitApplyCombine are generally specified in terms of iteration, but a lot of these accelerations are faster without worrying about this.

quinnj commented Aug 19, 2020

I've been on vacation for the past week or so, but I'm trying to catch back up on things. Two of the basic questions I have are where the core operations/functions are going to live, and what exactly will they be? I ask because I wonder if we could have some generic function definitions in DataAPI.jl that would allow experimental overload by custom array types to try and optimize certain operations (grouping, joining, etc.).

I guess I'm also wondering if that would even make sense, or do we think we can make a set of optimized implementations that could operate on any array type?

I've started playing around with making the ChainedVector type in SentinelArrays.jl use threads for certain operations (map, filter, etc.) and I want to be able to plug into the grouping/joining functionality seamlessly and have threading "just work". Any thoughts on the best way forward here? I guess it would probably be useful for me to actually dive into the SplitApplyCombine.jl code and see what exactly is going on there; is that code up to date w/ everything that's been posted here? Or are people working locally or on PRs?

@andyferris

Hey Jacob - from my point of view SplitApplyCombine was created to be a home for generic operations like group and join on generic collections. If ChainedVector is an AbstractVector then everything that is there should theoretically just work, I hope :)

Of course faster (you said multithreaded?) implementations for certain array types are always good, so adding methods to those generic functions makes sense to me (and is easy to prototype if you're willing to take SplitApplyCombine as a dependency - AcceleratedArrays is the current "poster child" for this - I think it makes sense to have different packages supply the verbs/functions and others supply the nouns/datatypes).

There may also be deficiencies in terms of what generic functions are needed for tabular manipulation, it's likely we'd want some more (e.g. the only joins are innerjoin and the C#-like leftgroupjoin).

is that code up to date w/ everything that's been posted here? Or are people working locally or on PRs?

I published a few performance improvements based on earlier parts of this discussion but the more recent stuff is still WIP (code pasted and/or linked above).

bkamins commented Aug 19, 2020

@andyferris

You mean constant propagate?

Yes - I mean constant propagation and pruning branches in the code (in my first implementation keeporder was only used to avoid code duplication following the if length(r) <= length(l) test you have now).

Or are people working locally or on PRs?

I assume that @andyferris will update SplitApplyCombine.jl based on the discussion (@andyferris - thank you for working on this). To my understanding, in the short term the changes that promise significant improvement are:

  1. optimistically assume that the short side of the join is unique and efficiently fall back to the non-unique algorithm that we have now if we detect it is not
  2. in the non-unique short side of the join algorithm avoid using a Dict of key => vector associations as it leads to many small allocations but use a strategy similar to groupby in DataFrames.jl
  3. A small optimization that would save some time (not much, but still valuable) is to allow changing the value returned from innerjoin (in DataFrames.jl I need two vectors to index the left and right data frames to perform post-processing, and the current API does not provide such an option).
  4. A point that we have not discussed much, but which is on the table, is ensuring the performance of joins on vectors like PooledArray or CategoricalArray (the current code does not take possible optimizations here into account).

For me point 1. above would be the priority, as it is pretty clear what needs to be done and it gives a large performance boost (so if SplitApplyCombine.jl were released with the changes described in 1., I would then update DataFrames.jl to use it instead of what we have in innerjoin).

Now, what I believe we need in the long term is described in #2340 (comment) but this is a lot of work so I assume we are going to handle this incrementally.

non-Jedi added a commit to non-Jedi/SplitApplyCombine.jl that referenced this issue Aug 28, 2020
This avoids allocating a Vector for the case where l does not have
multiple indices with the same value. For the smoke-test benchmark in
<JuliaData/DataFrames.jl#2340 (comment)>,
this reduces allocations by half and overall runtime by 10%.
bkamins added this to the 1.0 milestone Mar 4, 2021