-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat: support pivot and unpivot #17946
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
b4b297e
cc84b7e
b988225
a931e2c
9becd14
0b95a56
3341ae4
1f471f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2368,6 +2368,139 @@ impl DataFrame { | |||||||||||||||||||||
| let df = ctx.read_batch(batch)?; | ||||||||||||||||||||||
| Ok(df) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /// Pivot the DataFrame, transforming rows into columns based on the specified value columns and aggregation functions. | ||||||||||||||||||||||
| /// | ||||||||||||||||||||||
| /// # Arguments | ||||||||||||||||||||||
| /// * `aggregate_functions` - Aggregation expressions to apply (e.g., sum, count). | ||||||||||||||||||||||
| /// * `value_column` - Columns whose unique values will become new columns in the output. | ||||||||||||||||||||||
| /// * `value_source` - Columns to use as values for the pivoted columns. | ||||||||||||||||||||||
| /// * `default_on_null` - Optional expressions to use as default values when a pivoted value is null. | ||||||||||||||||||||||
| /// | ||||||||||||||||||||||
| /// # Example | ||||||||||||||||||||||
| /// ``` | ||||||||||||||||||||||
| /// # use datafusion::prelude::*; | ||||||||||||||||||||||
| /// # use arrow::array::{ArrayRef, Int32Array, StringArray}; | ||||||||||||||||||||||
| /// # use datafusion::functions_aggregate::expr_fn::sum; | ||||||||||||||||||||||
| /// # use std::sync::Arc; | ||||||||||||||||||||||
| /// # let ctx = SessionContext::new(); | ||||||||||||||||||||||
| /// let value: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); | ||||||||||||||||||||||
| /// let category: ArrayRef = Arc::new(StringArray::from(vec!["A", "B", "A"])); | ||||||||||||||||||||||
| /// let df = DataFrame::from_columns(vec![("value", value), ("category", category)]).unwrap(); | ||||||||||||||||||||||
| /// let pivoted = df.pivot( | ||||||||||||||||||||||
| /// vec![sum(col("value"))], | ||||||||||||||||||||||
| /// vec![Column::from("category")], | ||||||||||||||||||||||
| /// vec![col("value")], | ||||||||||||||||||||||
| /// None | ||||||||||||||||||||||
| /// ).unwrap(); | ||||||||||||||||||||||
| /// ``` | ||||||||||||||||||||||
| pub fn pivot( | ||||||||||||||||||||||
| self, | ||||||||||||||||||||||
| aggregate_functions: Vec<Expr>, | ||||||||||||||||||||||
| value_column: Vec<Column>, | ||||||||||||||||||||||
| value_source: Vec<Expr>, | ||||||||||||||||||||||
| default_on_null: Option<Vec<Expr>>, | ||||||||||||||||||||||
| ) -> Result<Self> { | ||||||||||||||||||||||
| let plan = LogicalPlanBuilder::from(self.plan) | ||||||||||||||||||||||
| .pivot( | ||||||||||||||||||||||
| aggregate_functions, | ||||||||||||||||||||||
| value_column, | ||||||||||||||||||||||
| value_source, | ||||||||||||||||||||||
| default_on_null, | ||||||||||||||||||||||
| )? | ||||||||||||||||||||||
| .build()?; | ||||||||||||||||||||||
| Ok(DataFrame { | ||||||||||||||||||||||
| session_state: self.session_state, | ||||||||||||||||||||||
| plan, | ||||||||||||||||||||||
| projection_requires_validation: self.projection_requires_validation, | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /// Unpivot the DataFrame, transforming columns into rows. | ||||||||||||||||||||||
| /// | ||||||||||||||||||||||
| /// # Arguments | ||||||||||||||||||||||
| /// * `value_column_names` - Names for the value columns in the output | ||||||||||||||||||||||
| /// * `name_column` - Name for the column that will contain the original column names | ||||||||||||||||||||||
| /// * `unpivot_columns` - List of (column_names, optional_alias) tuples to unpivot | ||||||||||||||||||||||
| /// * `id_columns` - Optional list of columns to preserve (if None, all non-unpivoted columns are preserved) | ||||||||||||||||||||||
| /// * `include_nulls` - Whether to include rows with NULL values (default: false excludes NULLs) | ||||||||||||||||||||||
| /// | ||||||||||||||||||||||
| /// # Example | ||||||||||||||||||||||
| /// ``` | ||||||||||||||||||||||
| /// # use std::sync::Arc; | ||||||||||||||||||||||
| /// # use arrow::array::{ArrayRef, Int32Array}; | ||||||||||||||||||||||
| /// # use datafusion::prelude::*; | ||||||||||||||||||||||
| /// # use datafusion::error::Result; | ||||||||||||||||||||||
| /// # #[tokio::main] | ||||||||||||||||||||||
| /// # async fn main() -> Result<()> { | ||||||||||||||||||||||
| /// let ctx = SessionContext::new(); | ||||||||||||||||||||||
| /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); | ||||||||||||||||||||||
| /// let jan: ArrayRef = Arc::new(Int32Array::from(vec![100, 110])); | ||||||||||||||||||||||
| /// let feb: ArrayRef = Arc::new(Int32Array::from(vec![200, 210])); | ||||||||||||||||||||||
| /// let mar: ArrayRef = Arc::new(Int32Array::from(vec![300, 310])); | ||||||||||||||||||||||
| /// let df = DataFrame::from_columns(vec![("id", id), ("jan", jan), ("feb", feb), ("mar", mar)]).unwrap(); | ||||||||||||||||||||||
| /// let unpivoted = df.unpivot( | ||||||||||||||||||||||
| /// vec!["jan".to_string(), "feb".to_string(), "mar".to_string()], | ||||||||||||||||||||||
| /// "month".to_string(), | ||||||||||||||||||||||
| /// vec![(vec!["jan".to_string(), "feb".to_string(), "mar".to_string()], None)], | ||||||||||||||||||||||
|
Comment on lines
+2443
to
+2445
|
||||||||||||||||||||||
| /// vec!["jan".to_string(), "feb".to_string(), "mar".to_string()], | |
| /// "month".to_string(), | |
| /// vec![(vec!["jan".to_string(), "feb".to_string(), "mar".to_string()], None)], | |
| /// vec!["value".to_string()], // Name for the value column in output | |
| /// "month".to_string(), | |
| /// vec![ | |
| /// (vec!["jan".to_string()], Some("jan".to_string())), | |
| /// (vec!["feb".to_string()], Some("feb".to_string())), | |
| /// (vec!["mar".to_string()], Some("mar".to_string())) | |
| /// ], |
Copilot
AI
Nov 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent indentation in the error handling closure. The opening brace and closure body on lines 2465-2467 should be indented consistently with the similar error handlers below on lines 2473-2475 and 2481-2483.
| DataFusionError::Plan("named_struct function not found".to_string()) | |
| })?; | |
| DataFusionError::Plan("named_struct function not found".to_string()) | |
| })?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation example appears to be incorrect. The
value_sourceparameter should contain literal values that will become column names (e.g.,lit("A"),lit("B")), notcol("value"). Based on the test cases and implementation, this should be: