Skip to content

Commit

Permalink
remove some unwrap.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed Aug 14, 2023
1 parent 68af743 commit 9b481c3
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use table_engine::{
table::ReadRequest,
};

/// Unresolved partitioned table scan which can't be executed before resolving
#[derive(Debug)]
pub struct UnresolvedPartitionedScan {
pub sub_tables: Vec<TableIdentifier>,
Expand Down Expand Up @@ -110,7 +111,6 @@ impl ExecutionPlan for ResolvedPartitionedScan {
self
}

// TODO: check if it is right.
fn schema(&self) -> SchemaRef {
self.remote_exec_plans
.first()
Expand Down
2 changes: 1 addition & 1 deletion df_engine_extensions/src/dist_sql_query/sub_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion::{
};
use table_engine::{provider::ScanTable, remote::model::TableIdentifier, table::ReadRequest};

/// Unresolved sub table scan which can't be executed before rewriting
/// Unresolved sub table scan which can't be executed before resolving
#[derive(Debug)]
pub struct UnresolvedSubTableScan {
pub table: TableIdentifier,
Expand Down
3 changes: 3 additions & 0 deletions partition_table_engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ define_result!(Error);
pub enum Error {
#[snafu(display("Internal error, message:{}, err:{}", msg, source))]
Internal { msg: String, source: GenericError },

#[snafu(display("Datafusion error, message:{}, err:{}", msg, source))]
Datafusion { msg: String, source: GenericError },
}
10 changes: 7 additions & 3 deletions partition_table_engine/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use table_engine::{
};
use trace_metric::MetricsCollector;

const SCAN_TABLE_METRICS_COLLECTOR_NAME: &str = "scan_table";
const SCAN_TABLE_METRICS_COLLECTOR_NAME: &str = "scan_partitioned_table";

/// `TableProviderAdapter` for `PartitionedTableImpl`.
// TODO: use single `TableProviderAdapter` for normal table and partitioned table
Expand Down Expand Up @@ -155,12 +155,16 @@ impl TableProviderAdapter {
) -> Result<Arc<dyn ExecutionPlan>> {
// Build partition rule.
let df_partition_rule =
DfPartitionRuleAdapter::new(partition_info.clone(), &self.table.schema()).unwrap();
DfPartitionRuleAdapter::new(partition_info.clone(), &self.table.schema()).map_err(
|e| DataFusionError::Internal(format!("failed to build partition rule, err:{e}")),
)?;

// Evaluate expr and locate partition.
let partitions = df_partition_rule
.locate_partitions_for_read(request.predicate.exprs())
.unwrap();
.map_err(|e| {
DataFusionError::Internal(format!("failed to locate partition for read, err:{e}"))
})?;
let sub_tables = self.get_sub_table_idents(partition_info, partitions);

// Build plan.
Expand Down

0 comments on commit 9b481c3

Please sign in to comment.