
Commit 843cd93

Register tables in BallistaContext using TableProviders instead of Dataframe (#1028)
1 parent 2258256 commit 843cd93

File tree

ballista/rust/client/src/context.rs
ballista/rust/core/src/datasource.rs
ballista/rust/core/src/lib.rs
ballista/rust/core/src/serde/logical_plan/to_proto.rs
ballista/rust/scheduler/src/planner.rs

5 files changed: +25 −105 lines

ballista/rust/client/src/context.rs

Lines changed: 23 additions & 19 deletions
@@ -23,12 +23,11 @@ use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
 
 use ballista_core::config::BallistaConfig;
-use ballista_core::{
-    datasource::DfTableAdapter, utils::create_df_ctx_with_ballista_query_planner,
-};
+use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
 
 use datafusion::catalog::TableReference;
 use datafusion::dataframe::DataFrame;
+use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::dataframe_impl::DataFrameImpl;
 use datafusion::logical_plan::LogicalPlan;
@@ -44,7 +43,7 @@ struct BallistaContextState {
     /// Scheduler port
     scheduler_port: u16,
     /// Tables that have been registered with this context
-    tables: HashMap<String, LogicalPlan>,
+    tables: HashMap<String, Arc<dyn TableProvider>>,
 }
 
 impl BallistaContextState {
@@ -197,11 +196,13 @@ impl BallistaContext {
     }
 
     /// Register a DataFrame as a table that can be referenced from a SQL query
-    pub fn register_table(&self, name: &str, table: &dyn DataFrame) -> Result<()> {
+    pub fn register_table(
+        &self,
+        name: &str,
+        table: Arc<dyn TableProvider>,
+    ) -> Result<()> {
         let mut state = self.state.lock().unwrap();
-        state
-            .tables
-            .insert(name.to_owned(), table.to_logical_plan());
+        state.tables.insert(name.to_owned(), table);
         Ok(())
     }
 
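For context, a minimal sketch of how a caller might use the new register_table signature. The `ctx` argument, the table name, the in-memory data, and the `ballista::prelude` import path are illustrative assumptions, not part of this commit:

use std::sync::Arc;

use ballista::prelude::BallistaContext; // assumed public path in the ballista client crate
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;

// Build a small in-memory TableProvider and hand it to the new
// register_table(name, Arc<dyn TableProvider>) API.
fn register_in_memory_table(ctx: &BallistaContext) -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;
    let table = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("my_table", Arc::new(table))
}
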
@@ -211,13 +212,17 @@ impl BallistaContext {
         path: &str,
         options: CsvReadOptions,
     ) -> Result<()> {
-        let df = self.read_csv(path, options)?;
-        self.register_table(name, df.as_ref())
+        match self.read_csv(path, options)?.to_logical_plan() {
+            LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
+            _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
+        }
     }
 
     pub fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
-        let df = self.read_parquet(path)?;
-        self.register_table(name, df.as_ref())
+        match self.read_parquet(path)?.to_logical_plan() {
+            LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
+            _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
+        }
     }
 
     pub fn register_avro(
@@ -226,9 +231,10 @@ impl BallistaContext {
         path: &str,
         options: AvroReadOptions,
     ) -> Result<()> {
-        let df = self.read_avro(path, options)?;
-        self.register_table(name, df.as_ref())?;
-        Ok(())
+        match self.read_avro(path, options)?.to_logical_plan() {
+            LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
+            _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
+        }
     }
 
     /// Create a DataFrame from a SQL statement
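
The same three-arm match now appears in register_csv, register_parquet, and register_avro: read the file into a DataFrame, then pull the TableProvider out of the resulting TableScan plan. A hypothetical helper (not part of this commit; the name is invented) could factor that pattern out:

use std::sync::Arc;

use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;

// Extract the TableProvider from the TableScan that read_csv/read_parquet/read_avro
// produce; any other plan shape is unexpected here.
fn provider_from_scan(df: Arc<dyn DataFrame>) -> Result<Arc<dyn TableProvider>> {
    match df.to_logical_plan() {
        LogicalPlan::TableScan { source, .. } => Ok(source),
        _ => Err(DataFusionError::Internal(
            "Expected LogicalPlan::TableScan".to_owned(),
        )),
    }
}
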
@@ -245,12 +251,10 @@ impl BallistaContext {
         // register tables with DataFusion context
         {
             let state = self.state.lock().unwrap();
-            for (name, plan) in &state.tables {
-                let plan = ctx.optimize(plan)?;
-                let execution_plan = ctx.create_physical_plan(&plan)?;
+            for (name, prov) in &state.tables {
                 ctx.register_table(
                     TableReference::Bare { table: name },
-                    Arc::new(DfTableAdapter::new(plan, execution_plan)),
+                    Arc::clone(prov),
                 )?;
             }
         }
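
Putting the pieces together, a hedged end-to-end sketch of the flow this file now supports: register_csv stores the file's TableProvider, and sql() hands that provider directly to the DataFusion context it creates. The CSV path, table name, import paths, and the sync sql()/async collect() signatures are assumptions about the surrounding code at this point in the tree:

use ballista::prelude::BallistaContext; // assumed public path in the ballista client crate
use datafusion::error::Result;
use datafusion::prelude::CsvReadOptions;

async fn count_rows(ctx: &BallistaContext) -> Result<usize> {
    // The provider captured here is what sql() registers with its DataFusion context.
    ctx.register_csv("trips", "/path/to/trips.csv", CsvReadOptions::new())?;
    let df = ctx.sql("SELECT * FROM trips")?;
    let batches = df.collect().await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}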

ballista/rust/core/src/datasource.rs

Lines changed: 0 additions & 64 deletions
This file was deleted.

ballista/rust/core/src/lib.rs

Lines changed: 0 additions & 1 deletion
@@ -25,7 +25,6 @@ pub fn print_version() {
 
 pub mod client;
 pub mod config;
-pub mod datasource;
 pub mod error;
 pub mod execution_plans;
 pub mod memory_stream;

ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 1 addition & 15 deletions
@@ -20,7 +20,6 @@
 //! processes.
 
 use super::super::proto_error;
-use crate::datasource::DfTableAdapter;
 use crate::serde::{protobuf, BallistaError};
 use datafusion::arrow::datatypes::{
     DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit,
@@ -728,20 +727,7 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
                 ..
             } => {
                 let schema = source.schema();
-
-                // unwrap the DFTableAdapter to get to the real TableProvider
-                let source = if let Some(adapter) =
-                    source.as_any().downcast_ref::<DfTableAdapter>()
-                {
-                    match &adapter.logical_plan {
-                        LogicalPlan::TableScan { source, .. } => Ok(source.as_any()),
-                        _ => Err(BallistaError::General(
-                            "Invalid LogicalPlan::TableScan".to_owned(),
-                        )),
-                    }
-                } else {
-                    Ok(source.as_any())
-                }?;
+                let source = source.as_any();
 
                 let projection = match projection {
                     None => None,

ballista/rust/scheduler/src/planner.rs

Lines changed: 1 addition & 6 deletions
@@ -22,13 +22,11 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use ballista_core::datasource::DfTableAdapter;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::{
     execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
     serde::scheduler::PartitionLocation,
 };
-use datafusion::execution::context::ExecutionContext;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::windows::WindowAggExec;
@@ -96,10 +94,7 @@ impl DistributedPlanner {
             stages.append(&mut child_stages);
         }
 
-        if let Some(adapter) = execution_plan.as_any().downcast_ref::<DfTableAdapter>() {
-            let ctx = ExecutionContext::new();
-            Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
-        } else if let Some(coalesce) = execution_plan
+        if let Some(coalesce) = execution_plan
             .as_any()
             .downcast_ref::<CoalescePartitionsExec>()
         {
