Implement streaming versions of Dataframe.collect methods#789
Implement streaming versions of Dataframe.collect methods#789andygrove merged 23 commits intoapache:masterfrom
Conversation
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
alamb
left a comment
There was a problem hiding this comment.
I think the idea looks very nice 👍
datafusion/src/dataframe.rs
Outdated
| /// # Ok(()) | ||
| /// # } | ||
| /// ``` | ||
| async fn collect_stream(&self) -> Result<SendableRecordBatchStream>; |
There was a problem hiding this comment.
What if we called this something like execute rather than collect_stream?
async fn execute_stream(&self) -> Result<SendableRecordBatchStream>;
This would mirror the naming of ExecutionPlan::execute and might make it clearer that collect means collect into a Vec and execute means get a stream
There was a problem hiding this comment.
Good idea. I renamed these to execute_stream and execute_stream_partitioned
| /// execute it, collecting all resulting batches into memory while maintaining | ||
| /// partitioning | ||
| async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> { | ||
| let state = self.ctx_state.lock().unwrap().clone(); |
There was a problem hiding this comment.
You could probably rewrite collect_partitioned to be in terms of collect_stream_partitioned:
collect(self.collect_stream_partitioned().await?)or something like that
There was a problem hiding this comment.
I've cleaned the code up and removed a fair bit of duplication now.
alamb
left a comment
There was a problem hiding this comment.
Looks like a really nice change to me
* feat: Optimze CreateNamedStruct preserve dictionaries Instead of serializing the return data_type we just serialize the field names. The original implmentation was done as it lead to slightly simpler implementation, but it clear from apache#750 that this was the wrong choice and leads to issues with the physical data_type. * Support dictionary data_types in StructVector and MapVector * Add length checks
* feat: Optimze CreateNamedStruct preserve dictionaries Instead of serializing the return data_type we just serialize the field names. The original implmentation was done as it lead to slightly simpler implementation, but it clear from apache#750 that this was the wrong choice and leads to issues with the physical data_type. * Support dictionary data_types in StructVector and MapVector * Add length checks
Which issue does this PR close?
Closes #47.
Rationale for this change
In addition to the current
collect*methods that load results into memory in aVec<RecordBatch>this PR adds alternateexecute_stream*methods that return streams instead so that results don't have to be loaded into memory before being processed.What changes are included in this PR?
New
execute_streamandexecute_stream_partitionedmethods onDataFrame.Are there any user-facing changes?
Yes, new DataFrame methods.