Proposal: Hook to better support CollectLeft joins in distributed execution #12454
Comments
One challenge I predict with the above scenario is that it seems to assume that the order of rows from the build side will be the same on all nodes across all partitions (so you can match up the BooleanBuffer across nodes).
I think adding hooks for making distributed engines is a very reasonable discussion
I am not sure this is the "correct" way, though it is certainly one way to do it. It seems like the core challenge you are describing is finding all rows in the small `facts` table that did not match any row in the huge `data` table. You could manage this via distributed state as you suggest. Another way might be to rewrite the query (automatically) so the check of which rows didn't match happens on a single node. The single node might seem like a bad idea, but since `facts` is small it may be acceptable. So do something like
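One possible shape of such a rewrite, as a sketch only: the column names (`fid`, `value`) are hypothetical and the exact query from the discussion isn't preserved here. The inner-join branch can run fully distributed, while the "facts rows with no match" branch only needs the small `facts` table and the join keys of `data`, so it could be completed on a single node.

```rust
use datafusion::prelude::*;

/// Sketch of rewriting `facts LEFT OUTER JOIN data` so the unmatched-row
/// check does not need shared state across nodes (assumes the join key
/// `fid` has no NULLs, otherwise NOT IN behaves differently).
async fn rewritten_outer_join(ctx: &SessionContext) -> datafusion::error::Result<DataFrame> {
    ctx.sql(
        "SELECT f.fid, d.value FROM facts f JOIN data d ON f.fid = d.fid \
         UNION ALL \
         SELECT f.fid, NULL AS value FROM facts f \
         WHERE f.fid NOT IN (SELECT fid FROM data)",
    )
    .await
}
```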
The idea is that you run the outer query either on a single node or after redistributing both sides on `fid`. This does mean you have to hash `data`, though.
Yeah, fair. I meant basically that it seems like the way to do it that does not require any distributed state shared across distributed execution nodes.
This was an approach we tried (or something very similar in spirit at least), but it breaks down when you have a join + aggregation, e.g. a query that tries to calculate the matched vs unmatched rows from `facts` (see the sketch below).
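A rough sketch of the kind of join + aggregation query being described; the column names (`fid`) are hypothetical and the actual query from the discussion isn't preserved. The point is that the aggregate consumes the outer join's output directly, so the partial aggregation can normally be fused with the hash join:

```rust
use datafusion::prelude::*;

/// Counts matched vs unmatched rows by probing the huge `data` table;
/// the aggregation sits directly on top of the outer join.
async fn matched_vs_unmatched(ctx: &SessionContext) -> datafusion::error::Result<DataFrame> {
    ctx.sql(
        "SELECT count(d.fid) AS matched, \
                sum(CASE WHEN d.fid IS NULL THEN 1 ELSE 0 END) AS unmatched \
         FROM facts f LEFT OUTER JOIN data d ON f.fid = d.fid",
    )
    .await
}
```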
By adding a coalesce to do the outer join, you can no longer fuse the partial aggregation with the hash join, and you end up back in the position of having to shuffle huge amounts of data. FWIW, I implemented my proposal from above on our internal fork of DataFusion and it's not terribly intrusive in the DataFusion code (IMO): coralogix#269
Yeah, this is definitely a challenge in the general case if the build-side subquery is inlined into the hash join. In our case the build-side subquery is a separate stage, so we can pretty easily ensure a consistent row ordering since it's read from a shuffle.
I would defer to @korowa and @comphead, who I think are our current join experts.
If there is one huge table and the other one is small, in this case a broadcast join is usually the best option.
You are right about broadcast join, but I think for left/full outer joins a plain broadcast join doesn't work when planned across multiple nodes. Though @thinkharderdev maybe that is another idea: how about doing the unmatched-row check as a final step after the join?
Right, this is what we are doing now, but the problem is that for an outer join you still need to know which build-side rows went unmatched across all the nodes.
That would still require coalescing all the output partitions from the hash join into a single partition and processing that stream on a single node.
This unfortunately is a bug that is present in Ballista as well. For left and full outer joins (every join type that produces rows for unmatched left-side rows), a plain broadcast join won't work when planned on multiple nodes; I will file an issue on the Ballista repo as well. Sharing the bitmap (to one node) has the least overhead; every other solution will require moving the full output to one node, so the performance depends more on the join output size. FYI @andygrove
That is right (or alternately repartitioning the join output). Depending on the join's output cardinality that might not be too bad (it is certainly better than repartitioning the base `data` table).
It probably isn't feasible, but the ideal way to run such a query is to pre-partition the `data` table on the join key (this is how Vertica does this type of join, FWIW). If re-partitioning the entire `data` table is too expensive, you could potentially repartition only a projection of `data` (just the columns needed for the join).
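For reference, the partition routing being discussed boils down to hashing the join key and taking it modulo the partition count, so matching rows from both sides land on the same node without any shared state. A minimal sketch with a hypothetical helper (DataFusion's real repartitioning uses its own hashing; this is only to illustrate the idea):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Route a row to a partition based on its join key. Every row with the
/// same key (from either side) ends up in the same partition, which is why
/// hash-partitioned joins need no cross-node state but do need to shuffle
/// all of `data`.
fn partition_for_key<K: Hash>(key: &K, num_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_partitions
}

fn main() {
    // The same key always maps to the same partition.
    assert_eq!(partition_for_key(&42_u64, 8), partition_for_key(&42_u64, 8));
}
```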
That would essentially just be the hash partitioning right? Or do you mean actually storing the underlying table data so it is already partitioned and doesn't require a repartition during query execution? The latter wouldn't be possible in our case since we don't know the specific query patterns that are required up front.
In our case it doesn't help much, since the output cardinality can be (in principle) even larger than the size of `data` itself.
It's still not very clear to me what exactly is required to be done on the DF side 🤔
The basic idea (which I have implemented on our internal fork here: coralogix#269) is to provide a hook in DataFusion which can be passed in through the `TaskContext`.
Hash partitioning is the standard way to do this in general, I agree, but it's just not possible in practice to shuffle as much data as hash partitioning requires. The best performing way to do it by far is a broadcast join, which is currently not possible. My personal opinion is that since DataFusion is being used by a number of organizations to build distributed query engines (even if DF itself is not that), it is reasonable to add hooks/extension points in DF to support that use case, as long as they don't add meaningful overhead to the single-node execution path.
If it makes sense to concretize this conversation into a particular code change, I'm happy to submit a PR here to frame things.
We had a discussion about joining a huge table with a small table here: #7000 (comment). Several approaches are discussed there.
I don't think either of these would actually solve the issue with outer joins. The problem is that there is shared state across the partitions in the join stage about which build-side rows have been matched across the entire stage. In single-node execution you can deal with this as shared in-memory state (which is what `CollectLeft` already does), as in the sketch below.
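A simplified single-process picture of that shared state, just to make the mechanism concrete (the names and types here are illustrative, not DataFusion's actual implementation): every probe partition marks which build-side rows it matched in one shared bitmap, and whichever partition finishes last emits the unmatched rows. The proposal in this issue is essentially about letting that bitmap live outside a single process.

```rust
use std::sync::Mutex;

/// Illustrative single-process analogue of the "visited" bitmap a
/// CollectLeft-style join keeps for outer joins.
struct SharedVisited {
    matched: Mutex<Vec<bool>>,          // one flag per build-side row
    remaining_partitions: Mutex<usize>, // probe partitions still running
}

impl SharedVisited {
    /// Mark build-side rows that found a match in this probe partition.
    fn record_matches(&self, build_indices: &[usize]) {
        let mut matched = self.matched.lock().unwrap();
        for &i in build_indices {
            matched[i] = true;
        }
    }

    /// Called when a probe partition finishes. If this was the last one,
    /// returns the indices of build-side rows that never matched so the
    /// caller can emit them (for LEFT/FULL OUTER joins).
    fn finish_partition(&self) -> Option<Vec<usize>> {
        let mut remaining = self.remaining_partitions.lock().unwrap();
        *remaining -= 1;
        if *remaining == 0 {
            let matched = self.matched.lock().unwrap();
            Some((0..matched.len()).filter(|&i| !matched[i]).collect())
        } else {
            None
        }
    }
}
```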
I understand your point. I mentioned ClickHouse functionality and Dictionaries just to show that sometimes it is not necessary to do a join even when you'd like to express one. I agree that this will work only for inner joins (then we can replace the join by getting data from the dictionary).
Yes, this is true, and there are some examples of this. So I think that we can consider the small `facts` table as some kind of dictionary, and build the HashJoin index on each node using copies of this table. This can help us do the join without resorting to developing an external mechanism for communication.
This is effectively what a broadcast (`CollectLeft`) join already does.
@thinkharderdev that's totally true; outer joins, and especially filtered outer joins, require tracking filtered/matched indexes before emitting the final result. We encountered a similar problem in SortMergeJoin #12359 even in a single-node environment, but the idea is the same: a processed partition has no idea about other partitions, and a row can find a match in partition 0 but no match in partition 1, in which case the join result is not emitted correctly. There are probably two options to handle it, and both require partitioning the keys appropriately.
This animation helps a lot for understanding partitioning: https://www.youtube.com/watch?v=GRONctC_Uh0
The third option, which I am proposing here, is to create an extension point that allows sharing just the bitmask. We don't have to (and shouldn't) try to solve the actual coordination of this sharing in DF, because that is not what DF is, but I'm suggesting that we add the hooks that allow it to be implemented outside of DF for use cases (like ours) where it is needed.
I personally think the hook is fine as long as it is clearly documented (ideally with an example, but that is not required).
I like the hook idea, let's see a PR so we can talk details there. Just to double confirm @thinkharderdev: are you planning to send a bitmask or an array of matched/non-matched left indices to all nodes to calculate the outer join correctly?
Thanks guys, put up a draft PR at #12523.
Is your feature request related to a problem or challenge?
Suppose you are building a distributed query engine on top of DataFusion and you want to run a query like the sketch below.
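A minimal, hypothetical version of the kind of query in question (the parquet paths, the join key `fid`, and the column names are assumptions for illustration; the original example is not reproduced here):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // A small "fact" table and a huge table, both registered from parquet
    // (paths are placeholders).
    ctx.register_parquet("facts", "facts.parquet", ParquetReadOptions::default())
        .await?;
    ctx.register_parquet("data", "data/", ParquetReadOptions::default())
        .await?;

    // An outer join that keeps every `facts` row, matched or not.
    let df = ctx
        .sql("SELECT f.fid, d.value FROM facts f LEFT OUTER JOIN data d ON f.fid = d.fid")
        .await?;
    df.show().await?;
    Ok(())
}
```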
Here `facts` is a small "fact" table and `data` is some HUGE table (many, many TB let's assume). The optimal way to do this in a single-node execution is probably using `CollectLeft` since `facts` is small, but this doesn't really work in a distributed join because `CollectLeft` joins rely on in-memory state. The correct way to do this in a distributed execution is to use a partitioned join and repartition `data`, but this is a problem because `data` is huge and the repartition would require shuffling a potentially massive amount of data.
Describe the solution you'd like
Add a "hook" in
HashJoinExec
that would allow shared state to be managed in a distributed execution in a user-defined way.This might look something like
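A rough sketch of what such a hook could look like. The trait and method names below are illustrative assumptions, not DataFusion's actual API, and in practice these calls would likely be async; the key point is that DataFusion only defines the interface while the distributed engine supplies the implementation.

```rust
use datafusion::error::Result;

/// Hypothetical extension point. An implementation would be provided by the
/// distributed engine (for example backed by an external coordinator) and
/// handed to the join through the `TaskContext`; DataFusion itself would not
/// know or care how the state is coordinated.
pub trait DistributedJoinState: Send + Sync {
    /// Merge this node's bitmask of matched build-side rows into the global
    /// state and record that this node has finished probing. Returns `true`
    /// if this was the last probe task globally, in which case the caller is
    /// responsible for emitting the unmatched build-side rows.
    fn report_probe_completed(&self, node_id: usize, matched: &[bool]) -> Result<bool>;

    /// The merged global bitmask; valid once every probe task has completed.
    fn global_matched_bitmask(&self) -> Result<Vec<bool>>;
}
```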
That is, `JoinLeftData` can have an optional `DistributedJoinState` that can be passed in through the `TaskContext` during execution. If it is not provided then everything works exactly as it does now. But if it is provided, then `HashJoinStream` can poll the distributed state when its last (local) probe task completes and, if it's the last global probe task, emit the unmatched rows based on the global bit mask.
Describe alternatives you've considered
Do nothing and rely on only hash partitioned joins for distributed use cases
Additional context
This sort of goes against the idea that DataFusion itself is not a library for distributed query execution, but given that many use cases of DF are in fact for distributed execution it might make sense to provide hooks for that directly in DF as long as they don't add any meaningful overhead to the single-node execution model.
If that is not the way we want to go then totally fine, just raising the question :)