10 changes: 0 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions native-engine/blaze-jni-bridge/src/conf.rs
@@ -40,6 +40,7 @@ define_conf!(IntConf, PARTIAL_AGG_SKIPPING_MIN_ROWS);
define_conf!(BooleanConf, PARQUET_ENABLE_PAGE_FILTERING);
define_conf!(BooleanConf, PARQUET_ENABLE_BLOOM_FILTER);
define_conf!(StringConf, SPARK_IO_COMPRESSION_CODEC);
define_conf!(IntConf, SPARK_TASK_CPUS);
define_conf!(StringConf, SPILL_COMPRESSION_CODEC);

pub trait BooleanConf {
3 changes: 1 addition & 2 deletions native-engine/blaze/Cargo.toml
@@ -25,8 +25,7 @@ once_cell = "1.20.2"
panic-message = "0.3.0"
paste = "1.0.15"
prost = "0.13.4"
raw-cpuid = "11.2.0"
tokio = "=1.42.0"
tokio = "1.42.0"

[target.'cfg(not(windows))'.dependencies]
jemallocator = { version = "0.5.0", features = ["disable_initial_exec_tls"] }
39 changes: 6 additions & 33 deletions native-engine/blaze/src/rt.rs
@@ -24,9 +24,11 @@ use arrow::{
record_batch::RecordBatch,
};
use blaze_jni_bridge::{
is_task_running, jni_bridge::JavaClasses, jni_call, jni_call_static, jni_convert_byte_array,
jni_exception_check, jni_exception_occurred, jni_new_global_ref, jni_new_object,
jni_new_string,
conf::{IntConf, SPARK_TASK_CPUS},
is_task_running,
jni_bridge::JavaClasses,
jni_call, jni_call_static, jni_convert_byte_array, jni_exception_check, jni_exception_occurred,
jni_new_global_ref, jni_new_object, jni_new_string,
};
use blaze_serde::protobuf::TaskDefinition;
use datafusion::{
@@ -93,42 +95,13 @@ impl NativeExecutionRuntime {
&ExecutionPlanMetricsSet::new(),
);

// determine number of tokio worker threads
// use the real number of available physical cores
let default_parallelism = std::thread::available_parallelism()
.map(|v| v.get())
.unwrap_or(1);

fn cpu_has_htt() -> bool {
#[cfg(any(
all(target_arch = "x86", not(target_env = "sgx"), target_feature = "sse"),
all(target_arch = "x86_64", not(target_env = "sgx"))
))]
{
use raw_cpuid::CpuId;
let has_htt = CpuId::new()
.get_feature_info()
.map(|info| info.has_htt())
.unwrap_or(false);
return has_htt;
}
false
}

let mut num_worker_threads = if cpu_has_htt() {
default_parallelism / 2
} else {
default_parallelism
};
num_worker_threads = num_worker_threads.max(1);

// create tokio runtime
// propagate classloader and task context to spawned children threads
let spark_task_context = jni_call_static!(JniBridge.getTaskContext() -> JObject)?;
let spark_task_context_global = jni_new_global_ref!(spark_task_context.as_obj())?;
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name(format!("blaze-native-stage-{stage_id}-part-{partition_id}"))
.worker_threads(num_worker_threads)
.worker_threads(SPARK_TASK_CPUS.value().unwrap_or(1) as usize)
.on_thread_start(move || {
let classloader = JavaClasses::get().classloader;
let _ = jni_call_static!(
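Note: the runtime sizing above drops the CPUID/hyper-threading probe in favor of the Spark-provided setting. A minimal sketch of the resulting construction, assuming `SPARK_TASK_CPUS.value()` returns a fallible integer like the other `IntConf` accessors in this diff (`enable_all()` and the `.max(1)` guard are assumptions, not taken from the PR):

```rust
use blaze_jni_bridge::conf::{IntConf, SPARK_TASK_CPUS};

// Sketch only: the worker count now comes straight from spark.task.cpus
// (default 1 in Spark) instead of available_parallelism() halved on
// hyper-threaded CPUs.
fn build_runtime(stage_id: usize, partition_id: usize) -> std::io::Result<tokio::runtime::Runtime> {
    // Fall back to a single worker if the conf cannot be read over JNI,
    // and never ask tokio for zero workers.
    let worker_threads = SPARK_TASK_CPUS.value().unwrap_or(1).max(1) as usize;
    tokio::runtime::Builder::new_multi_thread()
        .thread_name(format!("blaze-native-stage-{stage_id}-part-{partition_id}"))
        .worker_threads(worker_threads)
        .enable_all()
        .build()
}
```

With Spark's default of one CPU per task this yields a single-worker runtime, which presumably motivates the companion changes below that move blocking work onto `spawn_blocking` instead of `block_in_place`.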
2 changes: 1 addition & 1 deletion native-engine/datafusion-ext-commons/Cargo.toml
@@ -26,7 +26,7 @@ paste = "1.0.15"
radsort = "0.1.1"
tempfile = "3"
thrift = "0.17.0"
tokio = "=1.42.0"
tokio = "1.42.0"
unchecked-index = "0.2.2"

[dev-dependencies]
2 changes: 1 addition & 1 deletion native-engine/datafusion-ext-plans/Cargo.toml
@@ -41,7 +41,7 @@ parking_lot = "0.12.3"
paste = "1.0.15"
smallvec = "2.0.0-alpha.9"
tempfile = "3"
tokio = "=1.42.0"
tokio = "1.42.0"
unchecked-index = "0.2.2"
uuid = "1.11.0"
zstd = "0.13.2"
6 changes: 2 additions & 4 deletions native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs
@@ -209,10 +209,8 @@ impl AggHashMap {
}

pub fn upsert_records(&mut self, keys: Vec<impl AggHashMapKey>) -> Vec<u32> {
tokio::task::block_in_place(|| {
self.map.reserve(keys.len());
self.map.upsert_many(keys)
})
self.map.reserve(keys.len());
self.map.upsert_many(keys)
}

pub fn take_keys(&mut self) -> Vec<OwnedKey> {
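For context on the wrapper removed here (and from `JoinHashMap::lookup_many` further down): `tokio::task::block_in_place` lets a worker of a multi-thread runtime run blocking code inline by handing its queued tasks to other workers. A self-contained illustration of its behavior, not Blaze code:

```rust
// block_in_place only works on the multi_thread runtime flavor (it panics
// under current_thread), and it stalls the calling worker for the whole
// section, which is costly once worker_threads is pinned to spark.task.cpus.
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    let sum = tokio::task::block_in_place(|| {
        // Stand-in for a CPU-heavy section such as a large hash-map upsert.
        (0u64..10_000_000).sum::<u64>()
    });
    println!("{sum}");
}
```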
154 changes: 75 additions & 79 deletions native-engine/datafusion-ext-plans/src/ipc_reader_exec.rs
@@ -17,17 +17,20 @@ use std::{
fmt::{Debug, Formatter},
fs::File,
io::{BufReader, Cursor, Read, Seek, SeekFrom},
sync::{mpsc::Receiver, Arc},
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Arc,
},
};

use arrow::{
array::{ArrayRef, RecordBatch, RecordBatchOptions},
array::{Array, ArrayRef, RecordBatch, RecordBatchOptions},
datatypes::SchemaRef,
};
use async_trait::async_trait;
use blaze_jni_bridge::{
is_task_running, jni_call, jni_call_static, jni_get_byte_array_region, jni_get_direct_buffer,
jni_get_string, jni_new_direct_byte_buffer, jni_new_global_ref, jni_new_string,
jni_call, jni_call_static, jni_get_byte_array_region, jni_get_direct_buffer, jni_get_string,
jni_new_direct_byte_buffer, jni_new_global_ref, jni_new_string,
};
use datafusion::{
error::{DataFusionError, Result},
@@ -46,12 +49,9 @@ use datafusion_ext_commons::{
};
use jni::objects::{GlobalRef, JObject};
use once_cell::sync::OnceCell;
use tokio::task::JoinHandle;
use parking_lot::Mutex;

use crate::common::{
execution_context::ExecutionContext, ipc_compression::IpcCompressionReader,
timer_helper::TimerHelper,
};
use crate::common::{execution_context::ExecutionContext, ipc_compression::IpcCompressionReader};

#[derive(Debug, Clone)]
pub struct IpcReaderExec {
@@ -141,25 +141,8 @@ impl ExecutionPlan for IpcReaderExec {
let blocks_local = jni_call!(ScalaFunction0(blocks_provider.as_obj()).apply() -> JObject)?;
assert!(!blocks_local.as_obj().is_null());

// spawn a blocking thread for reading ipcs and providing batches
let blocks = jni_new_global_ref!(blocks_local.as_obj())?;
let (rx, handle) = read_ipc_into_channel(blocks, exec_ctx.clone());
let output = exec_ctx.output_with_sender("IpcReader", move |sender| async move {
loop {
match rx.recv() {
Ok(batch) => {
sender.send(batch).await;
}
Err(_disconnected) => {
drop(rx);
handle.await.expect("tokio error")?;
break;
}
}
}
Ok(())
});
Ok(output)
read_ipc(blocks, exec_ctx)
}

fn metrics(&self) -> Option<MetricsSet> {
@@ -171,97 +154,110 @@
}
}

fn read_ipc_into_channel(
fn read_ipc(
blocks: GlobalRef,
exec_ctx: Arc<ExecutionContext>,
) -> (Receiver<RecordBatch>, JoinHandle<Result<()>>) {
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let handle = tokio::task::spawn_blocking(move || {
let elapsed_compute = exec_ctx.baseline_metrics().elapsed_compute().clone();
let _timer = elapsed_compute.timer();
) -> Result<SendableRecordBatchStream> {
let size_counter = exec_ctx.register_counter_metric("size");

Ok(exec_ctx.clone().output_with_sender("IpcReader", move |sender| async move {
sender.exclude_time(exec_ctx.baseline_metrics().elapsed_compute());
log::info!("start ipc reading");

let size_counter = exec_ctx.register_counter_metric("size");
let _timer = exec_ctx.baseline_metrics().elapsed_compute().timer();
let batch_size = batch_size();
let output_batch_mem_size = suggested_output_batch_mem_size();
let mut staging_cols: Vec<Vec<ArrayRef>> = vec![];
let mut staging_num_rows = 0;
let mut staging_mem_size = 0;
let staging_cols: Arc<Mutex<Vec<Vec<ArrayRef>>>> = Arc::new(Mutex::new(vec![]));
let staging_num_rows = AtomicUsize::new(0);
let staging_mem_size = AtomicUsize::new(0);

while is_task_running() {
loop {
// get next block
let blocks = blocks.clone();
if !jni_call!(ScalaIterator(blocks.as_obj()).hasNext() -> bool)? {
break;
}
let next_block = jni_new_global_ref!(
jni_call!(ScalaIterator(blocks.as_obj()).next() -> JObject)?.as_obj()
)?;
let next = tokio::task::spawn_blocking(move || {
if !jni_call!(ScalaIterator(blocks.as_obj()).hasNext() -> bool)? {
return Ok::<_, DataFusionError>(None);
}
let block = jni_new_global_ref!(
jni_call!(ScalaIterator(blocks.as_obj()).next() -> JObject)?.as_obj()
)?;
Ok(Some(block))
})
.await
.or_else(|err| df_execution_err!("{err}"))??;

// get ipc reader
let mut reader = match next_block {
b if jni_call!(BlazeBlockObject(b.as_obj()).hasFileSegment() -> bool)? => {
get_file_reader(b.as_obj())?
let mut reader = Box::pin(match next {
Some(block) if jni_call!(BlazeBlockObject(block.as_obj()).hasFileSegment() -> bool)? => {
get_file_reader(block.as_obj())?
}
b if jni_call!(BlazeBlockObject(b.as_obj()).hasByteBuffer() -> bool)? => {
get_byte_buffer_reader(b.as_obj())?
Some(block) if jni_call!(BlazeBlockObject(block.as_obj()).hasByteBuffer() -> bool)? => {
get_byte_buffer_reader(block.as_obj())?
}
b => get_channel_reader(b.as_obj())?,
};

while let Some((num_rows, cols)) = reader.read_batch(&exec_ctx.output_schema())? {
let mut cols_mem_size = 0;
staging_cols.resize_with(cols.len(), || vec![]);
for (col_idx, col) in cols.into_iter().enumerate() {
cols_mem_size += col.get_array_mem_size();
staging_cols[col_idx].push(col);
}
staging_num_rows += num_rows;
staging_mem_size += cols_mem_size;

if staging_num_rows >= batch_size || staging_mem_size >= output_batch_mem_size {
let coalesced_cols = std::mem::take(&mut staging_cols)
Some(block) => get_channel_reader(block.as_obj())?,
None => break,
});

while let Some((num_rows, cols)) = reader
.as_mut()
.read_batch(&exec_ctx.output_schema())?
{
let (cur_staging_num_rows, cur_staging_mem_size) = {
let staging_cols_cloned = staging_cols.clone();
let mut staging_cols = staging_cols_cloned.lock();
let mut cols_mem_size = 0;
staging_cols.resize_with(cols.len(), || vec![]);
for (col_idx, col) in cols.into_iter().enumerate() {
cols_mem_size += col.get_array_mem_size();
staging_cols[col_idx].push(col);
}
drop(staging_cols);
staging_num_rows.fetch_add(num_rows, SeqCst);
staging_mem_size.fetch_add(cols_mem_size, SeqCst);
(staging_num_rows.load(SeqCst), staging_mem_size.load(SeqCst))
};

if cur_staging_num_rows >= batch_size
|| cur_staging_mem_size >= suggested_output_batch_mem_size()
{
let coalesced_cols = std::mem::take(&mut *staging_cols.clone().lock())
.into_iter()
.map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols))
.collect::<Vec<_>>();
let batch = RecordBatch::try_new_with_options(
exec_ctx.output_schema(),
coalesced_cols,
&RecordBatchOptions::new().with_row_count(Some(staging_num_rows)),
&RecordBatchOptions::new().with_row_count(Some(cur_staging_num_rows))
)?;
staging_num_rows = 0;
staging_mem_size = 0;
staging_num_rows.store(0, SeqCst);
staging_mem_size.store(0, SeqCst);
size_counter.add(batch.get_array_mem_size());
exec_ctx.baseline_metrics().record_output(batch.num_rows());
if !elapsed_compute.exclude_timer(|| tx.send(batch)).is_ok() {
break;
}
sender.send(batch).await;
}
}
}

if staging_num_rows > 0 {
let coalesced_cols = staging_cols
let cur_staging_num_rows = staging_num_rows.load(SeqCst);
if cur_staging_num_rows > 0 {
let coalesced_cols = std::mem::take(&mut *staging_cols.clone().lock())
.into_iter()
.map(|cols| coalesce_arrays_unchecked(cols[0].data_type(), &cols))
.collect::<Vec<_>>();
let batch = RecordBatch::try_new_with_options(
exec_ctx.output_schema(),
coalesced_cols,
&RecordBatchOptions::new().with_row_count(Some(staging_num_rows)),
&RecordBatchOptions::new().with_row_count(Some(cur_staging_num_rows))
)?;
size_counter.add(batch.get_array_mem_size());
exec_ctx.baseline_metrics().record_output(batch.num_rows());
let _ = elapsed_compute.exclude_timer(|| tx.send(batch));
sender.send(batch).await;
}
Ok::<_, DataFusionError>(())
});
(rx, handle)
Ok(())
}))
}

fn get_channel_reader(block: JObject) -> Result<IpcCompressionReader<Box<dyn Read + Send>>> {
let channel_reader = ReadableByteChannelReader::try_new(block)?;

Ok(IpcCompressionReader::new(Box::new(
BufReader::with_capacity(65536, channel_reader),
)))
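The rework above replaces the std `mpsc` channel and its dedicated reader thread with an async producer that hops onto tokio's blocking pool for each JNI call. A self-contained sketch of that hop (hypothetical types and names, not the Blaze API); the `Arc` clone mirrors how the JNI `GlobalRef` is cloned into each `spawn_blocking` closure:

```rust
use std::sync::{Arc, Mutex};

use tokio::sync::mpsc;

// Pull items from a blocking source without stalling the async workers.
async fn produce(source: Arc<Mutex<std::vec::IntoIter<u64>>>, tx: mpsc::Sender<u64>) {
    loop {
        let src = Arc::clone(&source);
        // Stand-in for the blocking ScalaIterator.hasNext()/next() JNI calls.
        let item = tokio::task::spawn_blocking(move || src.lock().unwrap().next())
            .await
            .expect("blocking task panicked");
        match item {
            Some(v) => {
                if tx.send(v).await.is_err() {
                    break; // downstream dropped the stream
                }
            }
            None => break, // source exhausted
        }
    }
}

#[tokio::main]
async fn main() {
    let source = Arc::new(Mutex::new(vec![1u64, 2, 3].into_iter()));
    let (tx, mut rx) = mpsc::channel(1);
    tokio::spawn(produce(source, tx));
    while let Some(v) = rx.recv().await {
        println!("{v}");
    }
}
```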
@@ -402,7 +402,7 @@ impl JoinHashMap {
}

pub fn lookup_many(&self, hashes: Vec<u32>) -> Vec<MapValue> {
tokio::task::block_in_place(|| self.table.lookup_many(hashes))
self.table.lookup_many(hashes)
}

pub fn get_range(&self, map_value: MapValue) -> &[u32] {
@@ -96,13 +96,16 @@ impl MemConsumer for SortShuffleRepartitioner {

async fn spill(&self) -> Result<()> {
let data = self.data.lock().await.drain();
let mut spill = try_new_spill(self.exec_ctx.spill_metrics())?;
let spill_metrics = self.exec_ctx.spill_metrics().clone();
let spill = tokio::task::spawn_blocking(move || {
let mut spill = try_new_spill(&spill_metrics)?;
let offsets = data.write(spill.get_buf_writer())?;
Ok::<_, DataFusionError>(ShuffleSpill { spill, offsets })
})
.await
.expect("tokio spawn_blocking error")?;

let offsets = data.write(spill.get_buf_writer())?;
self.spills
.lock()
.await
.push(ShuffleSpill { spill, offsets });
self.spills.lock().await.push(spill);
self.update_mem_used(0).await?;
Ok(())
}
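The spill path gets the same treatment: the synchronous disk write now runs under `spawn_blocking`, and the caller unwraps two layers, a `JoinError` if the task panicked and the closure's own `Result`. A hedged sketch of that shape (hypothetical function; `tempfile` is already a workspace dependency):

```rust
use std::io::Write;

// Run a synchronous spill write on tokio's blocking pool and propagate
// both failure layers back to the async caller.
async fn spill_blocking(data: Vec<u8>) -> std::io::Result<usize> {
    tokio::task::spawn_blocking(move || {
        let mut file = tempfile::tempfile()?; // blocking file creation
        file.write_all(&data)?;               // blocking disk write
        Ok::<_, std::io::Error>(data.len())
    })
    .await
    .expect("tokio spawn_blocking error") // JoinError: the task panicked
}
```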