Skip to content

Commit e54e432

Browse files
committed
Merge remote-tracking branch 'upstream/main' into varchar_default_ut8view
2 parents 2dc6bc5 + 16c7939 commit e54e432

File tree

23 files changed

+393
-364
lines changed

23 files changed

+393
-364
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ rstest = "0.25.0"
168168
serde_json = "1"
169169
sqlparser = { version = "0.55.0", features = ["visitor"] }
170170
tempfile = "3"
171-
tokio = { version = "1.44", features = ["macros", "rt", "sync"] }
171+
tokio = { version = "1.45", features = ["macros", "rt", "sync"] }
172172
url = "2.5.4"
173173

174174
[profile.release]

benchmarks/src/tpch/run.rs

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -109,29 +109,32 @@ impl RunOpt {
109109
};
110110

111111
let mut benchmark_run = BenchmarkRun::new();
112-
for query_id in query_range {
113-
benchmark_run.start_new_case(&format!("Query {query_id}"));
114-
let query_run = self.benchmark_query(query_id).await?;
115-
for iter in query_run {
116-
benchmark_run.write_iter(iter.elapsed, iter.row_count);
117-
}
118-
}
119-
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
120-
Ok(())
121-
}
122-
123-
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
124112
let mut config = self
125113
.common
126114
.config()?
127115
.with_collect_statistics(!self.disable_statistics);
128116
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
129117
let rt_builder = self.common.runtime_env_builder()?;
130118
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
131-
132119
// register tables
133120
self.register_tables(&ctx).await?;
134121

122+
for query_id in query_range {
123+
benchmark_run.start_new_case(&format!("Query {query_id}"));
124+
let query_run = self.benchmark_query(query_id, &ctx).await?;
125+
for iter in query_run {
126+
benchmark_run.write_iter(iter.elapsed, iter.row_count);
127+
}
128+
}
129+
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
130+
Ok(())
131+
}
132+
133+
async fn benchmark_query(
134+
&self,
135+
query_id: usize,
136+
ctx: &SessionContext,
137+
) -> Result<Vec<QueryResult>> {
135138
let mut millis = vec![];
136139
// run benchmark
137140
let mut query_results = vec![];
@@ -146,14 +149,14 @@ impl RunOpt {
146149
if query_id == 15 {
147150
for (n, query) in sql.iter().enumerate() {
148151
if n == 1 {
149-
result = self.execute_query(&ctx, query).await?;
152+
result = self.execute_query(ctx, query).await?;
150153
} else {
151-
self.execute_query(&ctx, query).await?;
154+
self.execute_query(ctx, query).await?;
152155
}
153156
}
154157
} else {
155158
for query in sql {
156-
result = self.execute_query(&ctx, query).await?;
159+
result = self.execute_query(ctx, query).await?;
157160
}
158161
}
159162

datafusion/common-runtime/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,4 @@ log = { workspace = true }
4343
tokio = { workspace = true }
4444

4545
[dev-dependencies]
46-
tokio = { version = "1.44", features = ["rt", "rt-multi-thread", "time"] }
46+
tokio = { version = "1.45", features = ["rt", "rt-multi-thread", "time"] }

datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,7 @@ async fn explain_logical_plan_only() {
784784
vec!["logical_plan", "Projection: count(Int64(1)) AS count(*)\
785785
\n Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]\
786786
\n SubqueryAlias: t\
787-
\n Projection: \
787+
\n Projection:\
788788
\n Values: (Utf8(\"a\"), Int64(1), Int64(100)), (Utf8(\"a\"), Int64(2), Int64(150))"]];
789789
assert_eq!(expected, actual);
790790
}

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1815,12 +1815,12 @@ impl LogicalPlan {
18151815
Ok(())
18161816
}
18171817
LogicalPlan::Projection(Projection { ref expr, .. }) => {
1818-
write!(f, "Projection: ")?;
1818+
write!(f, "Projection:")?;
18191819
for (i, expr_item) in expr.iter().enumerate() {
18201820
if i > 0 {
1821-
write!(f, ", ")?;
1821+
write!(f, ",")?;
18221822
}
1823-
write!(f, "{expr_item}")?;
1823+
write!(f, " {expr_item}")?;
18241824
}
18251825
Ok(())
18261826
}

datafusion/functions-nested/src/length.rs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919
2020
use crate::utils::make_scalar_function;
2121
use arrow::array::{
22-
Array, ArrayRef, Int64Array, LargeListArray, ListArray, OffsetSizeTrait, UInt64Array,
22+
Array, ArrayRef, FixedSizeListArray, Int64Array, LargeListArray, ListArray,
23+
OffsetSizeTrait, UInt64Array,
2324
};
2425
use arrow::datatypes::{
2526
DataType,
2627
DataType::{FixedSizeList, LargeList, List, UInt64},
2728
};
28-
use datafusion_common::cast::{as_generic_list_array, as_int64_array};
29+
use datafusion_common::cast::{
30+
as_fixed_size_list_array, as_generic_list_array, as_int64_array,
31+
};
2932
use datafusion_common::{exec_err, internal_datafusion_err, plan_err, Result};
3033
use datafusion_expr::{
3134
ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
@@ -119,6 +122,23 @@ impl ScalarUDFImpl for ArrayLength {
119122
}
120123
}
121124

125+
macro_rules! array_length_impl {
126+
($array:expr, $dimension:expr) => {{
127+
let array = $array;
128+
let dimension = match $dimension {
129+
Some(d) => as_int64_array(d)?.clone(),
130+
None => Int64Array::from_value(1, array.len()),
131+
};
132+
let result = array
133+
.iter()
134+
.zip(dimension.iter())
135+
.map(|(arr, dim)| compute_array_length(arr, dim))
136+
.collect::<Result<UInt64Array>>()?;
137+
138+
Ok(Arc::new(result) as ArrayRef)
139+
}};
140+
}
141+
122142
/// Array_length SQL function
123143
pub fn array_length_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
124144
if args.len() != 1 && args.len() != 2 {
@@ -128,26 +148,18 @@ pub fn array_length_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
128148
match &args[0].data_type() {
129149
List(_) => general_array_length::<i32>(args),
130150
LargeList(_) => general_array_length::<i64>(args),
151+
FixedSizeList(_, _) => fixed_size_array_length(args),
131152
array_type => exec_err!("array_length does not support type '{array_type:?}'"),
132153
}
133154
}
134155

156+
fn fixed_size_array_length(array: &[ArrayRef]) -> Result<ArrayRef> {
157+
array_length_impl!(as_fixed_size_list_array(&array[0])?, array.get(1))
158+
}
159+
135160
/// Dispatch array length computation based on the offset type.
136161
fn general_array_length<O: OffsetSizeTrait>(array: &[ArrayRef]) -> Result<ArrayRef> {
137-
let list_array = as_generic_list_array::<O>(&array[0])?;
138-
let dimension = if array.len() == 2 {
139-
as_int64_array(&array[1])?.clone()
140-
} else {
141-
Int64Array::from_value(1, list_array.len())
142-
};
143-
144-
let result = list_array
145-
.iter()
146-
.zip(dimension.iter())
147-
.map(|(arr, dim)| compute_array_length(arr, dim))
148-
.collect::<Result<UInt64Array>>()?;
149-
150-
Ok(Arc::new(result) as ArrayRef)
162+
array_length_impl!(as_generic_list_array::<O>(&array[0])?, array.get(1))
151163
}
152164

153165
/// Returns the length of a concrete array dimension
@@ -185,6 +197,10 @@ fn compute_array_length(
185197
value = downcast_arg!(value, LargeListArray).value(0);
186198
current_dimension += 1;
187199
}
200+
FixedSizeList(_, _) => {
201+
value = downcast_arg!(value, FixedSizeListArray).value(0);
202+
current_dimension += 1;
203+
}
188204
_ => return Ok(None),
189205
}
190206
}

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1124,7 +1124,7 @@ mod tests {
11241124
plan,
11251125
@r"
11261126
Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]
1127-
Projection:
1127+
Projection:
11281128
Aggregate: groupBy=[[]], aggr=[[count(Int32(1))]]
11291129
TableScan: ?table? projection=[]
11301130
"

datafusion/optimizer/tests/optimizer_integration.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ fn between_date32_plus_interval() -> Result<()> {
250250
format!("{plan}"),
251251
@r#"
252252
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
253-
Projection:
253+
Projection:
254254
Filter: test.col_date32 >= Date32("1998-03-18") AND test.col_date32 <= Date32("1998-06-16")
255255
TableScan: test projection=[col_date32]
256256
"#
@@ -268,7 +268,7 @@ fn between_date64_plus_interval() -> Result<()> {
268268
format!("{plan}"),
269269
@r#"
270270
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
271-
Projection:
271+
Projection:
272272
Filter: test.col_date64 >= Date64("1998-03-18") AND test.col_date64 <= Date64("1998-06-16")
273273
TableScan: test projection=[col_date64]
274274
"#

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,8 +1307,8 @@ fn lookup_join_hashmap(
13071307
limit: usize,
13081308
offset: JoinHashMapOffset,
13091309
) -> Result<(UInt64Array, UInt32Array, Option<JoinHashMapOffset>)> {
1310-
let (probe_indices, build_indices, next_offset) = build_hashmap
1311-
.get_matched_indices_with_limit_offset(hashes_buffer, None, limit, offset);
1310+
let (probe_indices, build_indices, next_offset) =
1311+
build_hashmap.get_matched_indices_with_limit_offset(hashes_buffer, limit, offset);
13121312

13131313
let build_indices: UInt64Array = build_indices.into();
13141314
let probe_indices: UInt32Array = probe_indices.into();
@@ -3333,7 +3333,7 @@ mod tests {
33333333

33343334
#[test]
33353335
fn join_with_hash_collision() -> Result<()> {
3336-
let mut hashmap_left = HashTable::with_capacity(2);
3336+
let mut hashmap_left = HashTable::with_capacity(4);
33373337
let left = build_table_i32(
33383338
("a", &vec![10, 20]),
33393339
("x", &vec![100, 200]),
@@ -3348,9 +3348,15 @@ mod tests {
33483348
hashes_buff,
33493349
)?;
33503350

3351-
// Create hash collisions (same hashes)
3351+
// Maps both values to both indices (1 and 2, representing input 0 and 1)
3352+
// 0 -> (0, 1)
3353+
// 1 -> (0, 2)
3354+
// The equality check will make sure only hashes[0] maps to 0 and hashes[1] maps to 1
33523355
hashmap_left.insert_unique(hashes[0], (hashes[0], 1), |(h, _)| *h);
3356+
hashmap_left.insert_unique(hashes[0], (hashes[0], 2), |(h, _)| *h);
3357+
33533358
hashmap_left.insert_unique(hashes[1], (hashes[1], 1), |(h, _)| *h);
3359+
hashmap_left.insert_unique(hashes[1], (hashes[1], 2), |(h, _)| *h);
33543360

33553361
let next = vec![2, 0];
33563362

0 commit comments

Comments
 (0)