Skip to content

Commit

Permalink
[apache#1213] feat(rust): Support block filter by taskId when getting…
Browse files Browse the repository at this point in the history
… memory data (apache#1311)

### What changes were proposed in this pull request?

Support filtering blocks by taskId when getting memory data.

### Why are the changes needed?

In AQE, the shuffle data size is collected after the sub-QueryStages have been executed.
If blocks can be filtered by taskId on the server side, the performance of AQE will improve.

Fix: apache#1213

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

1. UTs

---------

Co-authored-by: 蒋文龙 <jiangwenlong@192.168.1.7>
Co-authored-by: 蒋文龙 <jiangwenlong@192.168.1.6>
  • Loading branch information
3 people authored and zuston committed Dec 1, 2023
1 parent e9741c8 commit 9853f5a
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 2 deletions.
2 changes: 2 additions & 0 deletions rust/experimental/server/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ pub struct WritingViewContext {
/// Parameters of a single read request handed to a store's `get`/`select`.
pub struct ReadingViewContext {
/// Identifies the partition whose data is being read.
pub uid: PartitionedUId,
/// How to read: e.g. `MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(last_block_id, max_size)`
/// or `FILE_OFFSET_AND_LEN(offset, len)`.
pub reading_options: ReadingOptions,
/// Optional task-id filter for memory reads.
///
/// Despite the `serialized_` prefix (kept to mirror the request field it is
/// decoded from), this holds an already deserialized `Treemap`.
/// When `Some`, the memory store skips every block whose `task_attempt_id`
/// is not contained in the bitmap; when `None`, no filtering is applied.
pub serialized_expected_task_ids_bitmap: Option<Treemap>,
}

pub struct ReadingIndexViewContext {
Expand Down Expand Up @@ -630,6 +631,7 @@ mod test {
let reading_ctx = ReadingViewContext {
uid: Default::default(),
reading_options: ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
serialized_expected_task_ids_bitmap: Default::default(),
};

// case2: get
Expand Down
18 changes: 18 additions & 0 deletions rust/experimental/server/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::proto::uniffle::{
use crate::store::{PartitionedData, ResponseDataIndex};
use await_tree::InstrumentAwait;
use bytes::{BufMut, BytesMut};
use croaring::treemap::JvmSerializer;
use croaring::Treemap;
use std::collections::HashMap;

use log::{debug, error, info, warn};
Expand Down Expand Up @@ -284,6 +286,7 @@ impl ShuffleServer for DefaultShuffleServer {
.select(ReadingViewContext {
uid: partition_id.clone(),
reading_options: ReadingOptions::FILE_OFFSET_AND_LEN(req.offset, req.length as i64),
serialized_expected_task_ids_bitmap: Default::default(),
})
.instrument_await(format!(
"select data from localfile. uid: {:?}",
Expand Down Expand Up @@ -341,6 +344,20 @@ impl ShuffleServer for DefaultShuffleServer {
shuffle_id,
partition_id,
};

// Decode the optional task-id filter sent by the client. An empty byte
// payload means the client did not request any filtering.
let serialized_expected_task_ids_bitmap =
if !req.serialized_expected_task_ids_bitmap.is_empty() {
match Treemap::deserialize(&req.serialized_expected_task_ids_bitmap) {
Ok(filter) => Some(filter),
Err(e) => {
// NOTE(review): a payload that cannot be decoded is only logged and
// then treated like "no filter", so the read below returns blocks
// from every task. Confirm this best-effort fallback is intended
// rather than failing the request.
error!("Failed to deserialize: {}", e);
None
}
}
} else {
None
};

let data_fetched_result = app
.unwrap()
.select(ReadingViewContext {
Expand All @@ -349,6 +366,7 @@ impl ShuffleServer for DefaultShuffleServer {
req.last_block_id,
req.read_buffer_size as i64,
),
serialized_expected_task_ids_bitmap,
})
.instrument_await(format!("select data from memory. uid: {:?}", &partition_id))
.await;
Expand Down
4 changes: 4 additions & 0 deletions rust/experimental/server/src/store/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ mod tests {
let response_data = runtime.wait(store.get(ReadingViewContext {
uid: uid.clone(),
reading_options: MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1024 * 1024 * 1024),
serialized_expected_task_ids_bitmap: Default::default(),
}))?;

let mut accepted_block_ids = vec![];
Expand Down Expand Up @@ -739,6 +740,7 @@ mod tests {
last_block_id,
data_len as i64,
),
serialized_expected_task_ids_bitmap: Default::default(),
};

let read_data = store.get(reading_view_ctx).await;
Expand Down Expand Up @@ -773,6 +775,7 @@ mod tests {
let reading_view_ctx = ReadingViewContext {
uid: uid.clone(),
reading_options: ReadingOptions::FILE_OFFSET_AND_LEN(offset, length as i64),
serialized_expected_task_ids_bitmap: Default::default(),
};
let read_data = store.get(reading_view_ctx).await.unwrap();
match read_data {
Expand Down Expand Up @@ -829,6 +832,7 @@ mod tests {
last_block_id,
data_len as i64,
),
serialized_expected_task_ids_bitmap: Default::default(),
};

let read_data = runtime.wait(store.get(reading_view_ctx));
Expand Down
1 change: 1 addition & 0 deletions rust/experimental/server/src/store/localfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ mod test {
let reading_ctx = ReadingViewContext {
uid,
reading_options: ReadingOptions::FILE_OFFSET_AND_LEN(0, size as i64),
serialized_expected_task_ids_bitmap: Default::default(),
};

let read_result = local_store.get(reading_ctx).await;
Expand Down
87 changes: 85 additions & 2 deletions rust/experimental/server/src/store/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use std::str::FromStr;

use crate::store::mem::InstrumentAwait;
use crate::store::mem::MemoryBufferTicket;
use croaring::Treemap;
use log::error;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -209,15 +210,21 @@ impl MemoryStore {
buffer.clone()
}

pub(crate) fn read_partial_data_with_max_size_limit<'a>(
pub(crate) fn read_partial_data_with_max_size_limit_and_filter<'a>(
&'a self,
blocks: Vec<&'a PartitionedDataBlock>,
fetched_size_limit: i64,
serialized_expected_task_ids_bitmap: Option<Treemap>,
) -> (Vec<&PartitionedDataBlock>, i64) {
let mut fetched = vec![];
let mut fetched_size = 0;

for block in blocks {
if let Some(ref filter) = serialized_expected_task_ids_bitmap {
if !filter.contains(block.task_attempt_id as u64) {
continue;
}
}
if fetched_size >= fetched_size_limit {
break;
}
Expand Down Expand Up @@ -416,7 +423,11 @@ impl Store for MemoryStore {
}
}

self.read_partial_data_with_max_size_limit(candidate_blocks, max_size)
self.read_partial_data_with_max_size_limit_and_filter(
candidate_blocks,
max_size,
ctx.serialized_expected_task_ids_bitmap,
)
}
_ => (vec![], 0),
};
Expand Down Expand Up @@ -728,6 +739,7 @@ mod test {
use crate::config::MemoryStoreConfig;
use crate::runtime::manager::RuntimeManager;
use anyhow::Result;
use croaring::Treemap;

#[test]
fn test_ticket_timeout() -> Result<()> {
Expand Down Expand Up @@ -975,6 +987,7 @@ mod test {
last_block_id,
default_single_read_size,
),
serialized_expected_task_ids_bitmap: Default::default(),
};
if let Ok(data) = store.get(ctx).await {
match data {
Expand Down Expand Up @@ -1066,6 +1079,7 @@ mod test {
let reading_ctx = ReadingViewContext {
uid: uid.clone(),
reading_options: ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
serialized_expected_task_ids_bitmap: Default::default(),
};
let data = runtime.wait(store.get(reading_ctx.clone())).expect("");
assert_eq!(1, data.from_memory().shuffle_data_block_segments.len());
Expand Down Expand Up @@ -1114,6 +1128,7 @@ mod test {
let reading_ctx = ReadingViewContext {
uid: Default::default(),
reading_options: ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
serialized_expected_task_ids_bitmap: Default::default(),
};

match runtime.wait(store.get(reading_ctx)).unwrap() {
Expand All @@ -1125,4 +1140,72 @@ mod test {
_ => panic!("should not"),
}
}

/// Verifies that memory reads honour `serialized_expected_task_ids_bitmap`.
///
/// Two blocks written by task attempts 0 and 1 are inserted, then read back:
/// without a filter, with a filter from the head of the buffer, and with a
/// filter combined with a `last_block_id` cursor.
#[test]
fn test_block_id_filter_for_memory() {
    let store = MemoryStore::new(1024 * 1024 * 1024);
    let runtime = store.runtime_manager.clone();

    // 1. Insert two blocks that belong to different task attempts (0 and 1).
    let writing_ctx = WritingViewContext {
        uid: Default::default(),
        data_blocks: vec![
            PartitionedDataBlock {
                block_id: 0,
                length: 10,
                uncompress_length: 100,
                crc: 99,
                data: Default::default(),
                task_attempt_id: 0,
            },
            PartitionedDataBlock {
                block_id: 1,
                length: 20,
                uncompress_length: 200,
                crc: 99,
                data: Default::default(),
                task_attempt_id: 1,
            },
        ],
    };
    runtime.wait(store.insert(writing_ctx)).unwrap();

    // 2. No task-id filter is set: both blocks must be returned.
    let unfiltered_ctx = ReadingViewContext {
        uid: Default::default(),
        reading_options: ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
        serialized_expected_task_ids_bitmap: Default::default(),
    };
    match runtime.wait(store.get(unfiltered_ctx)).unwrap() {
        Mem(data) => {
            assert_eq!(data.shuffle_data_block_segments.len(), 2);
        }
        _ => panic!("should not"),
    }

    // Filter that only accepts blocks written by task attempt 1.
    let mut bitmap = Treemap::default();
    bitmap.add(1);

    // 3. Read from the head (last_block_id = -1) with the filter. Block 0 is a
    //    read candidate here, so it can only be dropped by the task-id filter;
    //    this is the case that actually proves the filter works.
    let filtered_from_head_ctx = ReadingViewContext {
        uid: Default::default(),
        reading_options: ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
        serialized_expected_task_ids_bitmap: Some(bitmap.clone()),
    };
    match runtime.wait(store.get(filtered_from_head_ctx)).unwrap() {
        Mem(data) => {
            assert_eq!(data.shuffle_data_block_segments.len(), 1);
            assert_eq!(
                data.shuffle_data_block_segments
                    .get(0)
                    .unwrap()
                    .uncompress_length,
                200
            );
        }
        _ => panic!("should not"),
    }

    // 4. Combine the filter with last_block_id = 0 (resume after block 0):
    //    only block 1 remains and it passes the filter.
    let filtered_after_cursor_ctx = ReadingViewContext {
        uid: Default::default(),
        reading_options: ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(0, 1000000),
        serialized_expected_task_ids_bitmap: Some(bitmap),
    };
    match runtime.wait(store.get(filtered_after_cursor_ctx)).unwrap() {
        Mem(data) => {
            assert_eq!(data.shuffle_data_block_segments.len(), 1);
            assert_eq!(data.shuffle_data_block_segments.get(0).unwrap().offset, 0);
            assert_eq!(
                data.shuffle_data_block_segments
                    .get(0)
                    .unwrap()
                    .uncompress_length,
                200
            );
        }
        _ => panic!("should not"),
    }
}
}

0 comments on commit 9853f5a

Please sign in to comment.