Skip to content

Commit

Permalink
Minor: Add debug logging for schema mismatch errors (apache#6626)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored and jayzhan211 committed Jun 12, 2023
1 parent b972466 commit f21a1b2
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 24 deletions.
36 changes: 20 additions & 16 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.

use futures::StreamExt;
use log::debug;
use std::any::Any;
use std::fmt::{self, Debug, Display};
use std::sync::Arc;
Expand Down Expand Up @@ -55,23 +56,26 @@ pub struct MemTable {
impl MemTable {
/// Create a new in-memory table from the provided schema and record batches
pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
if partitions
.iter()
.flatten()
.all(|batches| schema.contains(&batches.schema()))
{
Ok(Self {
schema,
batches: partitions
.into_iter()
.map(|e| Arc::new(RwLock::new(e)))
.collect::<Vec<_>>(),
})
} else {
Err(DataFusionError::Plan(
"Mismatch between schema and batches".to_string(),
))
for batches in partitions.iter().flatten() {
let batches_schema = batches.schema();
if !schema.contains(&batches_schema) {
debug!(
"mem table schema does not contain batches schema. \
Target_schema: {schema:?}. Batches Schema: {batches_schema:?}"
);
return Err(DataFusionError::Plan(
"Mismatch between schema and batches".to_string(),
));
}
}

Ok(Self {
schema,
batches: partitions
.into_iter()
.map(|e| Arc::new(RwLock::new(e)))
.collect::<Vec<_>>(),
})
}

/// Create a mem table by reading from another data source
Expand Down
16 changes: 12 additions & 4 deletions datafusion/core/src/datasource/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use async_trait::async_trait;

use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{Expr, TableType};
use log::debug;

use crate::datasource::TableProvider;
use crate::execution::context::{SessionState, TaskContext};
Expand Down Expand Up @@ -53,10 +54,17 @@ impl StreamingTable {
schema: SchemaRef,
partitions: Vec<Arc<dyn PartitionStream>>,
) -> Result<Self> {
if !partitions.iter().all(|x| schema.contains(x.schema())) {
return Err(DataFusionError::Plan(
"Mismatch between schema and batches".to_string(),
));
for x in partitions.iter() {
let partition_schema = x.schema();
if !schema.contains(partition_schema) {
debug!(
"target schema does not contain partition schema. \
Target_schema: {schema:?}. Partiton Schema: {partition_schema:?}"
);
return Err(DataFusionError::Plan(
"Mismatch between schema and batches".to_string(),
));
}
}

Ok(Self {
Expand Down
16 changes: 12 additions & 4 deletions datafusion/core/src/physical_plan/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use futures::stream::StreamExt;

use datafusion_common::{DataFusionError, Result, Statistics};
use datafusion_physical_expr::PhysicalSortExpr;
use log::debug;

use crate::datasource::streaming::PartitionStream;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
Expand All @@ -48,10 +49,17 @@ impl StreamingTableExec {
projection: Option<&Vec<usize>>,
infinite: bool,
) -> Result<Self> {
if !partitions.iter().all(|x| schema.contains(x.schema())) {
return Err(DataFusionError::Plan(
"Mismatch between schema and batches".to_string(),
));
for x in partitions.iter() {
let partition_schema = x.schema();
if !schema.contains(partition_schema) {
debug!(
"target schema does not contain partition schema. \
Target_schema: {schema:?}. Partiton Schema: {partition_schema:?}"
);
return Err(DataFusionError::Plan(
"Mismatch between schema and batches".to_string(),
));
}
}

let projected_schema = match projection {
Expand Down

0 comments on commit f21a1b2

Please sign in to comment.