Skip to content

Commit 9b387e6

Browse files
committed
Limit iterations v2
1 parent a98c801 commit 9b387e6

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed

benchmark/logic_rust/concurrent_insert.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,29 @@ use scylla::client::session_builder::SessionBuilder;
44
use scylla::statement::prepared::PreparedStatement;
55
use std::env;
66
use std::sync::Arc;
7+
use tokio::sync::Barrier;
78
use uuid::Uuid;
89

910
const CONCURRENCY: usize = 2000;
11+
const STEP: usize = 200000;
1012

1113
async fn insert_data(
1214
session: Arc<Session>,
1315
start_index: usize,
1416
n: i32,
1517
insert_query: &PreparedStatement,
18+
barrier: Arc<Barrier>,
1619
) -> Result<(), Box<dyn std::error::Error>> {
1720
let mut index = start_index;
1821

1922
while index < n as usize {
2023
let id = Uuid::new_v4();
2124
session.execute_unpaged(insert_query, (id, 100)).await?;
25+
if index / STEP != (index + CONCURRENCY) / STEP {
26+
barrier.wait().await;
27+
}
2228
index += CONCURRENCY;
2329
}
24-
2530
Ok(())
2631
}
2732

@@ -60,12 +65,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
6065

6166
let mut handles = vec![];
6267
let session = Arc::new(session);
68+
let barrier = Arc::new(Barrier::new(CONCURRENCY));
6369

6470
for i in 0..CONCURRENCY {
6571
let session_clone = Arc::clone(&session);
6672
let insert_query_clone = insert_query.clone();
73+
let barrier_clone = barrier.clone();
6774
handles.push(tokio::spawn(async move {
68-
insert_data(session_clone, i, n, &insert_query_clone)
75+
insert_data(session_clone, i, n, &insert_query_clone, barrier_clone)
6976
.await
7077
.unwrap();
7178
}));

0 commit comments

Comments
 (0)