fix(memory): release unused size when purging app (#79)
In addition to the change described in the PR title, this also introduces
the `benchmark_client` binary to test the correctness of the server;
it is used to check whether an app's data is purged once the app's heartbeat times out.
zuston authored Aug 26, 2023
1 parent 8b7fda6 commit b70b67f
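
For intuition about the "release unused size" part of the title (the memory-store change itself is not among the files shown below): `require_buffer` reserves capacity before any data arrives, so a purge that drops only the buffered data would leak the reservation of a timed-out app. A hypothetical sketch of the idea; every name in it is illustrative rather than taken from the diff:

use std::collections::HashMap;

// Illustrative only: purging must return both the buffered bytes and the
// app's still-unused reservation to the store's budget.
struct MemoryStore {
    allocated: i64,                 // bytes currently counted against capacity
    data: HashMap<String, Vec<u8>>, // app_id -> buffered shuffle bytes
    reserved: HashMap<String, i64>, // app_id -> outstanding require_buffer size
}

impl MemoryStore {
    fn purge(&mut self, app_id: &str) {
        if let Some(bytes) = self.data.remove(app_id) {
            self.allocated -= bytes.len() as i64;
        }
        // The fix in the title: also release the size a timed-out app
        // reserved but never used.
        if let Some(size) = self.reserved.remove(app_id) {
            self.allocated -= size;
        }
    }
}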
Showing 7 changed files with 296 additions and 176 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
@@ -26,6 +26,10 @@ edition = "2021"
name="uniffle-worker"
path= "src/main.rs"

[[bin]]
name="benchmark"
path= "src/benchmark_client.rs"

[lib]
name = "uniffle_worker"
path = "src/lib.rs"
43 changes: 28 additions & 15 deletions src/app.rs
@@ -300,6 +300,7 @@ pub struct WritingViewContext {
pub data_blocks: Vec<PartitionedDataBlock>,
}

#[derive(Debug, Clone)]
pub struct ReadingViewContext {
pub uid: PartitionedUId,
pub reading_options: ReadingOptions,
@@ -321,6 +322,7 @@ impl RequireBufferContext {
}
}

#[derive(Debug, Clone)]
pub enum ReadingOptions {
#[allow(non_camel_case_types)]
MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(i64, i64),
@@ -414,14 +416,14 @@ impl AppManager {
let _ = match event {
PurgeEvent::HEART_BEAT_TIMEOUT(app_id) => {
info!(
"The app:{} data of heartbeat timeout will be purged.",
"The app:[{}]'s data will be purged due to heartbeat timeout",
&app_id
);
app_manager_cloned.purge_app_data(app_id).await
}
PurgeEvent::APP_PURGE(app_id) => {
info!(
"The app:{} has been finished, its data will be purged.",
"The app:[{}] has been finished, its data will be purged.",
&app_id
);
app_manager_cloned.purge_app_data(app_id).await
@@ -558,7 +560,7 @@ mod test {
};
use crate::config::{Config, HybridStoreConfig, LocalfileStoreConfig, MemoryStoreConfig};

use crate::store::PartitionedDataBlock;
use crate::store::{PartitionedDataBlock, ResponseData};
use croaring::treemap::JvmSerializer;
use croaring::Treemap;

@@ -614,27 +616,38 @@
},
],
};

// case1: put
let result = app.insert(writing_ctx);
if result.await.is_err() {
panic!()
}

let _reading_ctx = ReadingViewContext {
let reading_ctx = ReadingViewContext {
uid: Default::default(),
reading_options: ReadingOptions::MEMORY_LAST_BLOCK_ID_AND_MAX_SIZE(-1, 1000000),
};

// let result = app.select(readingCtx);
// if result.await.is_err() {
// panic!()
// }
//
// match result.await.unwrap() {
// ResponseData::mem(data) => {
// assert_eq!(2, data.shuffle_data_block_segments.len());
// },
// _ => todo!()
// }
// case2: get
let result = app.select(reading_ctx).await;
if result.is_err() {
panic!()
}

match result.unwrap() {
ResponseData::Mem(data) => {
assert_eq!(2, data.shuffle_data_block_segments.len());
}
_ => todo!(),
}

// case3: purge
app_manager_ref
.purge_app_data(app_id.to_string())
.await
.expect("");

assert_eq!(false, app_manager_ref.get_app(app_id).is_none());
}
}
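
Note the final assertion of case3: even after `purge_app_data` succeeds, `get_app(app_id)` is still expected to return the app, i.e. purging releases the app's buffered data (and, per this fix, its unused reserved size) without dropping the app registration itself.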

25 changes: 25 additions & 0 deletions src/benchmark_client.rs
@@ -0,0 +1,25 @@
use anyhow::Result;
use tokio::time::Instant;
use uniffle_worker::proto::uniffle::shuffle_server_client::ShuffleServerClient;
use uniffle_worker::write_read_for_one_time;

// This is a general benchmark client for the Apache Uniffle server;
// you can use different concurrency levels to test its performance.

// todo: implement more options to control the concurrency and batch data size

#[tokio::main]
async fn main() -> Result<()> {
let port = 21100;
let host = "0.0.0.0";

let timer = Instant::now();
let client = ShuffleServerClient::connect(format!("http://{}:{}", host, port)).await?;
write_read_for_one_time(client)
.await
.expect("failed to test write -> read.");

println!("cost [{}] ms", timer.elapsed().as_millis());

Ok(())
}
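
As the todo above says, concurrency is not configurable yet. A minimal sketch of what a concurrent variant could look like, reusing the exported `write_read_for_one_time` helper; `CONCURRENCY` is a hypothetical constant, and a real version would also need a distinct app id per task, since the helper hard-codes one:

use anyhow::Result;
use tokio::time::Instant;
use uniffle_worker::proto::uniffle::shuffle_server_client::ShuffleServerClient;
use uniffle_worker::write_read_for_one_time;

// Hypothetical knob, not part of this commit.
const CONCURRENCY: usize = 4;

#[tokio::main]
async fn main() -> Result<()> {
    let timer = Instant::now();
    let mut handles = vec![];
    for _ in 0..CONCURRENCY {
        handles.push(tokio::spawn(async {
            // Each task opens its own connection to the server.
            let client = ShuffleServerClient::connect("http://0.0.0.0:21100").await?;
            write_read_for_one_time(client).await
        }));
    }
    for handle in handles {
        // Propagate both panics (JoinError) and client-side errors.
        handle.await??;
    }
    println!("total cost [{}] ms", timer.elapsed().as_millis());
    Ok(())
}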
157 changes: 156 additions & 1 deletion src/lib.rs
@@ -34,12 +34,20 @@ use crate::app::AppManager;
use crate::grpc::DefaultShuffleServer;
use crate::http::{HTTPServer, HTTP_SERVICE};
use crate::metric::configure_metric_service;
use crate::proto::uniffle::shuffle_server_client::ShuffleServerClient;
use crate::proto::uniffle::shuffle_server_server::ShuffleServerServer;
use crate::proto::uniffle::{
GetLocalShuffleDataRequest, GetLocalShuffleIndexRequest, GetMemoryShuffleDataRequest,
RequireBufferRequest, SendShuffleDataRequest, ShuffleBlock, ShuffleData,
ShuffleRegisterRequest,
};
use crate::util::gen_worker_uid;
use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
use log::info;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tonic::transport::Server;
use std::time::Duration;
use tonic::transport::{Channel, Server};

pub async fn start_uniffle_worker(config: config::Config) -> Result<()> {
let rpc_port = config.grpc_port.unwrap_or(19999);
@@ -63,3 +71,150 @@ pub async fn start_uniffle_worker(config: config::Config) -> Result<()> {
});
Ok(())
}

pub async fn write_read_for_one_time(mut client: ShuffleServerClient<Channel>) -> Result<()> {
let app_id = "write_read_test-app-id".to_string();

let register_response = client
.register_shuffle(ShuffleRegisterRequest {
app_id: app_id.clone(),
shuffle_id: 0,
partition_ranges: vec![],
remote_storage: None,
user: "".to_string(),
shuffle_data_distribution: 1,
max_concurrency_per_partition_to_write: 10,
})
.await?
.into_inner();
assert_eq!(register_response.status, 0);

let mut all_bytes_data = BytesMut::new();
let mut block_ids = vec![];

let batch_size = 3000;

for idx in 0..batch_size {
block_ids.push(idx as i64);

let data = b"hello world";
let len = data.len();

all_bytes_data.extend_from_slice(data);

let buffer_required_resp = client
.require_buffer(RequireBufferRequest {
require_size: len as i32,
app_id: app_id.clone(),
shuffle_id: 0,
partition_ids: vec![],
})
.await?
.into_inner();

assert_eq!(0, buffer_required_resp.status);

let response = client
.send_shuffle_data(SendShuffleDataRequest {
app_id: app_id.clone(),
shuffle_id: 0,
require_buffer_id: buffer_required_resp.require_buffer_id,
shuffle_data: vec![ShuffleData {
partition_id: idx,
block: vec![ShuffleBlock {
block_id: idx as i64,
length: len as i32,
uncompress_length: 0,
crc: 0,
data: Bytes::copy_from_slice(data),
task_attempt_id: 0,
}],
}],
timestamp: 0,
})
.await?;

let response = response.into_inner();
assert_eq!(0, response.status);
}

tokio::time::sleep(Duration::from_secs(1)).await;

let mut accepted_block_ids = vec![];
let mut accepted_data_bytes = BytesMut::new();

// first, read from the memory store
for idx in 0..batch_size {
let response_data = client
.get_memory_shuffle_data(GetMemoryShuffleDataRequest {
app_id: app_id.clone(),
shuffle_id: 0,
partition_id: idx,
last_block_id: -1,
read_buffer_size: 10000000,
timestamp: 0,
serialized_expected_task_ids_bitmap: Default::default(),
})
.await?;
let response = response_data.into_inner();
let segments = response.shuffle_data_block_segments;
for segment in segments {
accepted_block_ids.push(segment.block_id)
}
let data = response.data;
accepted_data_bytes.extend_from_slice(&data);
}

// second, read from the localfile store
for idx in 0..batch_size {
let local_index_data = client
.get_local_shuffle_index(GetLocalShuffleIndexRequest {
app_id: app_id.clone(),
shuffle_id: 0,
partition_id: idx,
partition_num_per_range: 1,
partition_num: 0,
})
.await?;

let mut bytes = local_index_data.into_inner().index_data;
if bytes.is_empty() {
continue;
}
// index_bytes_holder.put_i64(next_offset);
// index_bytes_holder.put_i32(length);
// index_bytes_holder.put_i32(uncompress_len);
// index_bytes_holder.put_i64(crc);
// index_bytes_holder.put_i64(block_id);
// index_bytes_holder.put_i64(task_attempt_id);
bytes.get_i64();
let len = bytes.get_i32();
bytes.get_i32();
bytes.get_i64();
let id = bytes.get_i64();
accepted_block_ids.push(id);

let partitioned_local_data = client
.get_local_shuffle_data(GetLocalShuffleDataRequest {
app_id: app_id.clone(),
shuffle_id: 0,
partition_id: idx,
partition_num_per_range: 0,
partition_num: 0,
offset: 0,
length: len,
timestamp: 0,
})
.await?;
accepted_data_bytes.extend_from_slice(&partitioned_local_data.into_inner().data);
}

// check the block ids
assert_eq!(batch_size as usize, accepted_block_ids.len());
assert_eq!(block_ids, accepted_block_ids);

// check the shuffle data
assert_eq!(all_bytes_data.freeze(), accepted_data_bytes.freeze());

Ok(())
}
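
The index handling above reads a single 40-byte record per partition, which suffices here because each partition receives exactly one block. For completeness, a sketch of a parser that walks every record of the layout spelled out in the comments; the struct and helper are illustrative, not part of the commit:

use bytes::{Buf, Bytes};

// One localfile index entry, per the layout noted above:
// 8 + 4 + 4 + 8 + 8 + 8 = 40 bytes per record.
pub struct IndexRecord {
    pub offset: i64,
    pub length: i32,
    pub uncompress_length: i32,
    pub crc: i64,
    pub block_id: i64,
    pub task_attempt_id: i64,
}

pub fn parse_index_records(mut bytes: Bytes) -> Vec<IndexRecord> {
    let mut records = vec![];
    while bytes.remaining() >= 40 {
        records.push(IndexRecord {
            offset: bytes.get_i64(),
            length: bytes.get_i32(),
            uncompress_length: bytes.get_i32(),
            crc: bytes.get_i64(),
            block_id: bytes.get_i64(),
            task_attempt_id: bytes.get_i64(),
        });
    }
    records
}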
5 changes: 4 additions & 1 deletion src/store/hybrid.rs
@@ -39,7 +39,7 @@ use crate::store::{
use anyhow::{anyhow, Result};

use async_trait::async_trait;
use log::{debug, error};
use log::{debug, error, info};
use prometheus::core::{Atomic, AtomicU64};
use std::any::Any;

@@ -433,19 +433,22 @@ impl Store for HybridStore {

async fn purge(&self, app_id: String) -> Result<()> {
self.hot_store.purge(app_id.clone()).await?;
info!("Removed data of app:[{}] in hot store", &app_id);
if self.warm_store.is_some() {
self.warm_store
.as_ref()
.unwrap()
.purge(app_id.clone())
.await?;
info!("Removed data of app:[{}] in warm store", &app_id);
}
if self.cold_store.is_some() {
self.cold_store
.as_ref()
.unwrap()
.purge(app_id.clone())
.await?;
info!("Removed data of app:[{}] in cold store", &app_id);
}
Ok(())
}
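
As a side note, the two `is_some()` / `unwrap()` pairs above could be collapsed with `if let` for the same behavior; a sketch, assuming the surrounding types stay exactly as in the diff:

async fn purge(&self, app_id: String) -> Result<()> {
    self.hot_store.purge(app_id.clone()).await?;
    info!("Removed data of app:[{}] in hot store", &app_id);
    if let Some(warm_store) = self.warm_store.as_ref() {
        warm_store.purge(app_id.clone()).await?;
        info!("Removed data of app:[{}] in warm store", &app_id);
    }
    if let Some(cold_store) = self.cold_store.as_ref() {
        cold_store.purge(app_id.clone()).await?;
        info!("Removed data of app:[{}] in cold store", &app_id);
    }
    Ok(())
}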