Skip to content

Commit 85777c4

Browse files
authored
minor: refactor with assert_or_internal_err!() in datafusion/physical-plan (#18730)
## Which issue does this PR close?

<!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. -->

- Part of #18613

## Rationale for this change

<!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. -->

## What changes are included in this PR?

<!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. -->

## Are these changes tested?

<!-- We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->

## Are there any user-facing changes?

<!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. -->

<!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent b0015d3 commit 85777c4

File tree

26 files changed

+261
-221
lines changed

26 files changed

+261
-221
lines changed

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
4444
use arrow::record_batch::RecordBatch;
4545
use arrow_schema::FieldRef;
4646
use datafusion_common::stats::Precision;
47-
use datafusion_common::{internal_err, not_impl_err, Constraint, Constraints, Result};
47+
use datafusion_common::{
48+
assert_eq_or_internal_err, not_impl_err, Constraint, Constraints, DataFusionError,
49+
Result,
50+
};
4851
use datafusion_execution::TaskContext;
4952
use datafusion_expr::{Accumulator, Aggregate};
5053
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
@@ -492,9 +495,13 @@ impl AggregateExec {
492495
schema: SchemaRef,
493496
) -> Result<Self> {
494497
// Make sure arguments are consistent in size
495-
if aggr_expr.len() != filter_expr.len() {
496-
return internal_err!("Inconsistent aggregate expr: {:?} and filter expr: {:?} for AggregateExec, their size should match", aggr_expr, filter_expr);
497-
}
498+
assert_eq_or_internal_err!(
499+
aggr_expr.len(),
500+
filter_expr.len(),
501+
"Inconsistent aggregate expr: {:?} and filter expr: {:?} for AggregateExec, their size should match",
502+
aggr_expr,
503+
filter_expr
504+
);
498505

499506
let input_eq_properties = input.equivalence_properties();
500507
// Get GROUP BY expressions:

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ use crate::{RecordBatchStream, SendableRecordBatchStream};
3939

4040
use arrow::array::*;
4141
use arrow::datatypes::SchemaRef;
42-
use datafusion_common::{internal_err, DataFusionError, Result};
42+
use datafusion_common::{
43+
assert_eq_or_internal_err, assert_or_internal_err, internal_err, DataFusionError,
44+
Result,
45+
};
4346
use datafusion_execution::memory_pool::proxy::VecAllocExt;
4447
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
4548
use datafusion_execution::TaskContext;
@@ -938,9 +941,10 @@ impl GroupedHashAggregateStream {
938941
)?;
939942
}
940943
_ => {
941-
if opt_filter.is_some() {
942-
return internal_err!("aggregate filter should be applied in partial stage, there should be no filter in final stage");
943-
}
944+
assert_or_internal_err!(
945+
opt_filter.is_none(),
946+
"aggregate filter should be applied in partial stage, there should be no filter in final stage"
947+
);
944948

945949
// if aggregation is over intermediate states,
946950
// use merge
@@ -1218,9 +1222,11 @@ impl GroupedHashAggregateStream {
12181222
let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
12191223
let filter_values = evaluate_optional(&self.filter_expressions, &batch)?;
12201224

1221-
if group_values.len() != 1 {
1222-
return internal_err!("group_values expected to have single element");
1223-
}
1225+
assert_eq_or_internal_err!(
1226+
group_values.len(),
1227+
1,
1228+
"group_values expected to have single element"
1229+
);
12241230
let mut output = group_values.swap_remove(0);
12251231

12261232
let iter = self

datafusion/physical-plan/src/analyze.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
3131

3232
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
3333
use datafusion_common::instant::Instant;
34-
use datafusion_common::{internal_err, DataFusionError, Result};
34+
use datafusion_common::{assert_eq_or_internal_err, DataFusionError, Result};
3535
use datafusion_execution::TaskContext;
3636
use datafusion_physical_expr::EquivalenceProperties;
3737

@@ -161,11 +161,11 @@ impl ExecutionPlan for AnalyzeExec {
161161
partition: usize,
162162
context: Arc<TaskContext>,
163163
) -> Result<SendableRecordBatchStream> {
164-
if 0 != partition {
165-
return internal_err!(
166-
"AnalyzeExec invalid partition. Expected 0, got {partition}"
167-
);
168-
}
164+
assert_eq_or_internal_err!(
165+
partition,
166+
0,
167+
"AnalyzeExec invalid partition. Expected 0, got {partition}"
168+
);
169169

170170
// Gather futures that will run each input partition in
171171
// parallel (on a separate tokio task) using a JoinSet to

datafusion/physical-plan/src/async_func.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323
use arrow::array::RecordBatch;
2424
use arrow_schema::{Fields, Schema, SchemaRef};
2525
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
26-
use datafusion_common::{internal_err, Result};
26+
use datafusion_common::{assert_eq_or_internal_err, DataFusionError, Result};
2727
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
2828
use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr;
2929
use datafusion_physical_expr::equivalence::ProjectionMapping;
@@ -148,9 +148,11 @@ impl ExecutionPlan for AsyncFuncExec {
148148
self: Arc<Self>,
149149
children: Vec<Arc<dyn ExecutionPlan>>,
150150
) -> Result<Arc<dyn ExecutionPlan>> {
151-
if children.len() != 1 {
152-
return internal_err!("AsyncFuncExec wrong number of children");
153-
}
151+
assert_eq_or_internal_err!(
152+
children.len(),
153+
1,
154+
"AsyncFuncExec wrong number of children"
155+
);
154156
Ok(Arc::new(AsyncFuncExec::try_new(
155157
self.async_exprs.clone(),
156158
Arc::clone(&children[0]),

datafusion/physical-plan/src/coalesce/mod.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use arrow::array::RecordBatch;
1919
use arrow::compute::BatchCoalescer;
2020
use arrow::datatypes::SchemaRef;
21-
use datafusion_common::{internal_err, Result};
21+
use datafusion_common::{assert_or_internal_err, DataFusionError, Result};
2222

2323
/// Concatenate multiple [`RecordBatch`]es and apply a limit
2424
///
@@ -88,11 +88,10 @@ impl LimitedBatchCoalescer {
8888
/// Returns an error if called after [`Self::finish`] or if the internal push
8989
/// operation fails.
9090
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<PushBatchStatus> {
91-
if self.finished {
92-
return internal_err!(
93-
"LimitedBatchCoalescer: cannot push batch after finish"
94-
);
95-
}
91+
assert_or_internal_err!(
92+
!self.finished,
93+
"LimitedBatchCoalescer: cannot push batch after finish"
94+
);
9695

9796
// if we are at the limit, return LimitReached
9897
if let Some(fetch) = self.fetch {

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use crate::projection::{make_with_child, ProjectionExec};
3333
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
3434

3535
use datafusion_common::config::ConfigOptions;
36-
use datafusion_common::{internal_err, Result};
36+
use datafusion_common::{
37+
assert_eq_or_internal_err, internal_err, DataFusionError, Result,
38+
};
3739
use datafusion_execution::TaskContext;
3840
use datafusion_physical_expr::PhysicalExpr;
3941

@@ -160,9 +162,11 @@ impl ExecutionPlan for CoalescePartitionsExec {
160162
context: Arc<TaskContext>,
161163
) -> Result<SendableRecordBatchStream> {
162164
// CoalescePartitionsExec produces a single partition
163-
if 0 != partition {
164-
return internal_err!("CoalescePartitionsExec invalid partition {partition}");
165-
}
165+
assert_eq_or_internal_err!(
166+
partition,
167+
0,
168+
"CoalescePartitionsExec invalid partition {partition}"
169+
);
166170

167171
let input_partitions = self.input.output_partitioning().partition_count();
168172
match input_partitions {

datafusion/physical-plan/src/coop.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ use crate::{
8585
};
8686
use arrow::record_batch::RecordBatch;
8787
use arrow_schema::Schema;
88-
use datafusion_common::{internal_err, Result, Statistics};
88+
use datafusion_common::{assert_eq_or_internal_err, DataFusionError, Result, Statistics};
8989
use datafusion_execution::TaskContext;
9090

9191
use crate::execution_plan::SchedulingType;
@@ -269,9 +269,11 @@ impl ExecutionPlan for CooperativeExec {
269269
self: Arc<Self>,
270270
mut children: Vec<Arc<dyn ExecutionPlan>>,
271271
) -> Result<Arc<dyn ExecutionPlan>> {
272-
if children.len() != 1 {
273-
return internal_err!("CooperativeExec requires exactly one child");
274-
}
272+
assert_eq_or_internal_err!(
273+
children.len(),
274+
1,
275+
"CooperativeExec requires exactly one child"
276+
);
275277
Ok(Arc::new(CooperativeExec::new(children.swap_remove(0))))
276278
}
277279

datafusion/physical-plan/src/empty.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::{
2929

3030
use arrow::datatypes::SchemaRef;
3131
use arrow::record_batch::RecordBatch;
32-
use datafusion_common::{internal_err, Result};
32+
use datafusion_common::{assert_or_internal_err, DataFusionError, Result};
3333
use datafusion_execution::TaskContext;
3434
use datafusion_physical_expr::EquivalenceProperties;
3535

@@ -136,13 +136,12 @@ impl ExecutionPlan for EmptyExec {
136136
) -> Result<SendableRecordBatchStream> {
137137
trace!("Start EmptyExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
138138

139-
if partition >= self.partitions {
140-
return internal_err!(
141-
"EmptyExec invalid partition {} (expected less than {})",
142-
partition,
143-
self.partitions
144-
);
145-
}
139+
assert_or_internal_err!(
140+
partition < self.partitions,
141+
"EmptyExec invalid partition {} (expected less than {})",
142+
partition,
143+
self.partitions
144+
);
146145

147146
Ok(Box::pin(MemoryStream::try_new(
148147
self.data()?,
@@ -157,13 +156,12 @@ impl ExecutionPlan for EmptyExec {
157156

158157
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
159158
if let Some(partition) = partition {
160-
if partition >= self.partitions {
161-
return internal_err!(
162-
"EmptyExec invalid partition {} (expected less than {})",
163-
partition,
164-
self.partitions
165-
);
166-
}
159+
assert_or_internal_err!(
160+
partition < self.partitions,
161+
"EmptyExec invalid partition {} (expected less than {})",
162+
partition,
163+
self.partitions
164+
);
167165
}
168166

169167
let batch = self

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ use crate::stream::RecordBatchStreamAdapter;
4747
use arrow::array::{Array, RecordBatch};
4848
use arrow::datatypes::SchemaRef;
4949
use datafusion_common::config::ConfigOptions;
50-
use datafusion_common::{exec_err, Constraints, DataFusionError, Result};
50+
use datafusion_common::{
51+
assert_eq_or_internal_err, assert_or_internal_err, exec_err, Constraints,
52+
DataFusionError, Result,
53+
};
5154
use datafusion_common_runtime::JoinSet;
5255
use datafusion_execution::TaskContext;
5356
use datafusion_physical_expr::EquivalenceProperties;
@@ -484,13 +487,12 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
484487
if let Some(idx) = partition {
485488
// Validate partition index
486489
let partition_count = self.properties().partitioning.partition_count();
487-
if idx >= partition_count {
488-
return internal_err!(
489-
"Invalid partition index: {}, the partition count is {}",
490-
idx,
491-
partition_count
492-
);
493-
}
490+
assert_or_internal_err!(
491+
idx < partition_count,
492+
"Invalid partition index: {}, the partition count is {}",
493+
idx,
494+
partition_count
495+
);
494496
}
495497
Ok(Statistics::new_unknown(&self.schema()))
496498
}
@@ -1082,15 +1084,15 @@ impl PlanProperties {
10821084
macro_rules! check_len {
10831085
($target:expr, $func_name:ident, $expected_len:expr) => {
10841086
let actual_len = $target.$func_name().len();
1085-
if actual_len != $expected_len {
1086-
return internal_err!(
1087-
"{}::{} returned Vec with incorrect size: {} != {}",
1088-
$target.name(),
1089-
stringify!($func_name),
1090-
actual_len,
1091-
$expected_len
1092-
);
1093-
}
1087+
assert_eq_or_internal_err!(
1088+
actual_len,
1089+
$expected_len,
1090+
"{}::{} returned Vec with incorrect size: {} != {}",
1091+
$target.name(),
1092+
stringify!($func_name),
1093+
actual_len,
1094+
$expected_len
1095+
);
10941096
};
10951097
}
10961098

@@ -1127,9 +1129,12 @@ pub fn with_new_children_if_necessary(
11271129
children: Vec<Arc<dyn ExecutionPlan>>,
11281130
) -> Result<Arc<dyn ExecutionPlan>> {
11291131
let old_children = plan.children();
1130-
if children.len() != old_children.len() {
1131-
internal_err!("Wrong number of children")
1132-
} else if children.is_empty()
1132+
assert_eq_or_internal_err!(
1133+
children.len(),
1134+
old_children.len(),
1135+
"Wrong number of children"
1136+
);
1137+
if children.is_empty()
11331138
|| children
11341139
.iter()
11351140
.zip(old_children.iter())

datafusion/physical-plan/src/explain.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
2727

2828
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
2929
use datafusion_common::display::StringifiedPlan;
30-
use datafusion_common::{internal_err, Result};
30+
use datafusion_common::{assert_eq_or_internal_err, DataFusionError, Result};
3131
use datafusion_execution::TaskContext;
3232
use datafusion_physical_expr::EquivalenceProperties;
3333

@@ -134,9 +134,11 @@ impl ExecutionPlan for ExplainExec {
134134
context: Arc<TaskContext>,
135135
) -> Result<SendableRecordBatchStream> {
136136
trace!("Start ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
137-
if 0 != partition {
138-
return internal_err!("ExplainExec invalid partition {partition}");
139-
}
137+
assert_eq_or_internal_err!(
138+
partition,
139+
0,
140+
"ExplainExec invalid partition {partition}"
141+
);
140142
let mut type_builder =
141143
StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
142144
let mut plan_builder =

0 commit comments

Comments
 (0)