Skip to content

Commit

Permalink
Upgrade to DataFusion 16 (again) (#636)
Browse files Browse the repository at this point in the history
  • Loading branch information
Brent Gardner authored Jan 30, 2023
1 parent 70032e1 commit 0b6b28f
Show file tree
Hide file tree
Showing 78 changed files with 1,645 additions and 5,109 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

[workspace]
members = [
"benchmarks",
"ballista-cli",
"ballista/client",
"ballista/core",
"ballista/executor",
"ballista/scheduler",
"benchmarks",
"examples",
"ballista-cli",
]
exclude = ["python"]

Expand Down
4 changes: 2 additions & 2 deletions ballista-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.10.0", features = [
"standalone",
] }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = "15.0.0"
datafusion-cli = "15.0.0"
datafusion = "16.1.0"
datafusion-cli = "16.1.0"
dirs = "4.0.0"
env_logger = "0.10"
mimalloc = { version = "0.1", default-features = false }
Expand Down
6 changes: 3 additions & 3 deletions ballista-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl Command {
.map_err(BallistaError::DataFusionError)
}
Self::DescribeTable(name) => {
let df = ctx.sql(&format!("SHOW COLUMNS FROM {}", name)).await?;
let df = ctx.sql(&format!("SHOW COLUMNS FROM {name}")).await?;
let batches = df.collect().await?;
print_options
.print_batches(&batches, now)
Expand Down Expand Up @@ -97,10 +97,10 @@ impl Command {
Self::SearchFunctions(function) => {
if let Ok(func) = function.parse::<Function>() {
let details = func.function_details()?;
println!("{}", details);
println!("{details}");
Ok(())
} else {
let msg = format!("{} is not a supported function", function);
let msg = format!("{function} is not a supported function");
Err(BallistaError::NotImplemented(msg))
}
}
Expand Down
12 changes: 6 additions & 6 deletions ballista-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub async fn exec_from_lines(
if line.ends_with(';') {
match exec_and_print(ctx, print_options, query).await {
Ok(_) => {}
Err(err) => println!("{:?}", err),
Err(err) => println!("{err:?}"),
}
query = "".to_owned();
} else {
Expand All @@ -68,7 +68,7 @@ pub async fn exec_from_lines(
if !query.is_empty() {
match exec_and_print(ctx, print_options, query).await {
Ok(_) => {}
Err(err) => println!("{:?}", err),
Err(err) => println!("{err:?}"),
}
}
}
Expand Down Expand Up @@ -110,7 +110,7 @@ pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOpti
if let Err(e) =
command.execute(&mut print_options).await
{
eprintln!("{}", e)
eprintln!("{e}")
}
} else {
eprintln!(
Expand All @@ -124,7 +124,7 @@ pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOpti
}
_ => {
if let Err(e) = cmd.execute(ctx, &mut print_options).await {
eprintln!("{}", e)
eprintln!("{e}")
}
}
}
Expand All @@ -136,7 +136,7 @@ pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOpti
rl.add_history_entry(line.trim_end());
match exec_and_print(ctx, &print_options, line).await {
Ok(_) => {}
Err(err) => eprintln!("{:?}", err),
Err(err) => eprintln!("{err:?}"),
}
}
Err(ReadlineError::Interrupted) => {
Expand All @@ -148,7 +148,7 @@ pub async fn exec_from_repl(ctx: &BallistaContext, print_options: &mut PrintOpti
break;
}
Err(err) => {
eprintln!("Unknown error happened {:?}", err);
eprintln!("Unknown error happened {err:?}");
break;
}
}
Expand Down
10 changes: 5 additions & 5 deletions ballista-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub async fn main() -> Result<()> {
let args = Args::parse();

if !args.quiet {
println!("Ballista CLI v{}", BALLISTA_CLI_VERSION);
println!("Ballista CLI v{BALLISTA_CLI_VERSION}");
}

if let Some(ref path) = args.data_path {
Expand Down Expand Up @@ -166,28 +166,28 @@ fn is_valid_file(dir: &str) -> std::result::Result<(), String> {
if Path::new(dir).is_file() {
Ok(())
} else {
Err(format!("Invalid file '{}'", dir))
Err(format!("Invalid file '{dir}'"))
}
}

fn is_valid_data_dir(dir: &str) -> std::result::Result<(), String> {
if Path::new(dir).is_dir() {
Ok(())
} else {
Err(format!("Invalid data directory '{}'", dir))
Err(format!("Invalid data directory '{dir}'"))
}
}

fn is_valid_batch_size(size: &str) -> std::result::Result<(), String> {
match size.parse::<usize>() {
Ok(size) if size > 0 => Ok(()),
_ => Err(format!("Invalid batch size '{}'", size)),
_ => Err(format!("Invalid batch size '{size}'")),
}
}

fn is_valid_concurrent_tasks_size(size: &str) -> std::result::Result<(), String> {
match size.parse::<usize>() {
Ok(size) if size > 0 => Ok(()),
_ => Err(format!("Invalid concurrent_tasks size '{}'", size)),
_ => Err(format!("Invalid concurrent_tasks size '{size}'")),
}
}
6 changes: 3 additions & 3 deletions ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ rust-version = "1.63"
ballista-core = { path = "../core", version = "0.10.0" }
ballista-executor = { path = "../executor", version = "0.10.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.10.0", optional = true }
datafusion = "15.0.0"
datafusion-proto = "15.0.0"
datafusion = "16.1.0"
datafusion-proto = "16.1.0"
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
sqlparser = "0.27"
sqlparser = "0.30.0"
tempfile = "3"
tokio = "1.0"

Expand Down
60 changes: 33 additions & 27 deletions ballista/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl BallistaContext {
);
let connection = create_grpc_client_connection(scheduler_url.clone())
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
let mut scheduler = SchedulerGrpcClient::new(connection);

let remote_session_id = scheduler
Expand All @@ -111,7 +111,7 @@ impl BallistaContext {
optional_session_id: None,
})
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
.into_inner()
.session_id;

Expand Down Expand Up @@ -139,8 +139,8 @@ impl BallistaContext {
config: &BallistaConfig,
concurrent_tasks: usize,
) -> ballista_core::error::Result<Self> {
use ballista_core::serde::protobuf::PhysicalPlanNode;
use ballista_core::serde::BallistaCodec;
use datafusion_proto::protobuf::PhysicalPlanNode;

log::info!("Running in local mode. Scheduler will be run in-proc");

Expand Down Expand Up @@ -170,7 +170,7 @@ impl BallistaContext {
optional_session_id: None,
})
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
.map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
.into_inner()
.session_id;

Expand Down Expand Up @@ -212,7 +212,7 @@ impl BallistaContext {
&self,
path: &str,
options: AvroReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
) -> Result<DataFrame> {
let df = self.context.read_avro(path, options).await?;
Ok(df)
}
Expand All @@ -223,7 +223,7 @@ impl BallistaContext {
&self,
path: &str,
options: ParquetReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
) -> Result<DataFrame> {
let df = self.context.read_parquet(path, options).await?;
Ok(df)
}
Expand All @@ -234,7 +234,7 @@ impl BallistaContext {
&self,
path: &str,
options: CsvReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
) -> Result<DataFrame> {
let df = self.context.read_csv(path, options).await?;
Ok(df)
}
Expand All @@ -260,9 +260,9 @@ impl BallistaContext {
.read_csv(path, options)
.await
.map_err(|e| {
DataFusionError::Context(format!("Can't read CSV: {}", path), Box::new(e))
DataFusionError::Context(format!("Can't read CSV: {path}"), Box::new(e))
})?
.to_logical_plan()?;
.into_optimized_plan()?;
match plan {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source_as_provider(&source)?)
Expand All @@ -277,7 +277,11 @@ impl BallistaContext {
path: &str,
options: ParquetReadOptions<'_>,
) -> Result<()> {
match self.read_parquet(path, options).await?.to_logical_plan()? {
match self
.read_parquet(path, options)
.await?
.into_optimized_plan()?
{
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source_as_provider(&source)?)
}
Expand All @@ -291,7 +295,7 @@ impl BallistaContext {
path: &str,
options: AvroReadOptions<'_>,
) -> Result<()> {
match self.read_avro(path, options).await?.to_logical_plan()? {
match self.read_avro(path, options).await?.into_optimized_plan()? {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source_as_provider(&source)?)
}
Expand Down Expand Up @@ -332,7 +336,7 @@ impl BallistaContext {
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<Arc<DataFrame>> {
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
let mut ctx = self.context.clone();

let is_show = self.is_show_statement(sql).await?;
Expand Down Expand Up @@ -361,7 +365,7 @@ impl BallistaContext {
}
}

let plan = ctx.create_logical_plan(sql)?;
let plan = ctx.state().create_logical_plan(sql).await?;

match plan {
LogicalPlan::CreateExternalTable(CreateExternalTable {
Expand All @@ -375,7 +379,7 @@ impl BallistaContext {
ref if_not_exists,
..
}) => {
let table_exists = ctx.table_exist(name.as_str())?;
let table_exists = ctx.table_exist(name.as_table_reference())?;
let schema: SchemaRef = Arc::new(schema.as_ref().to_owned().into());
let table_partition_cols = table_partition_cols
.iter()
Expand All @@ -397,40 +401,41 @@ impl BallistaContext {
if !schema.fields().is_empty() {
options = options.schema(&schema);
}
self.register_csv(name, location, options).await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
self.register_csv(
name.as_table_reference().table(),
location,
options,
)
.await?;
Ok(DataFrame::new(ctx.state(), plan))
}
"parquet" => {
self.register_parquet(
name,
name.as_table_reference().table(),
location,
ParquetReadOptions::default()
.table_partition_cols(table_partition_cols),
)
.await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
Ok(DataFrame::new(ctx.state(), plan))
}
"avro" => {
self.register_avro(
name,
name.as_table_reference().table(),
location,
AvroReadOptions::default()
.table_partition_cols(table_partition_cols),
)
.await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
Ok(DataFrame::new(ctx.state(), plan))
}
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
file_type
"Unsupported file type {file_type:?}."
))),
},
(true, true) => {
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
}
(true, true) => Ok(DataFrame::new(ctx.state(), plan)),
(false, true) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
name
"Table '{name:?}' already exists"
))),
}
}
Expand Down Expand Up @@ -593,6 +598,7 @@ mod tests {
collect_stat: x.collect_stat,
target_partitions: x.target_partitions,
file_sort_order: None,
infinite_source: false,
};

let table_paths = listing_table
Expand Down
8 changes: 4 additions & 4 deletions ballista/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.8", default-features = false }

arrow-flight = { version = "28.0.0", features = ["flight-sql-experimental"] }
arrow-flight = { version = "29.0.0", features = ["flight-sql-experimental"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = "15.0.0"
datafusion = "16.1.0"
datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, optional = true }
datafusion-proto = "15.0.0"
datafusion-proto = "16.1.0"
futures = "0.3"
hashbrown = "0.13"

Expand All @@ -68,7 +68,7 @@ prost = "0.11"
prost-types = "0.11"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
sqlparser = "0.27"
sqlparser = "0.30.0"
sys-info = "0.9.0"
tokio = "1.0"
tokio-stream = { version = "0.1", features = ["net"] }
Expand Down
4 changes: 2 additions & 2 deletions ballista/core/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() -> Result<(), String> {
println!("cargo:rerun-if-env-changed=FORCE_REBUILD");

let version = rustc_version::version().unwrap();
println!("cargo:rustc-env=RUSTC_VERSION={}", version);
println!("cargo:rustc-env=RUSTC_VERSION={version}");

// TODO: undo when resolved: https://github.com/intellij-rust/intellij-rust/issues/9402
#[cfg(feature = "docsrs")]
Expand All @@ -42,7 +42,7 @@ fn main() -> Result<(), String> {
tonic_build::configure()
.extern_path(".datafusion", "::datafusion_proto::protobuf")
.compile(&["proto/ballista.proto"], &["proto"])
.map_err(|e| format!("protobuf compilation failed: {}", e))?;
.map_err(|e| format!("protobuf compilation failed: {e}"))?;
let generated_source_path = out.join("ballista.protobuf.rs");
let code = std::fs::read_to_string(generated_source_path).unwrap();
let mut file = std::fs::OpenOptions::new()
Expand Down
Loading

0 comments on commit 0b6b28f

Please sign in to comment.