16 changes: 8 additions & 8 deletions datafusion/catalog-listing/src/helpers.rs
@@ -21,8 +21,9 @@ use std::mem;
use std::sync::Arc;

use datafusion_catalog::Session;
use datafusion_common::internal_err;
use datafusion_common::{HashMap, Result, ScalarValue};
use datafusion_common::{
assert_or_internal_err, DataFusionError, HashMap, Result, ScalarValue,
};
use datafusion_datasource::ListingTableUrl;
use datafusion_datasource::PartitionedFile;
use datafusion_expr::{lit, utils, BinaryExpr, Operator};
@@ -386,12 +387,11 @@ pub async fn pruned_partition_list<'a>(
.try_filter(|object_meta| futures::future::ready(object_meta.size > 0));

if partition_cols.is_empty() {
if !filters.is_empty() {
return internal_err!(
"Got partition filters for unpartitioned table {}",
table_path
);
}
assert_or_internal_err!(
filters.is_empty(),
"Got partition filters for unpartitioned table {}",
table_path
);

// if no partition col => simply list all the files
Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
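The change above replaces a hand-rolled `if !cond { return internal_err!(...) }` guard with the `assert_or_internal_err!` macro. Below is a self-contained sketch of the idea; the stand-in error type and macro body are illustrative assumptions, not DataFusion's actual definitions.

```rust
// Illustrative stand-in, not DataFusion's real macro: return an internal error
// early when the asserted condition is false, mirroring the replaced pattern.
#[derive(Debug)]
enum DataFusionError {
    Internal(String),
}

macro_rules! assert_or_internal_err {
    ($cond:expr, $($arg:tt)*) => {
        if !$cond {
            return Err(DataFusionError::Internal(format!($($arg)*)));
        }
    };
}

fn check_unpartitioned(filters: &[&str], table_path: &str) -> Result<(), DataFusionError> {
    // Equivalent to: if !filters.is_empty() { return internal_err!(...); }
    assert_or_internal_err!(
        filters.is_empty(),
        "Got partition filters for unpartitioned table {}",
        table_path
    );
    Ok(())
}

fn main() {
    assert!(check_unpartitioned(&[], "t").is_ok());
    assert!(check_unpartitioned(&["year = 2024"], "t").is_err());
}
```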
107 changes: 45 additions & 62 deletions datafusion/common/src/scalar/mod.rs
@@ -33,6 +33,7 @@ use std::mem::{size_of, size_of_val};
use std::str::FromStr;
use std::sync::Arc;

use crate::assert_or_internal_err;
use crate::cast::{
as_binary_array, as_binary_view_array, as_boolean_array, as_date32_array,
as_date64_array, as_decimal128_array, as_decimal256_array, as_decimal32_array,
@@ -78,8 +79,8 @@ use arrow::compute::kernels::numeric::{
use arrow::datatypes::{
i256, validate_decimal_precision_and_scale, ArrowDictionaryKeyType, ArrowNativeType,
ArrowTimestampType, DataType, Date32Type, Decimal128Type, Decimal256Type,
Decimal32Type, Decimal64Type, Field, Float32Type, Int16Type, Int32Type, Int64Type,
Int8Type, IntervalDayTime, IntervalDayTimeType, IntervalMonthDayNano,
Decimal32Type, Decimal64Type, DecimalType, Field, Float32Type, Int16Type, Int32Type,
Int64Type, Int8Type, IntervalDayTime, IntervalDayTimeType, IntervalMonthDayNano,
IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, TimeUnit,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, UnionFields,
@@ -1578,12 +1579,10 @@ impl ScalarValue {
DataType::Float32 => ScalarValue::Float32(Some(1.0)),
DataType::Float64 => ScalarValue::Float64(Some(1.0)),
DataType::Decimal32(precision, scale) => {
validate_decimal_precision_and_scale::<Decimal32Type>(
Self::validate_decimal_or_internal_err::<Decimal32Type>(
*precision, *scale,
)?;
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match 10_i32.checked_pow(*scale as u32) {
Some(value) => {
ScalarValue::Decimal32(Some(value), *precision, *scale)
@@ -1592,12 +1591,10 @@
}
}
DataType::Decimal64(precision, scale) => {
validate_decimal_precision_and_scale::<Decimal64Type>(
Self::validate_decimal_or_internal_err::<Decimal64Type>(
*precision, *scale,
)?;
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match i64::from(10).checked_pow(*scale as u32) {
Some(value) => {
ScalarValue::Decimal64(Some(value), *precision, *scale)
@@ -1606,12 +1603,10 @@
}
}
DataType::Decimal128(precision, scale) => {
validate_decimal_precision_and_scale::<Decimal128Type>(
Self::validate_decimal_or_internal_err::<Decimal128Type>(
*precision, *scale,
)?;
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match i128::from(10).checked_pow(*scale as u32) {
Some(value) => {
ScalarValue::Decimal128(Some(value), *precision, *scale)
@@ -1620,12 +1615,10 @@
}
}
DataType::Decimal256(precision, scale) => {
validate_decimal_precision_and_scale::<Decimal256Type>(
Self::validate_decimal_or_internal_err::<Decimal256Type>(
*precision, *scale,
)?;
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match i256::from(10).checked_pow(*scale as u32) {
Some(value) => {
ScalarValue::Decimal256(Some(value), *precision, *scale)
Expand All @@ -1652,12 +1645,10 @@ impl ScalarValue {
DataType::Float32 => ScalarValue::Float32(Some(-1.0)),
DataType::Float64 => ScalarValue::Float64(Some(-1.0)),
DataType::Decimal32(precision, scale) => {
validate_decimal_precision_and_scale::<Decimal32Type>(
Self::validate_decimal_or_internal_err::<Decimal32Type>(
*precision, *scale,
)?;
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match 10_i32.checked_pow(*scale as u32) {
Some(value) => {
ScalarValue::Decimal32(Some(-value), *precision, *scale)
@@ -1666,12 +1657,10 @@
}
}
DataType::Decimal64(precision, scale) => {
validate_decimal_precision_and_scale::<Decimal64Type>(
Self::validate_decimal_or_internal_err::<Decimal64Type>(
*precision, *scale,
)?;
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match i64::from(10).checked_pow(*scale as u32) {
Some(value) => {
ScalarValue::Decimal64(Some(-value), *precision, *scale)
@@ -1680,12 +1669,10 @@
}
}
DataType::Decimal128(precision, scale) => {
validate_decimal_precision_and_scale::<Decimal128Type>(
Self::validate_decimal_or_internal_err::<Decimal128Type>(
*precision, *scale,
)?;
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match i128::from(10).checked_pow(*scale as u32) {
Some(value) => {
ScalarValue::Decimal128(Some(-value), *precision, *scale)
@@ -1694,12 +1681,10 @@
}
}
DataType::Decimal256(precision, scale) => {
validate_decimal_precision_and_scale::<Decimal256Type>(
Self::validate_decimal_or_internal_err::<Decimal256Type>(
*precision, *scale,
)?;
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match i256::from(10).checked_pow(*scale as u32) {
Some(value) => {
ScalarValue::Decimal256(Some(-value), *precision, *scale)
@@ -1729,14 +1714,10 @@
DataType::Float32 => ScalarValue::Float32(Some(10.0)),
DataType::Float64 => ScalarValue::Float64(Some(10.0)),
DataType::Decimal32(precision, scale) => {
if let Err(err) = validate_decimal_precision_and_scale::<Decimal32Type>(
Self::validate_decimal_or_internal_err::<Decimal32Type>(
*precision, *scale,
) {
return _internal_err!("Invalid precision and scale {err}");
}
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
)?;
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match 10_i32.checked_pow((*scale + 1) as u32) {
Some(value) => {
ScalarValue::Decimal32(Some(value), *precision, *scale)
@@ -1745,14 +1726,10 @@
}
}
DataType::Decimal64(precision, scale) => {
if let Err(err) = validate_decimal_precision_and_scale::<Decimal64Type>(
Self::validate_decimal_or_internal_err::<Decimal64Type>(
*precision, *scale,
) {
return _internal_err!("Invalid precision and scale {err}");
}
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
)?;
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match i64::from(10).checked_pow((*scale + 1) as u32) {
Some(value) => {
ScalarValue::Decimal64(Some(value), *precision, *scale)
@@ -1761,14 +1738,10 @@
}
}
DataType::Decimal128(precision, scale) => {
if let Err(err) = validate_decimal_precision_and_scale::<Decimal128Type>(
Self::validate_decimal_or_internal_err::<Decimal128Type>(
*precision, *scale,
) {
return _internal_err!("Invalid precision and scale {err}");
}
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
)?;
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match i128::from(10).checked_pow((*scale + 1) as u32) {
Some(value) => {
ScalarValue::Decimal128(Some(value), *precision, *scale)
@@ -1777,14 +1750,10 @@
}
}
DataType::Decimal256(precision, scale) => {
if let Err(err) = validate_decimal_precision_and_scale::<Decimal256Type>(
Self::validate_decimal_or_internal_err::<Decimal256Type>(
*precision, *scale,
) {
return _internal_err!("Invalid precision and scale {err}");
}
if *scale < 0 {
return _internal_err!("Negative scale is not supported");
}
)?;
assert_or_internal_err!(*scale >= 0, "Negative scale is not supported");
match i256::from(10).checked_pow((*scale + 1) as u32) {
Some(value) => {
ScalarValue::Decimal256(Some(value), *precision, *scale)
@@ -4354,6 +4323,20 @@ impl ScalarValue {
_ => None,
}
}

/// A thin wrapper around Arrow's validation that returns an internal error if validation
/// fails.
fn validate_decimal_or_internal_err<T: DecimalType>(
precision: u8,
scale: i8,
) -> Result<()> {
validate_decimal_precision_and_scale::<T>(precision, scale).map_err(|err| {
_internal_datafusion_err!(
"Decimal precision/scale invariant violated \
(precision={precision}, scale={scale}): {err}"
)
})
}
}

/// Compacts the data of an `ArrayData` into a new `ArrayData`.
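The new `validate_decimal_or_internal_err` helper above wraps Arrow's `validate_decimal_precision_and_scale` and remaps its error into an internal error that records the offending precision and scale. The sketch below shows the same `map_err` wrapping pattern in isolation; the validator and error formatting are simplified stand-ins, not Arrow's or DataFusion's actual APIs.

```rust
// Stand-in validator: rejects out-of-range precision/scale (limits are illustrative).
fn validate_precision_and_scale(precision: u8, scale: i8) -> Result<(), String> {
    if precision == 0 || precision > 38 {
        return Err(format!("precision {precision} out of range 1..=38"));
    }
    if scale.unsigned_abs() > precision {
        return Err(format!("scale {scale} exceeds precision {precision}"));
    }
    Ok(())
}

// The wrapping pattern: remap the third-party error into an "internal" error
// that carries the inputs that violated the invariant.
fn validate_or_internal_err(precision: u8, scale: i8) -> Result<(), String> {
    validate_precision_and_scale(precision, scale).map_err(|err| {
        format!(
            "Internal error: decimal precision/scale invariant violated \
             (precision={precision}, scale={scale}): {err}"
        )
    })
}

fn main() {
    assert!(validate_or_internal_err(10, 2).is_ok());
    let msg = validate_or_internal_err(50, 2).unwrap_err();
    assert!(msg.contains("precision=50"));
}
```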
9 changes: 4 additions & 5 deletions datafusion/common/src/utils/mod.rs
@@ -22,8 +22,9 @@ pub mod memory;
pub mod proxy;
pub mod string_utils;

use crate::error::{_exec_datafusion_err, _internal_datafusion_err, _internal_err};
use crate::{Result, ScalarValue};
use crate::assert_or_internal_err;
use crate::error::{_exec_datafusion_err, _internal_datafusion_err};
use crate::{DataFusionError, Result, ScalarValue};
use arrow::array::{
cast::AsArray, Array, ArrayRef, FixedSizeListArray, LargeListArray, ListArray,
OffsetSizeTrait,
@@ -519,9 +520,7 @@ pub fn arrays_into_list_array(
arr: impl IntoIterator<Item = ArrayRef>,
) -> Result<ListArray> {
let arr = arr.into_iter().collect::<Vec<_>>();
if arr.is_empty() {
return _internal_err!("Cannot wrap empty array into list array");
}
assert_or_internal_err!(!arr.is_empty(), "Cannot wrap empty array into list array");

let lens = arr.iter().map(|x| x.len()).collect::<Vec<_>>();
// Assume data type is consistent
10 changes: 5 additions & 5 deletions datafusion/core/src/physical_planner.rs
@@ -1757,11 +1757,11 @@ fn qualify_join_schema_sides(
let join_fields = join_schema.fields();

// Validate lengths
if join_fields.len() != left_fields.len() + right_fields.len() {
return internal_err!(
"Join schema field count must match left and right field count."
);
}
assert_eq_or_internal_err!(
join_fields.len(),
left_fields.len() + right_fields.len(),
"Join schema field count must match left and right field count."
);

// Validate field names match
for (i, (field, expected)) in join_fields
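Here the field-count check becomes `assert_eq_or_internal_err!`, which presumably compares the two expressions and returns an internal error, rather than panicking like `assert_eq!`, when they differ. A self-contained sketch under that assumption follows; the message format and error type are illustrative, not the macro's real output.

```rust
// Illustrative stand-in for the equality variant: error out (instead of
// panicking) when the two sides differ, including both values in the message.
#[derive(Debug)]
enum DataFusionError {
    Internal(String),
}

macro_rules! assert_eq_or_internal_err {
    ($left:expr, $right:expr, $($arg:tt)*) => {
        if $left != $right {
            return Err(DataFusionError::Internal(format!(
                "{} (left: {:?}, right: {:?})",
                format!($($arg)*),
                $left,
                $right
            )));
        }
    };
}

fn check_schema_width(join: usize, left: usize, right: usize) -> Result<(), DataFusionError> {
    // Fails with an internal error unless join == left + right.
    assert_eq_or_internal_err!(
        join,
        left + right,
        "Join schema field count must match left and right field count."
    );
    Ok(())
}

fn main() {
    assert!(check_schema_width(5, 2, 3).is_ok());
    assert!(check_schema_width(4, 2, 3).is_err());
}
```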
5 changes: 4 additions & 1 deletion datafusion/core/tests/parquet/external_access_plan.rs
@@ -257,7 +257,10 @@ async fn bad_selection() {
.await
.unwrap_err();
let err_string = err.to_string();
assert_contains!(&err_string, "Internal error: Invalid ParquetAccessPlan Selection. Row group 0 has 5 rows but selection only specifies 4 rows");
assert_contains!(
&err_string,
"Row group 0 has 5 rows but selection only specifies 4 rows."
);
}

/// Return a RowSelection of 1 rows from a row group of 5 rows
19 changes: 11 additions & 8 deletions datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -70,7 +70,7 @@ use arrow::{
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::{
common::cast::as_int64_array,
common::{arrow_datafusion_err, internal_err, DFSchemaRef},
common::{arrow_datafusion_err, DFSchemaRef},
error::{DataFusionError, Result},
execution::{
context::{QueryPlanner, SessionState, TaskContext},
@@ -91,7 +91,7 @@ use datafusion::{
};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_common::{assert_eq_or_internal_err, assert_or_internal_err, ScalarValue};
use datafusion_expr::{FetchType, InvariantLevel, Projection, SortExpr};
use datafusion_optimizer::optimizer::ApplyOrder;
use datafusion_optimizer::AnalyzerRule;
@@ -585,9 +585,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
kind,
}) = self.invariant_mock.clone()
{
if should_fail_invariant && check == kind {
return internal_err!("node fails check, such as improper inputs");
}
assert_or_internal_err!(
!(should_fail_invariant && check == kind),
"node fails check, such as improper inputs"
);
}
Ok(())
}
@@ -733,9 +734,11 @@ impl ExecutionPlan for TopKExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if 0 != partition {
return internal_err!("TopKExec invalid partition {partition}");
}
assert_eq_or_internal_err!(
partition,
0,
"TopKExec invalid partition {partition}"
);

Ok(Box::pin(TopKReader {
input: self.input.execute(partition, context)?,