Use node-level local limit (#20)
* Use node-level local limit

* Serialize the limit in the shuffle writer

* Revert "Merge pull request #19 from coralogix/sc-5792"

This reverts commit 08140ef, reversing changes made to a7f1384.

* Add logging

* Make sure we don't forget the limit for the shuffle writer

* Update the accumulator correctly and try to break early

* Check the local limit accumulator before polling for more data

* Fix the build

Co-authored-by: Martins Purins <martins.purins@coralogix.com>
thinkharderdev and mpurins-coralogix committed Nov 28, 2022
1 parent a802315 commit ff96bcd
Showing 7 changed files with 145 additions and 13 deletions.
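
How the node-level limit works end to end: the planner detects a LocalLimitExec at the root of a stage and passes its fetch count into the ShuffleWriterExec; every shuffle-write task for the same (job_id, stage_id) pair then shares a single AtomicUsize row counter kept in a process-wide LRU cache, checks it before polling its input stream, and bumps it after writing a batch. Once the counter crosses the limit, all tasks of that stage stop producing shuffle data. The following is a minimal, self-contained sketch of that idea, not the Ballista code itself (assumptions: a std Mutex<HashMap> stands in for the parking_lot Mutex + LruCache used in the commit, and plain batch row counts stand in for RecordBatch streams):

use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

// One shared row counter per (job_id, stage), so every task writing shuffle
// output for that stage sees the same running total.
fn get_limit_accumulator(
    cache: &Mutex<HashMap<(String, usize), Arc<AtomicUsize>>>,
    job_id: &str,
    stage: usize,
) -> Arc<AtomicUsize> {
    let mut guard = cache.lock().unwrap();
    guard
        .entry((job_id.to_owned(), stage))
        .or_insert_with(|| Arc::new(AtomicUsize::new(0)))
        .clone()
}

// Stand-in for one shuffle-write task: check the shared counter before taking
// the next batch, update it after "writing", and stop early at the limit.
fn write_partition(batch_sizes: &[usize], limit: usize, accum: &AtomicUsize) -> usize {
    let mut written = 0;
    for &num_rows in batch_sizes {
        if accum.load(Ordering::SeqCst) >= limit {
            break; // another task already produced enough rows
        }
        written += num_rows; // stand-in for writing the batch to disk
        if accum.fetch_add(num_rows, Ordering::SeqCst) + num_rows >= limit {
            break; // this batch pushed the stage over the limit
        }
    }
    written
}

fn main() {
    let cache = Mutex::new(HashMap::new());
    let limit = 250;
    // Two "tasks" of the same stage share one accumulator, so together they
    // stop after roughly `limit` rows instead of 2 * limit.
    let a = write_partition(&[100, 100, 100], limit, &get_limit_accumulator(&cache, "job-1", 2));
    let b = write_partition(&[100, 100, 100], limit, &get_limit_accumulator(&cache, "job-1", 2));
    println!("task A wrote {a} rows, task B wrote {b} rows, limit was {limit}");
    assert!(a + b <= 300);
}

Because the check happens at batch granularity, a stage can overshoot the limit by up to one batch per task; this is purely an early-exit optimization, and the limit operators in the plan remain responsible for exact trimming.
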
2 changes: 2 additions & 0 deletions ballista/core/Cargo.toml
@@ -52,9 +52,11 @@ datafusion-proto = "14.0.0"
futures = "0.3"
hashbrown = "0.13"

lazy_static = "1.4.0"
itertools = "0.10"
libloading = "0.7.3"
log = "0.4"
lru = "0.8.1"
object_store = "0.5.0"
once_cell = "1.9.0"

3 changes: 3 additions & 0 deletions ballista/core/proto/ballista.proto
@@ -348,6 +348,9 @@ message ShuffleWriterExecNode {
uint32 stage_id = 2;
PhysicalPlanNode input = 3;
PhysicalHashRepartition output_partitioning = 4;
oneof optional_limit {
uint64 limit = 6;
}
}

message ShuffleReaderExecNode {
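
In the generated Rust code, prost turns a oneof with a single uint64 variant into (roughly) a one-variant enum wrapped in an Option field on the message; that is the shape the serde code in ballista/core/src/serde/physical_plan/mod.rs below pattern-matches on. A small sketch of the round trip, with the generated type hand-written here for illustration (assumption: prost derives and field attributes omitted):

#[derive(Debug, Clone, PartialEq)]
pub enum OptionalLimit {
    // Mirrors `uint64 limit = 6;` inside the oneof.
    Limit(u64),
}

// ShuffleWriterExec::limit() -> protobuf field, as done when serializing.
fn encode_limit(limit: Option<usize>) -> Option<OptionalLimit> {
    limit.map(|l| OptionalLimit::Limit(l as u64))
}

// protobuf field -> argument for ShuffleWriterExec::try_new_with_limit.
fn decode_limit(optional_limit: Option<&OptionalLimit>) -> Option<usize> {
    optional_limit.map(|limit| {
        // Irrefutable pattern: the oneof has exactly one variant.
        let OptionalLimit::Limit(l) = limit;
        *l as usize
    })
}

fn main() {
    let encoded = encode_limit(Some(10));
    assert_eq!(decode_limit(encoded.as_ref()), Some(10));
    assert_eq!(decode_limit(None), None);
}
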
84 changes: 81 additions & 3 deletions ballista/core/src/execution_plans/shuffle_writer.rs
@@ -55,7 +55,30 @@ use datafusion::arrow::error::ArrowError;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::repartition::BatchPartitioner;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use lazy_static::lazy_static;
use log::{debug, info};
use lru::LruCache;
use parking_lot::Mutex;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};

lazy_static! {
static ref LIMIT_ACCUMULATORS: Mutex<LruCache<(String, usize), Arc<AtomicUsize>>> =
Mutex::new(LruCache::new(NonZeroUsize::new(40).unwrap()));
}

fn get_limit_accumulator(job_id: &str, stage: usize) -> Arc<AtomicUsize> {
let mut guard = LIMIT_ACCUMULATORS.lock();

if let Some(accumulator) = guard.get(&(job_id.to_owned(), stage)) {
accumulator.clone()
} else {
let accumulator = Arc::new(AtomicUsize::new(0));
guard.push((job_id.to_owned(), stage), accumulator.clone());

accumulator
}
}

/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
/// can be executed as one unit with each partition being executed in parallel. The output of each
@@ -75,6 +98,8 @@ pub struct ShuffleWriterExec {
shuffle_output_partitioning: Option<Partitioning>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Maximum number of rows to return
limit: Option<usize>,
}

#[derive(Debug, Clone)]
@@ -121,6 +146,26 @@ impl ShuffleWriterExec {
work_dir,
shuffle_output_partitioning,
metrics: ExecutionPlanMetricsSet::new(),
limit: None,
})
}

pub fn try_new_with_limit(
job_id: String,
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
work_dir: String,
shuffle_output_partitioning: Option<Partitioning>,
limit: Option<usize>,
) -> Result<Self> {
Ok(Self {
job_id,
stage_id,
plan,
work_dir,
shuffle_output_partitioning,
metrics: ExecutionPlanMetricsSet::new(),
limit,
})
}

@@ -139,6 +184,10 @@ impl ShuffleWriterExec {
self.shuffle_output_partitioning.as_ref()
}

pub fn limit(&self) -> Option<usize> {
self.limit
}

pub fn execute_shuffle_write(
&self,
input_partition: usize,
@@ -152,6 +201,10 @@ impl ShuffleWriterExec {
let output_partitioning = self.shuffle_output_partitioning.clone();
let plan = self.plan.clone();

let limit_and_accumulator = self
.limit
.map(|l| (l, get_limit_accumulator(&self.job_id, self.stage_id)));

async move {
let now = Instant::now();
let mut stream = plan.execute(input_partition, context)?;
@@ -170,6 +223,7 @@ impl ShuffleWriterExec {
&mut stream,
path,
&write_metrics.write_time,
limit_and_accumulator,
)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
@@ -211,10 +265,26 @@ impl ShuffleWriterExec {
write_metrics.repart_time.clone(),
)?;

while let Some(result) = stream.next().await {
while let Some(result) = {
let poll_more = limit_and_accumulator.as_ref().map_or(
true,
|(limit, accum)| {
let total_rows = accum.load(Ordering::SeqCst);
total_rows < *limit
},
);

if poll_more {
stream.next().await
} else {
None
}
} {
let input_batch = result?;

write_metrics.input_rows.add(input_batch.num_rows());
let num_rows = input_batch.num_rows();

write_metrics.input_rows.add(num_rows);

partitioner.partition(
input_batch,
@@ -253,6 +323,13 @@ impl ShuffleWriterExec {
Ok(())
},
)?;

if let Some((limit, accum)) = limit_and_accumulator.as_ref() {
let total_rows = accum.fetch_add(num_rows, Ordering::SeqCst);
if total_rows > *limit {
break;
}
}
}

let mut part_locs = vec![];
@@ -325,12 +402,13 @@ impl ExecutionPlan for ShuffleWriterExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(ShuffleWriterExec::try_new(
Ok(Arc::new(ShuffleWriterExec::try_new_with_limit(
self.job_id.clone(),
self.stage_id,
children[0].clone(),
self.work_dir.clone(),
self.shuffle_output_partitioning.clone(),
self.limit,
)?))
}

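
Both execute_shuffle_write above and write_stream_to_disk (in ballista/core/src/utils.rs below) guard the next poll of the input stream with the shared accumulator by putting a block in the while let scrutinee: when the limit has already been reached, the block yields None and the loop ends without polling the stream again. A standalone sketch of that control flow (assumptions: futures 0.3 with its default executor, and a plain stream::iter of batch row counts in place of a RecordBatchStream):

use futures::executor::block_on;
use futures::stream::{self, StreamExt};
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    block_on(async {
        let mut stream = stream::iter(vec![10usize, 20, 30, 40]);
        let limit = 25usize;
        let accum = AtomicUsize::new(0);

        while let Some(num_rows) = {
            // Only poll for another batch while the running total is below
            // the limit; otherwise behave as if the stream were exhausted.
            if accum.load(Ordering::SeqCst) < limit {
                stream.next().await
            } else {
                None
            }
        } {
            // "Write" the batch, bump the shared counter, and break early
            // once the limit has been crossed.
            let total = accum.fetch_add(num_rows, Ordering::SeqCst) + num_rows;
            println!("wrote a batch of {num_rows} rows (running total: {total})");
            if total >= limit {
                break;
            }
        }
    });
}

Note that the two files use slightly different break conditions (`>` in the shuffle writer, `>=` in write_stream_to_disk); either way the overshoot is at most one batch per task.
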
15 changes: 14 additions & 1 deletion ballista/core/src/serde/physical_plan/mod.rs
@@ -579,12 +579,20 @@ impl AsExecutionPlan for PhysicalPlanNode {
input.schema().as_ref(),
)?;

Ok(Arc::new(ShuffleWriterExec::try_new(
let limit = shuffle_writer.optional_limit.as_ref();

Ok(Arc::new(ShuffleWriterExec::try_new_with_limit(
shuffle_writer.job_id.clone(),
shuffle_writer.stage_id as usize,
input,
"".to_string(), // this is intentional but hacky - the executor will fill this in
output_partitioning,
limit.map(|limit| {
let protobuf::shuffle_writer_exec_node::OptionalLimit::Limit(
limit,
) = limit;
*limit as usize
}),
)?))
}
PhysicalPlanType::ShuffleReader(shuffle_reader) => {
@@ -1153,6 +1161,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
stage_id: exec.stage_id() as u32,
input: Some(Box::new(input)),
output_partitioning,
optional_limit: exec.limit().map(|limit| {
protobuf::shuffle_writer_exec_node::OptionalLimit::Limit(
limit as u64,
)
}),
},
))),
})
23 changes: 22 additions & 1 deletion ballista/core/src/utils.rs
@@ -50,6 +50,7 @@ use datafusion_proto::logical_plan::{
};
use futures::StreamExt;
use log::error;
use log::info;
#[cfg(feature = "s3")]
use object_store::aws::AmazonS3Builder;
use object_store::ObjectStore;
@@ -121,6 +122,7 @@ pub async fn write_stream_to_disk(
stream: &mut Pin<Box<dyn RecordBatchStream + Send>>,
path: &str,
disk_write_metric: &metrics::Time,
limit: Option<(usize, Arc<AtomicUsize>)>,
) -> Result<PartitionStats> {
let file = File::create(path).map_err(|e| {
error!("Failed to create partition file at {}: {:?}", path, e);
@@ -132,7 +134,18 @@ pub async fn write_stream_to_disk(
let mut num_bytes = 0;
let mut writer = FileWriter::try_new(file, stream.schema().as_ref())?;

while let Some(result) = stream.next().await {
while let Some(result) = {
let poll_more = limit.as_ref().map_or(true, |(limit, accum)| {
let total_rows = accum.load(Ordering::SeqCst);
total_rows < *limit
});

if poll_more {
stream.next().await
} else {
None
}
} {
let batch = result?;

let batch_size_bytes: usize = batch_byte_size(&batch);
@@ -143,6 +156,14 @@ pub async fn write_stream_to_disk(
let timer = disk_write_metric.timer();
writer.write(&batch)?;
timer.done();

if let Some((limit, accum)) = limit.as_ref() {
let total_rows = accum.fetch_add(batch.num_rows(), Ordering::SeqCst);
if total_rows >= *limit {
info!("stopping shuffle write early (path: {})", path);
break;
}
}
}
let timer = disk_write_metric.timer();
writer.finish()?;
3 changes: 2 additions & 1 deletion ballista/executor/src/executor.rs
@@ -155,12 +155,13 @@ impl Executor {
plan.as_any().downcast_ref::<ShuffleWriterExec>()
{
// recreate the shuffle writer with the correct working directory
ShuffleWriterExec::try_new(
ShuffleWriterExec::try_new_with_limit(
job_id,
stage_id,
plan.children()[0].clone(),
self.work_dir.clone(),
shuffle_writer.shuffle_output_partitioning().cloned(),
shuffle_writer.limit(),
)
} else {
Err(DataFusionError::Internal(
28 changes: 21 additions & 7 deletions ballista/scheduler/src/planner.rs
@@ -35,6 +35,7 @@ use datafusion::physical_plan::{
with_new_children_if_necessary, ExecutionPlan, Partitioning,
};

use datafusion::physical_plan::limit::LocalLimitExec;
use log::{debug, info};

type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -308,13 +309,26 @@ fn create_shuffle_writer(
plan: Arc<dyn ExecutionPlan>,
partitioning: Option<Partitioning>,
) -> Result<Arc<ShuffleWriterExec>> {
Ok(Arc::new(ShuffleWriterExec::try_new(
job_id.to_owned(),
stage_id,
plan,
"".to_owned(), // executor will decide on the work_dir path
partitioning,
)?))
if let Some(local_limit_exec) = plan.as_any().downcast_ref::<LocalLimitExec>() {
// This doesn't really capture all cases where we would want to do this but should work for the
// most basic case we care about.
Ok(Arc::new(ShuffleWriterExec::try_new_with_limit(
job_id.to_owned(),
stage_id,
plan.clone(),
"".to_owned(), // executor will decide on the work_dir path
partitioning,
Some(local_limit_exec.fetch()),
)?))
} else {
Ok(Arc::new(ShuffleWriterExec::try_new(
job_id.to_owned(),
stage_id,
plan,
"".to_owned(), // executor will decide on the work_dir path
partitioning,
)?))
}
}

#[cfg(test)]
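
As the comment in create_shuffle_writer notes, the limit is only picked up when the LocalLimitExec sits at the root of the stage's plan, and the detection is a plain Any downcast on the ExecutionPlan trait object. A generic sketch of that pattern with stand-in types (assumption: DataFusion's ExecutionPlan trait and LocalLimitExec are reduced here to the bare minimum needed for the downcast):

use std::any::Any;
use std::sync::Arc;

// Stand-in for DataFusion's ExecutionPlan trait; only `as_any` matters here.
trait Plan: Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

// Stand-in for LocalLimitExec with its fetch count.
struct LocalLimit {
    fetch: usize,
}

impl Plan for LocalLimit {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Some other operator that carries no limit information.
struct Projection;

impl Plan for Projection {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Mirrors create_shuffle_writer: only when the stage root is a local limit do
// we know how many rows the stage needs to produce.
fn stage_limit(plan: &Arc<dyn Plan>) -> Option<usize> {
    plan.as_any().downcast_ref::<LocalLimit>().map(|l| l.fetch)
}

fn main() {
    let limited: Arc<dyn Plan> = Arc::new(LocalLimit { fetch: 100 });
    let unlimited: Arc<dyn Plan> = Arc::new(Projection);
    assert_eq!(stage_limit(&limited), Some(100));
    assert_eq!(stage_limit(&unlimited), None);
}

A limit buried deeper in the stage (for example under a projection) is not detected by this check, which is what the "most basic case" caveat in the comment refers to.
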
