Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,6 @@ message ScanLimit {
message FileScanExecConf {
repeated FileGroup file_groups = 1;
Schema schema = 2;
uint32 batch_size = 3;
repeated uint32 projection = 4;
ScanLimit limit = 5;
Statistics statistics = 6;
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
.collect::<Result<Vec<_>, _>>()?,
partition_count as usize,
),
PartitionMethod::RoundRobin(batch_size) => {
Partitioning::RoundRobinBatch(batch_size as usize)
PartitionMethod::RoundRobin(partition_count) => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

batch_size here is wrong, it's partition count

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice find

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh god, good find, 4096 partitions is not going to be fun.

Partitioning::RoundRobinBatch(partition_count as usize)
}
};

Expand Down
10 changes: 5 additions & 5 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod roundtrip_tests {
async fn roundtrip_repartition() -> Result<()> {
use datafusion::logical_plan::Partitioning;

let test_batch_sizes = [usize::MIN, usize::MAX, 43256];
let test_partition_counts = [usize::MIN, usize::MAX, 43256];

let test_expr: Vec<Expr> =
vec![col("c1") + col("c2"), Expr::Literal((4.0).into())];
Expand All @@ -92,8 +92,8 @@ mod roundtrip_tests {
.map_err(BallistaError::DataFusionError)?,
);

for batch_size in test_batch_sizes.iter() {
let rr_repartition = Partitioning::RoundRobinBatch(*batch_size);
for partition_count in test_partition_counts.iter() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

let rr_repartition = Partitioning::RoundRobinBatch(*partition_count);

let roundtrip_plan = LogicalPlan::Repartition(Repartition {
input: plan.clone(),
Expand All @@ -102,7 +102,7 @@ mod roundtrip_tests {

roundtrip_test!(roundtrip_plan);

let h_repartition = Partitioning::Hash(test_expr.clone(), *batch_size);
let h_repartition = Partitioning::Hash(test_expr.clone(), *partition_count);

let roundtrip_plan = LogicalPlan::Repartition(Repartition {
input: plan.clone(),
Expand All @@ -111,7 +111,7 @@ mod roundtrip_tests {

roundtrip_test!(roundtrip_plan);

let no_expr_hrepartition = Partitioning::Hash(Vec::new(), *batch_size);
let no_expr_hrepartition = Partitioning::Hash(Vec::new(), *partition_count);

let roundtrip_plan = LogicalPlan::Repartition(Repartition {
input: plan.clone(),
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,8 +826,8 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
partition_count: *partition_count as u64,
})
}
Partitioning::RoundRobinBatch(batch_size) => {
PartitionMethod::RoundRobin(*batch_size as u64)
Partitioning::RoundRobinBatch(partition_count) => {
PartitionMethod::RoundRobin(*partition_count as u64)
}
};

Expand Down
9 changes: 4 additions & 5 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use datafusion::logical_plan::{
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::file_format::{
AvroExec, CsvExec, ParquetExec, PhysicalPlanConfig,
AvroExec, CsvExec, FileScanConfig, ParquetExec,
};
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
Expand Down Expand Up @@ -770,10 +770,10 @@ impl TryInto<Statistics> for &protobuf::Statistics {
}
}

impl TryInto<PhysicalPlanConfig> for &protobuf::FileScanExecConf {
impl TryInto<FileScanConfig> for &protobuf::FileScanExecConf {
type Error = BallistaError;

fn try_into(self) -> Result<PhysicalPlanConfig, Self::Error> {
fn try_into(self) -> Result<FileScanConfig, Self::Error> {
let schema = Arc::new(convert_required!(self.schema)?);
let projection = self
.projection
Expand All @@ -787,7 +787,7 @@ impl TryInto<PhysicalPlanConfig> for &protobuf::FileScanExecConf {
};
let statistics = convert_required!(self.statistics)?;

Ok(PhysicalPlanConfig {
Ok(FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_schema: schema,
file_groups: self
Expand All @@ -797,7 +797,6 @@ impl TryInto<PhysicalPlanConfig> for &protobuf::FileScanExecConf {
.collect::<Result<Vec<_>, _>>()?,
statistics,
projection,
batch_size: self.batch_size as usize,
limit: self.limit.as_ref().map(|sl| sl.limit as usize),
table_partition_cols: vec![],
})
Expand Down
7 changes: 3 additions & 4 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use datafusion::physical_plan::{
};
use datafusion::physical_plan::{file_format::AvroExec, filter::FilterExec};
use datafusion::physical_plan::{
file_format::PhysicalPlanConfig, hash_aggregate::AggregateMode,
file_format::FileScanConfig, hash_aggregate::AggregateMode,
};
use datafusion::{
datasource::PartitionedFile, physical_plan::coalesce_batches::CoalesceBatchesExec,
Expand Down Expand Up @@ -677,10 +677,10 @@ impl From<&Statistics> for protobuf::Statistics {
}
}

impl TryFrom<&PhysicalPlanConfig> for protobuf::FileScanExecConf {
impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
type Error = BallistaError;
fn try_from(
conf: &PhysicalPlanConfig,
conf: &FileScanConfig,
) -> Result<protobuf::FileScanExecConf, Self::Error> {
let file_groups = conf
.file_groups
Expand All @@ -700,7 +700,6 @@ impl TryFrom<&PhysicalPlanConfig> for protobuf::FileScanExecConf {
.map(|n| *n as u32)
.collect(),
schema: Some(conf.file_schema.as_ref().into()),
batch_size: conf.batch_size as u32,
table_partition_cols: conf.table_partition_cols.to_vec(),
})
}
Expand Down
10 changes: 3 additions & 7 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,9 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
println!("Loading table '{}' into memory", table);
let start = Instant::now();

let memtable = MemTable::load(
table_provider,
opt.batch_size,
Some(opt.partitions),
runtime.clone(),
)
.await?;
let memtable =
MemTable::load(table_provider, Some(opt.partitions), runtime.clone())
.await?;
println!(
"Loaded table '{}' into memory in {} ms",
table,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/benches/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn sort_preserving_merge_operator(batches: Vec<RecordBatch>, sort: &[&str]) {
None,
)
.unwrap();
let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 8192));
let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

let rt = Runtime::new().unwrap();
let rt_env = Arc::new(RuntimeEnv::default());
Expand Down
7 changes: 3 additions & 4 deletions datafusion/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,9 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
ctx.state.lock().unwrap().config.target_partitions = 1;
let runtime = ctx.state.lock().unwrap().runtime_env.clone();

let mem_table =
MemTable::load(Arc::new(csv), 16 * 1024, Some(partitions), runtime)
.await
.unwrap();
let mem_table = MemTable::load(Arc::new(csv), Some(partitions), runtime)
.await
.unwrap();
ctx.register_table("aggregate_test_100", Arc::new(mem_table))
.unwrap();
ctx_holder.lock().unwrap().push(Arc::new(Mutex::new(ctx)))
Expand Down
1 change: 0 additions & 1 deletion datafusion/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ pub trait TableProvider: Sync + Send {
async fn scan(
&self,
projection: &Option<Vec<usize>>,
batch_size: usize,
filters: &[Expr],
// limit can be used to reduce the amount scanned
// from the datasource as a performance optimization.
Expand Down
1 change: 0 additions & 1 deletion datafusion/src/datasource/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl TableProvider for EmptyTable {
async fn scan(
&self,
projection: &Option<Vec<usize>>,
_batch_size: usize,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Expand Down
54 changes: 30 additions & 24 deletions datafusion/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::avro_to_arrow::read_avro_schema_from_reader;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{AvroExec, PhysicalPlanConfig};
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

Expand Down Expand Up @@ -61,7 +61,7 @@ impl FileFormat for AvroFormat {

async fn create_physical_plan(
&self,
conf: PhysicalPlanConfig,
conf: FileScanConfig,
_filters: &[Expr],
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = AvroExec::new(conf);
Expand All @@ -81,7 +81,7 @@ mod tests {
};

use super::*;
use crate::execution::runtime_env::RuntimeEnv;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
TimestampMicrosecondArray,
Expand All @@ -90,9 +90,9 @@ mod tests {

#[tokio::test]
async fn read_small_batches() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new().with_batch_size(2))?);
let projection = None;
let runtime = Arc::new(RuntimeEnv::default());
let exec = get_exec("alltypes_plain.avro", &projection, 2, None).await?;
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
let stream = exec.execute(0, runtime).await?;

let tt_batches = stream
Expand All @@ -111,9 +111,10 @@ mod tests {

#[tokio::test]
async fn read_limit() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let projection = None;
let exec = get_exec("alltypes_plain.avro", &projection, 1024, Some(1)).await?;
let batches = collect(exec).await?;
let exec = get_exec("alltypes_plain.avro", &projection, Some(1)).await?;
let batches = collect(exec, runtime).await?;
assert_eq!(1, batches.len());
assert_eq!(11, batches[0].num_columns());
assert_eq!(1, batches[0].num_rows());
Expand All @@ -123,8 +124,9 @@ mod tests {

#[tokio::test]
async fn read_alltypes_plain_avro() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let projection = None;
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

let x: Vec<String> = exec
.schema()
Expand All @@ -149,7 +151,7 @@ mod tests {
x
);

let batches = collect(exec).await?;
let batches = collect(exec, runtime).await?;
assert_eq!(batches.len(), 1);

let expected = vec![
Expand All @@ -173,10 +175,11 @@ mod tests {

#[tokio::test]
async fn read_bool_alltypes_plain_avro() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let projection = Some(vec![1]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

let batches = collect(exec).await?;
let batches = collect(exec, runtime).await?;
assert_eq!(batches.len(), 1);
assert_eq!(1, batches[0].num_columns());
assert_eq!(8, batches[0].num_rows());
Expand All @@ -201,10 +204,11 @@ mod tests {

#[tokio::test]
async fn read_i32_alltypes_plain_avro() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let projection = Some(vec![0]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

let batches = collect(exec).await?;
let batches = collect(exec, runtime).await?;
assert_eq!(batches.len(), 1);
assert_eq!(1, batches[0].num_columns());
assert_eq!(8, batches[0].num_rows());
Expand All @@ -226,10 +230,11 @@ mod tests {

#[tokio::test]
async fn read_i96_alltypes_plain_avro() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let projection = Some(vec![10]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

let batches = collect(exec).await?;
let batches = collect(exec, runtime).await?;
assert_eq!(batches.len(), 1);
assert_eq!(1, batches[0].num_columns());
assert_eq!(8, batches[0].num_rows());
Expand All @@ -251,10 +256,11 @@ mod tests {

#[tokio::test]
async fn read_f32_alltypes_plain_avro() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let projection = Some(vec![6]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

let batches = collect(exec).await?;
let batches = collect(exec, runtime).await?;
assert_eq!(batches.len(), 1);
assert_eq!(1, batches[0].num_columns());
assert_eq!(8, batches[0].num_rows());
Expand All @@ -279,10 +285,11 @@ mod tests {

#[tokio::test]
async fn read_f64_alltypes_plain_avro() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let projection = Some(vec![7]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

let batches = collect(exec).await?;
let batches = collect(exec, runtime).await?;
assert_eq!(batches.len(), 1);
assert_eq!(1, batches[0].num_columns());
assert_eq!(8, batches[0].num_rows());
Expand All @@ -307,10 +314,11 @@ mod tests {

#[tokio::test]
async fn read_binary_alltypes_plain_avro() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let projection = Some(vec![9]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;

let batches = collect(exec).await?;
let batches = collect(exec, runtime).await?;
assert_eq!(batches.len(), 1);
assert_eq!(1, batches[0].num_columns());
assert_eq!(8, batches[0].num_rows());
Expand All @@ -336,7 +344,6 @@ mod tests {
async fn get_exec(
file_name: &str,
projection: &Option<Vec<usize>>,
batch_size: usize,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let testdata = crate::test_util::arrow_test_data();
Expand All @@ -353,13 +360,12 @@ mod tests {
let file_groups = vec![vec![local_unpartitioned_file(filename.to_owned())]];
let exec = format
.create_physical_plan(
PhysicalPlanConfig {
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_schema,
file_groups,
statistics,
projection: projection.clone(),
batch_size,
limit,
table_partition_cols: vec![],
},
Expand Down
Loading