Skip to content

Commit a9121af

Browse files
author
Jiayu Liu
committed
update unit test
1 parent 2af2a27 commit a9121af

File tree

3 files changed

+67
-1
lines changed

3 files changed

+67
-1
lines changed

datafusion/src/execution/context.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,6 +1268,65 @@ mod tests {
12681268
Ok(())
12691269
}
12701270

1271+
#[tokio::test]
1272+
async fn window() -> Result<()> {
1273+
let results = execute("SELECT c1, MAX(c2) OVER () FROM test", 4).await?;
1274+
assert_eq!(results.len(), 1);
1275+
Ok(())
1276+
}
1277+
1278+
#[tokio::test]
1279+
async fn window_plan() -> Result<()> {
1280+
let schema = Schema::new(vec![
1281+
Field::new("a", DataType::Int32, false),
1282+
Field::new("b", DataType::Int32, false),
1283+
]);
1284+
1285+
let batch = RecordBatch::try_new(
1286+
Arc::new(schema.clone()),
1287+
vec![
1288+
Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
1289+
Arc::new(Int32Array::from(vec![2, 12, 12, 120])),
1290+
],
1291+
)?;
1292+
let mut ctx = ExecutionContext::new();
1293+
let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?;
1294+
ctx.register_table("t", Arc::new(provider))?;
1295+
1296+
let logical_plan = ctx.create_logical_plan("SELECT a, MAX(b) OVER () FROM t")?;
1297+
let opt_plan = ctx.optimize(&logical_plan)?;
1298+
let physical_plan = ctx.create_physical_plan(&opt_plan)?;
1299+
assert_eq!(
1300+
format!("{:?}", physical_plan),
1301+
"ProjectionExec { \
1302+
expr: [(Column { name: \"a\" }, \"a\"), (Column { name: \"MAX(b)\" }, \"MAX(b)\")], \
1303+
schema: Schema { \
1304+
fields: [\
1305+
Field { name: \"a\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, \
1306+
Field { name: \"MAX(b)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }], \
1307+
metadata: {} }, \
1308+
input: RepartitionExec { \
1309+
input: WindowAggExec { \
1310+
input: partitions: [...]\
1311+
schema: Schema { fields: [\
1312+
Field { name: \"a\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, \
1313+
Field { name: \"b\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }], \
1314+
metadata: {} }\
1315+
projection: Some([0, 1]), \
1316+
window_expr: [AggregateWindowExpr { \
1317+
aggregate: Max { name: \"MAX(b)\", data_type: Int32, nullable: true, expr: Column { name: \"b\" } } }], \
1318+
schema: Schema { fields: [\
1319+
Field { name: \"MAX(b)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None }, \
1320+
Field { name: \"a\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, \
1321+
Field { name: \"b\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }], metadata: {} }, \
1322+
input_schema: Schema { fields: [\
1323+
Field { name: \"a\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, \
1324+
Field { name: \"b\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }], metadata: {} } }, \
1325+
partitioning: RoundRobinBatch(16), channels: Mutex { data: {} } } }"
1326+
);
1327+
Ok(())
1328+
}
1329+
12711330
#[tokio::test]
12721331
async fn aggregate() -> Result<()> {
12731332
let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4).await?;

datafusion/src/physical_plan/planner.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,10 @@ impl DefaultPhysicalPlanner {
147147
// Initially need to perform the aggregate and then merge the partitions
148148
let input_exec = self.create_initial_plan(input, ctx_state)?;
149149
let input_schema = input_exec.schema();
150-
let physical_input_schema = input_exec.as_ref().schema();
150+
151151
let logical_input_schema = input.as_ref().schema();
152+
let physical_input_schema = input_exec.as_ref().schema();
153+
152154
let window_expr = window_expr
153155
.iter()
154156
.map(|e| {

datafusion/src/physical_plan/windows.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ impl WindowAggExec {
120120
})
121121
}
122122

123+
/// Window expressions
124+
pub fn window_expr(&self) -> &[Arc<dyn WindowExpr>] {
125+
&self.window_expr
126+
}
127+
123128
/// Input plan
124129
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
125130
&self.input

0 commit comments

Comments
 (0)