feat(rust,python): support horizontal concatenation of LazyFrames #13139
Conversation
Nice addition and great PR. It really does look great. I have some minor comments here and there and then it should be good to go.
```rust
    )?;
}

let schema_size = inputs
```
I think we should create a `fn merge_schemas(schemas: &[Schema])` function. That can be shared with `concat_lf_horizontal` and here. It would also amortize the cost of materializing the schema; for some logical plan nodes this is a full new allocation.
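A minimal sketch of what such a helper could look like. This uses a stand-in `Schema` type (a plain `BTreeMap` mapping column name to dtype name; the real polars `Schema` is an ordered structure with its own API) and a hypothetical string error, just to show the shape of the merge:

```rust
use std::collections::BTreeMap;

// Stand-in for polars' Schema: column name -> dtype name.
type Schema = BTreeMap<String, String>;

/// Merge schemas left to right, erroring if a column name
/// appears in more than one input schema.
fn merge_schemas(schemas: &[Schema]) -> Result<Schema, String> {
    let mut merged = Schema::new();
    for schema in schemas {
        for (name, dtype) in schema {
            // `insert` returns Some(previous) on a duplicate key.
            if merged.insert(name.clone(), dtype.clone()).is_some() {
                return Err(format!("column '{name}' appears in more than one input"));
            }
        }
    }
    Ok(merged)
}

fn main() {
    let a = Schema::from([("x".to_string(), "i64".to_string())]);
    let b = Schema::from([("y".to_string(), "f64".to_string())]);
    assert_eq!(merge_schemas(&[a.clone(), b]).unwrap().len(), 2);
    assert!(merge_schemas(&[a.clone(), a]).is_err());
    println!("ok");
}
```

Because the helper builds the merged schema once, callers avoid re-materializing per-input schemas at each use site.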
Good idea, thanks. I've added this, although I made it take `&[SchemaRef]` instead to avoid schema copies.
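For context on why `&[SchemaRef]` avoids copies: in polars, `SchemaRef` is an `Arc`-wrapped schema, so cloning one only bumps a reference count rather than deep-copying the column map. A toy illustration with a stand-in schema type:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

// Stand-in types; polars' real Schema is different, but its
// SchemaRef is likewise an Arc around the schema.
type Schema = BTreeMap<String, String>;
type SchemaRef = Arc<Schema>;

fn main() {
    let s: SchemaRef = Arc::new(Schema::from([("x".to_string(), "i64".to_string())]));
    // Cloning the Arc is a refcount bump, not a copy of the map.
    let s2 = SchemaRef::clone(&s);
    assert!(Arc::ptr_eq(&s, &s2));
    println!("ok");
}
```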
```diff
@@ -83,21 +81,18 @@ pub(crate) fn insert_streaming_nodes(
     // this allows us to split at joins/unions and share a sink
     let root = insert_file_sink(root, lp_arena);

-    // We use mutation to communicate when we need to insert a file sink.
+    // We use a bool flag in the stack to communicate when we need to insert a file sink.
```
It has been a while. So if I understand it correctly, we can now have multiple streaming subtrees that all sink into an `hconcat`, whereas previously with, for instance, a `union`, the union would be part of the streaming subtree and still produce a single `file_sink` that would be picked up by the default engine?
Yes, that's right, although there are separate sinks for each input stream that become the inputs of the `hconcat`. It looks like the existing code was intended to work this way, but that behaviour was never actually used, as all of the plans that have multiple inputs (join and union) are handled explicitly in the `insert_streaming_nodes` match expression rather than falling through to `process_non_streamable_node`.
Co-authored-by: Ritchie Vink <ritchie46@gmail.com>
```rust
for schema in schemas {
    schema.iter().try_for_each(|(name, dtype)| {
        if merged_schema.with_column(name.clone(), dtype.clone()).is_none() {
```
Not a blocker for this PR, but this can be done elegantly with `ok_or_else(|| polars_err!(..))`.
FWIW, this is actually doing the opposite of `ok_or_else`: a `Some` result should be mapped to an `Err` and a `None` to `Ok(())`. I couldn't find a more concise way to express this.
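For what it's worth, `Option::map_or` can express this Some-to-`Err`, None-to-`Ok(())` inversion fairly compactly. A standalone sketch with hypothetical names, not the PR's actual code:

```rust
// `with_column`-style inserts return Some(previous) when the column
// already existed; that case must become an Err, and None must
// become Ok(()). `map_or` handles both arms in one expression.
fn ensure_not_duplicate(previous: Option<String>, name: &str) -> Result<(), String> {
    previous.map_or(Ok(()), |_| Err(format!("duplicate column name: {name}")))
}

fn main() {
    assert!(ensure_not_duplicate(None, "x").is_ok());
    assert!(ensure_not_duplicate(Some("i64".to_string()), "x").is_err());
    println!("ok");
}
```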
Oh right!
Great PR @adamreeve. Thanks a lot.
This fixes #10203
I've included support for slice pushdown and projection pushdown, but predicate pushdown is blocked.
In the projection pushdown implementation, it would be nice to be able to remove inputs where no columns are used, but because inputs can have different lengths we need to keep them all to make sure the final length is correct.
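A toy model of why no input can be dropped (hypothetical code, not the PR's implementation): polars' horizontal concatenation pads shorter inputs with nulls, so every input contributes to the output height even when none of its columns are projected.

```rust
/// Toy horizontal concat over single columns of Option<i64>:
/// pad every column with None up to the tallest input, so the
/// result height is the maximum input height.
fn hconcat_pad(columns: Vec<Vec<Option<i64>>>) -> Vec<Vec<Option<i64>>> {
    let height = columns.iter().map(|c| c.len()).max().unwrap_or(0);
    columns
        .into_iter()
        .map(|mut c| {
            c.resize(height, None);
            c
        })
        .collect()
}

fn main() {
    // The one-row input is padded to match the two-row input,
    // which is why even an "unused" input still fixes the height.
    let out = hconcat_pad(vec![vec![Some(1), Some(2)], vec![Some(10)]]);
    assert_eq!(out[1], vec![Some(10), None]);
    println!("ok");
}
```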
This doesn't implement proper streaming support, which could be added later, but I did have to fix the streaming logic for non-streamable nodes with more than one input in `insert_streaming_nodes`. The previous code that used `insert_file_sink_ptr` didn't work correctly when this was incremented by more than 1. For example, if there are 2 inputs, we'd add a new sink node when processing the first input, then on the next iteration of the while loop in `insert_streaming_nodes` the inserted sink itself would be popped off the stack and the ptr value would be decremented to 0 before getting to the second input node. We also need a new pipeline tree per input rather than only one new tree.
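The fix described above, carrying a per-stack-entry bool flag instead of a shared counter, can be sketched with a toy plan traversal. The node types and names here are hypothetical stand-ins, not polars internals:

```rust
/// Toy plan: a non-streamable hconcat node over leaf scans.
enum Node {
    HConcat(Vec<Node>),
    Scan(&'static str),
}

/// Walk the plan with an explicit stack. Each entry carries its own
/// `insert_sink` flag, so a sink request for one input cannot be lost
/// when sibling entries are popped (the failure mode of a shared
/// counter that is decremented on every pop).
fn collect_sinks(root: &Node) -> Vec<&'static str> {
    let mut stack: Vec<(&Node, bool)> = vec![(root, false)];
    let mut sinks = Vec::new();
    while let Some((node, insert_sink)) = stack.pop() {
        match node {
            Node::Scan(name) => {
                if insert_sink {
                    sinks.push(*name);
                }
            }
            Node::HConcat(inputs) => {
                // Non-streamable node with multiple inputs: each input
                // gets its own sink flag (and, per the fix, its own
                // pipeline tree).
                for input in inputs {
                    stack.push((input, true));
                }
            }
        }
    }
    sinks
}

fn main() {
    let plan = Node::HConcat(vec![Node::Scan("a"), Node::Scan("b")]);
    let mut sinks = collect_sinks(&plan);
    sinks.sort();
    // Both inputs receive a sink, regardless of pop order.
    assert_eq!(sinks, vec!["a", "b"]);
    println!("ok");
}
```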