
Commit 478f606

Make DataFusion Core compile (#3)
* wip
* more
* Make scalar.rs compile
* Fix various compilation error due to API difference
* Make datafusion core compile
* fmt
* wip
1 parent 428efa8 commit 478f606

File tree

23 files changed: +467, -166 lines


ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 20 additions & 23 deletions
@@ -34,14 +34,11 @@ use crate::utils;
 use crate::serde::protobuf::ShuffleWritePartition;
 use crate::serde::scheduler::{PartitionLocation, PartitionStats};
 use async_trait::async_trait;
-use datafusion::arrow::array::{
-    Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
-    UInt64Builder,
-};
+use datafusion::arrow::array::*;
 use datafusion::arrow::compute::take;
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion::arrow::ipc::reader::FileReader;
-use datafusion::arrow::ipc::writer::FileWriter;
+use datafusion::arrow::io::ipc::read::FileReader;
+use datafusion::arrow::io::ipc::write::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::hash_utils::create_hashes;
@@ -244,7 +241,7 @@ impl ShuffleWriterExec {
                 .collect::<Result<Vec<Arc<dyn Array>>>>()?;

             let output_batch =
-                RecordBatch::try_new(input_batch.schema(), columns)?;
+                RecordBatch::try_new(input_batch.schema().clone(), columns)?;

             // write non-empty batch out

@@ -356,18 +353,18 @@ impl ExecutionPlan for ShuffleWriterExec {

         // build metadata result batch
         let num_writers = part_loc.len();
-        let mut partition_builder = UInt32Builder::new(num_writers);
-        let mut path_builder = StringBuilder::new(num_writers);
-        let mut num_rows_builder = UInt64Builder::new(num_writers);
-        let mut num_batches_builder = UInt64Builder::new(num_writers);
-        let mut num_bytes_builder = UInt64Builder::new(num_writers);
+        let mut partition_builder = UInt32Vec::with_capacity(num_writers);
+        let mut path_builder = MutableUtf8Array::with_capacity(num_writers);
+        let mut num_rows_builder = UInt64Vec::with_capacity(num_writers);
+        let mut num_batches_builder = UInt64Vec::with_capacity(num_writers);
+        let mut num_bytes_builder = UInt64Vec::with_capacity(num_writers);

         for loc in &part_loc {
-            path_builder.append_value(loc.path.clone())?;
-            partition_builder.append_value(loc.partition_id as u32)?;
-            num_rows_builder.append_value(loc.num_rows)?;
-            num_batches_builder.append_value(loc.num_batches)?;
-            num_bytes_builder.append_value(loc.num_bytes)?;
+            path_builder.push(Some(loc.path.clone()));
+            partition_builder.push(Some(loc.partition_id as u32));
+            num_rows_builder.push(Some(loc.num_rows));
+            num_batches_builder.push(Some(loc.num_batches));
+            num_bytes_builder.push(Some(loc.num_bytes));
         }

         // build arrays
@@ -428,17 +425,17 @@ fn result_schema() -> SchemaRef {
     ]))
 }

-struct ShuffleWriter {
+struct ShuffleWriter<'a> {
     path: String,
-    writer: FileWriter<File>,
+    writer: FileWriter<'a, File>,
     num_batches: u64,
     num_rows: u64,
     num_bytes: u64,
 }

-impl ShuffleWriter {
+impl<'a> ShuffleWriter<'a> {
     fn new(path: &str, schema: &Schema) -> Result<Self> {
-        let file = File::create(path)
+        let mut file = File::create(path)
             .map_err(|e| {
                 BallistaError::General(format!(
                     "Failed to create partition file at {}: {:?}",
@@ -451,7 +448,7 @@ impl ShuffleWriter {
             num_rows: 0,
             num_bytes: 0,
             path: path.to_owned(),
-            writer: FileWriter::try_new(file, schema)?,
+            writer: FileWriter::try_new(&mut file, schema)?,
         })
     }

@@ -480,7 +477,7 @@ impl ShuffleWriter {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
+    use datafusion::arrow::array::{Utf8Array, StructArray, UInt32Array, UInt64Array};
     use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use datafusion::physical_plan::expressions::Column;
     use datafusion::physical_plan::limit::GlobalLimitExec;
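
Taken together, the shuffle_writer.rs changes swap the arrow-rs append-style builders (StringBuilder, UInt32Builder, UInt64Builder) for arrow2 mutable arrays (MutableUtf8Array plus the UInt32Vec/UInt64Vec aliases, presumably MutablePrimitiveArray under the hood), move the IPC reader/writer to datafusion::arrow::io::ipc::{read, write}, and thread a lifetime through ShuffleWriter because the new FileWriter borrows its output file. Below is a minimal standalone sketch of the builder-to-mutable-array pattern, written directly against the arrow2 crate that datafusion::arrow re-exports here; the helper name and columns are illustrative, not code from the commit:

use std::sync::Arc;
use arrow2::array::{
    Array, MutablePrimitiveArray, MutableUtf8Array, PrimitiveArray, Utf8Array,
};

// Hypothetical helper: build two metadata columns the arrow2 way.
fn build_metadata_columns(paths: &[String], partition_ids: &[u32]) -> (Arc<dyn Array>, Arc<dyn Array>) {
    // arrow-rs: StringBuilder::new(cap) / UInt32Builder::new(cap), then append_value(v)?
    // arrow2:   Mutable*::with_capacity(cap), then an infallible push(Some(v))
    let mut path_builder = MutableUtf8Array::<i32>::with_capacity(paths.len());
    let mut partition_builder = MutablePrimitiveArray::<u32>::with_capacity(partition_ids.len());

    for (path, id) in paths.iter().zip(partition_ids) {
        path_builder.push(Some(path.as_str()));
        partition_builder.push(Some(*id));
    }

    // Freeze the mutable arrays into immutable, shareable arrays.
    let paths: Utf8Array<i32> = path_builder.into();
    let partitions: PrimitiveArray<u32> = partition_builder.into();
    (Arc::new(paths), Arc::new(partitions))
}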

ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 0 additions & 2 deletions
@@ -61,7 +61,6 @@ use datafusion::physical_plan::{
     expressions::{
         col, Avg, BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr,
         IsNullExpr, Literal, NegativeExpr, NotExpr, PhysicalSortExpr, TryCastExpr,
-        DEFAULT_DATAFUSION_CAST_OPTIONS,
     },
     filter::FilterExec,
     functions::{self, BuiltinScalarFunction, ScalarFunctionExpr},
@@ -620,7 +619,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {
         ExprType::Cast(e) => Arc::new(CastExpr::new(
             convert_box_required!(e.expr)?,
             convert_required!(e.arrow_type)?,
-            DEFAULT_DATAFUSION_CAST_OPTIONS,
         )),
         ExprType::TryCast(e) => Arc::new(TryCastExpr::new(
             convert_box_required!(e.expr)?,
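
The from_proto.rs change simply stops passing cast options: in this fork, CastExpr::new takes only the input expression and the target type. A minimal sketch of that two-argument form (the column name, index, and target type below are made up, and upstream DataFusion of the same era still expects a third cast-options argument):

use std::sync::Arc;
use datafusion::arrow::datatypes::DataType;
use datafusion::physical_plan::expressions::{CastExpr, Column};
use datafusion::physical_plan::PhysicalExpr;

// Hypothetical: cast physical column "c0" (at index 0) to Int64.
fn cast_c0_to_i64() -> Arc<dyn PhysicalExpr> {
    let input: Arc<dyn PhysicalExpr> = Arc::new(Column::new("c0", 0));
    Arc::new(CastExpr::new(input, DataType::Int64))
}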

ballista/rust/core/src/utils.rs

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ use crate::serde::scheduler::PartitionStats;

 use crate::config::BallistaConfig;
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::{
     array::*,

datafusion/benches/physical_plan.rs

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {

     let exec = MemoryExec::try_new(
         &batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
-        schema,
+        schema.clone(),
         None,
     )
     .unwrap();
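
The bench fix, like the RecordBatch::try_new change earlier in this commit, just clones the schema handle. Assuming SchemaRef is an Arc<Schema> (as in arrow-rs), that clone is a reference-count bump rather than a copy of the fields; a small sketch with a made-up field, assuming the usual Schema::new/Field::new constructors:

use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    // Cheap: clones the Arc pointer, not the underlying Schema.
    let shared = schema.clone();
    assert!(Arc::ptr_eq(&schema, &shared));
}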
