Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature burst #278

Merged
merged 12 commits into from
Aug 8, 2023
12 changes: 6 additions & 6 deletions Cargo.lock
liketoeatcheese marked this conversation as resolved.
Show resolved Hide resolved
hatoo marked this conversation as resolved.
Show resolved Hide resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

205 changes: 151 additions & 54 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ impl Default for ClientState {
}
}

pub enum QueryLimit {
Qps(usize),
Burst(std::time::Duration, usize),
}

impl Client {
#[cfg(unix)]
async fn client(
Expand Down Expand Up @@ -562,23 +567,48 @@ pub async fn work(
pub async fn work_with_qps(
client: Client,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
qps: usize,
burst: QueryLimit,
liketoeatcheese marked this conversation as resolved.
Show resolved Hide resolved
n_tasks: usize,
n_workers: usize,
) {
let (tx, rx) = flume::unbounded();

tokio::spawn(async move {
let start = std::time::Instant::now();
for i in 0..n_tasks {
tokio::time::sleep_until(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
tx.send_async(()).await.unwrap();
match burst {
QueryLimit::Qps(qps) => {
tokio::spawn(async move {
let start = std::time::Instant::now();
for i in 0..n_tasks {
tokio::time::sleep_until(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
tx.send_async(()).await.unwrap();
}
// tx gone
});
}
// tx gone
});
QueryLimit::Burst(duration, rate) => {
tokio::spawn(async move {
let mut n = 0;
// Handle via rate till n_tasks out of bound
while n < n_tasks {
liketoeatcheese marked this conversation as resolved.
Show resolved Hide resolved
tokio::time::sleep(duration).await;
for _ in 0..rate {
tx.send_async(()).await.unwrap();
}
n += rate;
}
// Handle the remaining tasks
if n - n_tasks < rate && n - n_tasks > 0 {
liketoeatcheese marked this conversation as resolved.
Show resolved Hide resolved
tokio::time::sleep(duration).await;
for _ in 0..n_tasks - n {
tx.send_async(()).await.unwrap();
}
}
// tx gone
});
}
}

let client = Arc::new(client);

Expand Down Expand Up @@ -610,23 +640,48 @@ pub async fn work_with_qps(
pub async fn work_with_qps_latency_correction(
client: Client,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
qps: usize,
burst: QueryLimit,
n_tasks: usize,
n_workers: usize,
) {
let (tx, rx) = flume::unbounded();

tokio::spawn(async move {
let start = std::time::Instant::now();
for i in 0..n_tasks {
tx.send_async(std::time::Instant::now()).await.unwrap();
tokio::time::sleep_until(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
match burst {
QueryLimit::Qps(qps) => {
tokio::spawn(async move {
let start = std::time::Instant::now();
for i in 0..n_tasks {
tx.send_async(std::time::Instant::now()).await.unwrap();
tokio::time::sleep_until(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
}
// tx gone
});
}
// tx gone
});
QueryLimit::Burst(duration, rate) => {
tokio::spawn(async move {
let mut n = 0;
// Handle via rate till n_tasks out of bound
while n < n_tasks {
liketoeatcheese marked this conversation as resolved.
Show resolved Hide resolved
tokio::time::sleep(duration).await;
for _ in 0..rate {
tx.send_async(std::time::Instant::now()).await.unwrap();
}
n += rate;
}
// Handle the remaining tasks
if n - n_tasks < rate && n - n_tasks > 0 {
tokio::time::sleep(duration).await;
for _ in 0..n_tasks - n {
tx.send_async(std::time::Instant::now()).await.unwrap();
}
}
// tx gone
});
}
}

let client = Arc::new(client);

Expand Down Expand Up @@ -695,28 +750,50 @@ pub async fn work_until(
pub async fn work_until_with_qps(
client: Client,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
qps: usize,
burst: QueryLimit,
start: std::time::Instant,
dead_line: std::time::Instant,
n_workers: usize,
) {
let (tx, rx) = flume::bounded(qps);
let rx = match burst {
QueryLimit::Qps(qps) => {
let (tx, rx) = flume::bounded(qps);
tokio::spawn(async move {
for i in 0.. {
if std::time::Instant::now() > dead_line {
break;
}
tokio::time::sleep_until(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
if tx.send_async(()).await.is_err() {
break;
}
}
// tx gone
});
rx
}
QueryLimit::Burst(duration, rate) => {
let (tx, rx) = flume::unbounded();
tokio::spawn(async move {
// Handle via rate till deadline is reached
for _ in 0.. {
if std::time::Instant::now() > dead_line {
break;
}

tokio::spawn(async move {
for i in 0.. {
if std::time::Instant::now() > dead_line {
break;
}
tokio::time::sleep_until(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
if tx.send_async(()).await.is_err() {
break;
}
tokio::time::sleep(duration).await;
for _ in 0..rate {
tx.send_async(()).await.unwrap();
}
}
// tx gone
});
rx
}
// tx gone
});
};

let client = Arc::new(client);

Expand Down Expand Up @@ -749,29 +826,49 @@ pub async fn work_until_with_qps(
pub async fn work_until_with_qps_latency_correction(
client: Client,
report_tx: flume::Sender<Result<RequestResult, ClientError>>,
qps: usize,
burst: QueryLimit,
start: std::time::Instant,
dead_line: std::time::Instant,
n_workers: usize,
) {
let (tx, rx) = flume::unbounded();
match burst {
QueryLimit::Qps(qps) => {
tokio::spawn(async move {
for i in 0.. {
let now = std::time::Instant::now();
if now > dead_line {
break;
}
tokio::time::sleep_until(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
if tx.send_async(now).await.is_err() {
liketoeatcheese marked this conversation as resolved.
Show resolved Hide resolved
break;
}
}
// tx gone
});
}
QueryLimit::Burst(duration, rate) => {
tokio::spawn(async move {
// Handle via rate till deadline is reached
loop {
let now = std::time::Instant::now();
if now > dead_line {
break;
}

tokio::spawn(async move {
for i in 0.. {
let now = std::time::Instant::now();
if now > dead_line {
break;
}
tokio::time::sleep_until(
(start + i as u32 * std::time::Duration::from_secs(1) / qps as u32).into(),
)
.await;
if tx.send_async(now).await.is_err() {
break;
}
tokio::time::sleep(duration).await;
for _ in 0..rate {
tx.send_async(now).await.unwrap();
liketoeatcheese marked this conversation as resolved.
Show resolved Hide resolved
}
}
// tx gone
});
}
// tx gone
});
};

let client = Arc::new(client);

Expand Down
Loading
Loading