Commit ff27d90

Fix regression by reverting Materialize dictionaries in group keys (#8740)
* revert eb8aff7 / Materialize dictionaries in group keys
* Update tests
1 parent dd4263f commit ff27d90

6 files changed, 124 insertions(+), 48 deletions(-)

datafusion/core/tests/path_partition.rs

Lines changed: 12 additions & 3 deletions
@@ -168,9 +168,9 @@ async fn parquet_distinct_partition_col() -> Result<()> {
     assert_eq!(min_limit, resulting_limit);

     let s = ScalarValue::try_from_array(results[0].column(1), 0)?;
-    let month = match s {
-        ScalarValue::Utf8(Some(month)) => month,
-        s => panic!("Expected month as Utf8 found {s:?}"),
+    let month = match extract_as_utf(&s) {
+        Some(month) => month,
+        s => panic!("Expected month as Dict(_, Utf8) found {s:?}"),
     };

     let sql_on_partition_boundary = format!(
@@ -191,6 +191,15 @@ async fn parquet_distinct_partition_col() -> Result<()> {
     Ok(())
 }

+fn extract_as_utf(v: &ScalarValue) -> Option<String> {
+    if let ScalarValue::Dictionary(_, v) = v {
+        if let ScalarValue::Utf8(v) = v.as_ref() {
+            return v.clone();
+        }
+    }
+    None
+}
+
 #[tokio::test]
 async fn csv_filter_with_file_col() -> Result<()> {
     let ctx = SessionContext::new();
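
The new helper matters because, with the materialization reverted, the partition column comes back as Dict(_, Utf8) rather than plain Utf8. A minimal standalone sketch of its behavior (the main function and sample values are illustrative additions, not part of the commit; ScalarValue::Dictionary is DataFusion's variant holding a boxed key type and a boxed value):

use arrow_schema::DataType;
use datafusion_common::ScalarValue;

// Helper copied verbatim from the test above.
fn extract_as_utf(v: &ScalarValue) -> Option<String> {
    if let ScalarValue::Dictionary(_, v) = v {
        if let ScalarValue::Utf8(v) = v.as_ref() {
            return v.clone();
        }
    }
    None
}

fn main() {
    // After the revert, partition values come back dictionary-encoded,
    // e.g. Dictionary(Int32, Utf8) rather than plain Utf8.
    let month = ScalarValue::Dictionary(
        Box::new(DataType::Int32),
        Box::new(ScalarValue::Utf8(Some("10".to_string()))),
    );
    assert_eq!(extract_as_utf(&month), Some("10".to_string()));

    // A bare Utf8 scalar is not unwrapped; the test would panic on it.
    let plain = ScalarValue::Utf8(Some("10".to_string()));
    assert_eq!(extract_as_utf(&plain), None);
}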

datafusion/physical-plan/src/aggregates/group_values/row.rs

Lines changed: 23 additions & 4 deletions
@@ -17,18 +17,22 @@

 use crate::aggregates::group_values::GroupValues;
 use ahash::RandomState;
+use arrow::compute::cast;
 use arrow::record_batch::RecordBatch;
 use arrow::row::{RowConverter, Rows, SortField};
-use arrow_array::ArrayRef;
-use arrow_schema::SchemaRef;
+use arrow_array::{Array, ArrayRef};
+use arrow_schema::{DataType, SchemaRef};
 use datafusion_common::hash_utils::create_hashes;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
 use datafusion_physical_expr::EmitTo;
 use hashbrown::raw::RawTable;

 /// A [`GroupValues`] making use of [`Rows`]
 pub struct GroupValuesRows {
+    /// The output schema
+    schema: SchemaRef,
+
     /// Converter for the group values
     row_converter: RowConverter,
@@ -75,6 +79,7 @@ impl GroupValuesRows {
         let map = RawTable::with_capacity(0);

         Ok(Self {
+            schema,
             row_converter,
             map,
             map_size: 0,
@@ -165,7 +170,7 @@ impl GroupValues for GroupValuesRows {
             .take()
             .expect("Can not emit from empty rows");

-        let output = match emit_to {
+        let mut output = match emit_to {
             EmitTo::All => {
                 let output = self.row_converter.convert_rows(&group_values)?;
                 group_values.clear();
@@ -198,6 +203,20 @@
             }
         };

+        // TODO: Materialize dictionaries in group keys (#7647)
+        for (field, array) in self.schema.fields.iter().zip(&mut output) {
+            let expected = field.data_type();
+            if let DataType::Dictionary(_, v) = expected {
+                let actual = array.data_type();
+                if v.as_ref() != actual {
+                    return Err(DataFusionError::Internal(format!(
+                        "Converted group rows expected dictionary of {v} got {actual}"
+                    )));
+                }
+                *array = cast(array.as_ref(), expected)?;
+            }
+        }
+
         self.group_values = Some(group_values);
         Ok(output)
     }
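
The restored loop undoes what row conversion did on the way in: RowConverter::convert_rows hands back the materialized value type, and the cast puts the Dictionary wrapper back so emitted batches match the declared output schema. A minimal sketch of that final cast in isolation (the array contents and Int32 key type are illustrative; cast is arrow's arrow::compute::cast kernel):

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // What convert_rows produces for a dictionary-encoded group column:
    // the plain value type (Utf8), not the original Dictionary type.
    let materialized: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "a"]));

    // The output schema still promises a dictionary, so cast the column back.
    let expected =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let restored = cast(materialized.as_ref(), &expected)?;

    assert_eq!(restored.data_type(), &expected);
    Ok(())
}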

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 3 additions & 32 deletions
@@ -36,7 +36,6 @@ use crate::{
 use arrow::array::ArrayRef;
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
-use arrow_schema::DataType;
 use datafusion_common::stats::Precision;
 use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
@@ -254,9 +253,6 @@ pub struct AggregateExec {
     limit: Option<usize>,
     /// Input plan, could be a partial aggregate or the input to the aggregate
     pub input: Arc<dyn ExecutionPlan>,
-    /// Original aggregation schema, could be different from `schema` before dictionary group
-    /// keys get materialized
-    original_schema: SchemaRef,
     /// Schema after the aggregate is applied
     schema: SchemaRef,
     /// Input schema before any aggregation is applied. For partial aggregate this will be the
@@ -287,19 +283,15 @@ impl AggregateExec {
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
     ) -> Result<Self> {
-        let original_schema = create_schema(
+        let schema = create_schema(
             &input.schema(),
             &group_by.expr,
             &aggr_expr,
             group_by.contains_null(),
             mode,
         )?;

-        let schema = Arc::new(materialize_dict_group_keys(
-            &original_schema,
-            group_by.expr.len(),
-        ));
-        let original_schema = Arc::new(original_schema);
+        let schema = Arc::new(schema);
         AggregateExec::try_new_with_schema(
             mode,
             group_by,
@@ -308,7 +300,6 @@ impl AggregateExec {
             input,
             input_schema,
             schema,
-            original_schema,
         )
     }

@@ -329,7 +320,6 @@ impl AggregateExec {
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
         schema: SchemaRef,
-        original_schema: SchemaRef,
     ) -> Result<Self> {
         let input_eq_properties = input.equivalence_properties();
         // Get GROUP BY expressions:
@@ -382,7 +372,6 @@ impl AggregateExec {
             aggr_expr,
             filter_expr,
             input,
-            original_schema,
             schema,
             input_schema,
             projection_mapping,
@@ -693,7 +682,7 @@ impl ExecutionPlan for AggregateExec {
             children[0].clone(),
             self.input_schema.clone(),
             self.schema.clone(),
-            self.original_schema.clone(),
+            //self.original_schema.clone(),
         )?;
         me.limit = self.limit;
         Ok(Arc::new(me))
@@ -800,24 +789,6 @@ fn create_schema(
     Ok(Schema::new(fields))
 }

-/// returns schema with dictionary group keys materialized as their value types
-/// The actual convertion happens in `RowConverter` and we don't do unnecessary
-/// conversion back into dictionaries
-fn materialize_dict_group_keys(schema: &Schema, group_count: usize) -> Schema {
-    let fields = schema
-        .fields
-        .iter()
-        .enumerate()
-        .map(|(i, field)| match field.data_type() {
-            DataType::Dictionary(_, value_data_type) if i < group_count => {
-                Field::new(field.name(), *value_data_type.clone(), field.is_nullable())
-            }
-            _ => Field::clone(field),
-        })
-        .collect::<Vec<_>>();
-    Schema::new(fields)
-}
-
 fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
     let group_fields = schema.fields()[0..group_count].to_vec();
     Arc::new(Schema::new(group_fields))
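
For reference, the deleted helper is reproduced verbatim in the self-contained sketch below (the main function and example schema are illustrative additions) to show exactly the schema rewrite this commit stops performing: group-key fields no longer have their Dictionary wrapper stripped to the value type.

use arrow::datatypes::{DataType, Field, Schema};

// The deleted helper, copied verbatim: flattens dictionary-typed group keys
// to their value type while leaving aggregate columns untouched.
fn materialize_dict_group_keys(schema: &Schema, group_count: usize) -> Schema {
    let fields = schema
        .fields
        .iter()
        .enumerate()
        .map(|(i, field)| match field.data_type() {
            DataType::Dictionary(_, value_data_type) if i < group_count => {
                Field::new(field.name(), *value_data_type.clone(), field.is_nullable())
            }
            _ => Field::clone(field),
        })
        .collect::<Vec<_>>();
    Schema::new(fields)
}

fn main() {
    let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let schema = Schema::new(vec![
        Field::new("tag", dict.clone(), true),      // group key
        Field::new("total", DataType::Int64, true), // aggregate
    ]);

    // Pre-revert behavior: the group key surfaced as plain Utf8.
    let flattened = materialize_dict_group_keys(&schema, 1);
    assert_eq!(flattened.field(0).data_type(), &DataType::Utf8);

    // Post-revert: the schema from create_schema keeps the Dictionary type.
    assert_eq!(schema.field(0).data_type(), &dict);
}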

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 1 addition & 3 deletions
@@ -324,9 +324,7 @@ impl GroupedHashAggregateStream {
             .map(create_group_accumulator)
             .collect::<Result<_>>()?;

-        // we need to use original schema so RowConverter in group_values below
-        // will do the proper coversion of dictionaries into value types
-        let group_schema = group_schema(&agg.original_schema, agg_group_by.expr.len());
+        let group_schema = group_schema(&agg_schema, agg_group_by.expr.len());
         let spill_expr = group_schema
             .fields
             .into_iter()

datafusion/sqllogictest/test_files/aggregate.slt

Lines changed: 5 additions & 5 deletions
@@ -2469,11 +2469,11 @@ select max(x_dict) from value_dict group by x_dict % 2 order by max(x_dict);
 query T
 select arrow_typeof(x_dict) from value_dict group by x_dict;
 ----
-Int32
-Int32
-Int32
-Int32
-Int32
+Dictionary(Int64, Int32)
+Dictionary(Int64, Int32)
+Dictionary(Int64, Int32)
+Dictionary(Int64, Int32)
+Dictionary(Int64, Int32)

 statement ok
 drop table value
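
The updated expectations reflect that the group-by output now carries the column's dictionary encoding through unchanged. A small sketch of the type in question (the array contents are illustrative; DictionaryArray and Int64Type are arrow's):

use std::sync::Arc;

use arrow::array::{Array, DictionaryArray, Int32Array, Int64Array};
use arrow::datatypes::Int64Type;

fn main() {
    // x_dict in the test file is Int32 values behind Int64 dictionary keys.
    let values = Int32Array::from(vec![1, 2, 3]);
    let keys = Int64Array::from(vec![0, 1, 0, 2, 1]);
    let x_dict: DictionaryArray<Int64Type> = DictionaryArray::new(keys, Arc::new(values));

    // arrow_typeof now reports this type unchanged through the aggregation.
    assert_eq!(x_dict.data_type().to_string(), "Dictionary(Int64, Int32)");
}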

datafusion/sqllogictest/test_files/dictionary.slt

Lines changed: 80 additions & 1 deletion
@@ -169,7 +169,7 @@ order by date_bin('30 minutes', time) DESC

 # Reproducer for https://github.com/apache/arrow-datafusion/issues/8738
 # This query should work correctly
-query error DataFusion error: External error: Arrow error: Invalid argument error: RowConverter column schema mismatch, expected Utf8 got Dictionary\(Int32, Utf8\)
+query P?TT rowsort
 SELECT
     "data"."timestamp" as "time",
     "data"."tag_id",
@@ -201,3 +201,82 @@ ORDER BY
     "time",
     "data"."tag_id"
 ;
+----
+2023-12-20T00:00:00 1000 f1 32.0
+2023-12-20T00:00:00 1000 f2 foo
+2023-12-20T00:10:00 1000 f1 32.0
+2023-12-20T00:10:00 1000 f2 foo
+2023-12-20T00:20:00 1000 f1 32.0
+2023-12-20T00:20:00 1000 f2 foo
+2023-12-20T00:30:00 1000 f1 32.0
+2023-12-20T00:30:00 1000 f2 foo
+2023-12-20T00:40:00 1000 f1 32.0
+2023-12-20T00:40:00 1000 f2 foo
+2023-12-20T00:50:00 1000 f1 32.0
+2023-12-20T00:50:00 1000 f2 foo
+2023-12-20T01:00:00 1000 f1 32.0
+2023-12-20T01:00:00 1000 f2 foo
+2023-12-20T01:10:00 1000 f1 32.0
+2023-12-20T01:10:00 1000 f2 foo
+2023-12-20T01:20:00 1000 f1 32.0
+2023-12-20T01:20:00 1000 f2 foo
+2023-12-20T01:30:00 1000 f1 32.0
+2023-12-20T01:30:00 1000 f2 foo
+
+
+# deterministic sort (so we can avoid rowsort)
+query P?TT
+SELECT
+    "data"."timestamp" as "time",
+    "data"."tag_id",
+    "data"."field",
+    "data"."value"
+FROM (
+    (
+        SELECT "m2"."time" as "timestamp", "m2"."tag_id", 'active_power' as "field", "m2"."f5" as "value"
+        FROM "m2"
+        WHERE "m2"."time" >= '2023-12-05T14:46:35+01:00' AND "m2"."time" < '2024-01-03T14:46:35+01:00'
+        AND "m2"."f5" IS NOT NULL
+        AND "m2"."type" IN ('active')
+        AND "m2"."tag_id" IN ('1000')
+    ) UNION (
+        SELECT "m1"."time" as "timestamp", "m1"."tag_id", 'f1' as "field", "m1"."f1" as "value"
+        FROM "m1"
+        WHERE "m1"."time" >= '2023-12-05T14:46:35+01:00' AND "m1"."time" < '2024-01-03T14:46:35+01:00'
+        AND "m1"."f1" IS NOT NULL
+        AND "m1"."tag_id" IN ('1000')
+    ) UNION (
+        SELECT "m1"."time" as "timestamp", "m1"."tag_id", 'f2' as "field", "m1"."f2" as "value"
+        FROM "m1"
+        WHERE "m1"."time" >= '2023-12-05T14:46:35+01:00' AND "m1"."time" < '2024-01-03T14:46:35+01:00'
+        AND "m1"."f2" IS NOT NULL
+        AND "m1"."tag_id" IN ('1000')
+    )
+) as "data"
+ORDER BY
+    "time",
+    "data"."tag_id",
+    "data"."field",
+    "data"."value"
+;
+----
+2023-12-20T00:00:00 1000 f1 32.0
+2023-12-20T00:00:00 1000 f2 foo
+2023-12-20T00:10:00 1000 f1 32.0
+2023-12-20T00:10:00 1000 f2 foo
+2023-12-20T00:20:00 1000 f1 32.0
+2023-12-20T00:20:00 1000 f2 foo
+2023-12-20T00:30:00 1000 f1 32.0
+2023-12-20T00:30:00 1000 f2 foo
+2023-12-20T00:40:00 1000 f1 32.0
+2023-12-20T00:40:00 1000 f2 foo
+2023-12-20T00:50:00 1000 f1 32.0
+2023-12-20T00:50:00 1000 f2 foo
+2023-12-20T01:00:00 1000 f1 32.0
+2023-12-20T01:00:00 1000 f2 foo
+2023-12-20T01:10:00 1000 f1 32.0
+2023-12-20T01:10:00 1000 f2 foo
+2023-12-20T01:20:00 1000 f1 32.0
+2023-12-20T01:20:00 1000 f2 foo
+2023-12-20T01:30:00 1000 f1 32.0
+2023-12-20T01:30:00 1000 f2 foo
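
The query error expectation removed at the top of this diff quoted the failure this commit fixes. A minimal reproduction of that failure mode directly against arrow's row format (illustrative; assumes the arrow-rs API of this era, where convert_columns takes the converter by shared reference):

use std::sync::Arc;

use arrow::array::{ArrayRef, DictionaryArray};
use arrow::datatypes::{DataType, Int32Type};
use arrow::row::{RowConverter, SortField};

fn main() {
    // The regression: the converter was configured with the materialized
    // value type (Utf8) while group columns still arrived dictionary-encoded.
    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();

    let dict: DictionaryArray<Int32Type> = vec!["foo", "bar"].into_iter().collect();
    let cols: Vec<ArrayRef> = vec![Arc::new(dict)];

    // Expected to fail with "RowConverter column schema mismatch,
    // expected Utf8 got Dictionary(Int32, Utf8)".
    match converter.convert_columns(&cols) {
        Err(e) => println!("{e}"),
        Ok(_) => println!("unexpectedly succeeded"),
    }
}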
