Skip to content

Test delete #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 129 additions & 98 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fs::read_to_string;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use anyhow::Result;
use databend_driver::new_connection;
Expand Down Expand Up @@ -69,45 +69,43 @@ async fn execute(dsn: &str, iterations: u32) -> Result<u32> {
num_of_success += 1;
}

if (batch_id + 1) % 7 == 0 {
// introduce more conflicts, if possible, in the replace-into stmt
let ids = vec![batch_id, batch_id / 2, batch_id / 3];
exec_replace_conflict(&dsn, &ids).await?;
}
// introduce more conflicts, if possible, in the replace-into stmt
let ids = vec![batch_id];
exec_replace_conflict(&dsn, &ids).await?;
}
Ok::<_, anyhow::Error>(num_of_success)
}
});

let shutdown = Arc::new(AtomicBool::new(false));
// let shutdown = Arc::new(AtomicBool::new(false));

// background tasks to maintain the table
let maintain_handle = tokio::spawn({
let dsn = dsn.to_string();
let shutdown = shutdown.clone();
async move {
let mut batch_id = 0;
loop {
if shutdown.load(Ordering::Relaxed) {
break;
}
// we do not care if this fails
let _ = exec_table_maintenance(&dsn, batch_id).await;
batch_id += 1;
}
Ok::<_, anyhow::Error>(())
}
});
// let maintain_handle = tokio::spawn({
// let dsn = dsn.to_string();
// let shutdown = shutdown.clone();
// async move {
// let mut batch_id = 0;
// loop {
// if shutdown.load(Ordering::Relaxed) {
// break;
// }
// // we do not care if this fails
// let _ = exec_table_maintenance(&dsn, batch_id).await;
// batch_id += 1;
// }
// Ok::<_, anyhow::Error>(())
// }
// });

// join all the join handles

// wait for replace stmts to finish
let success_replace_stmts = replace_handle.await??;

// then we shutdown the table maintenance tasks
shutdown.store(true, Ordering::Relaxed);
// shutdown.store(true, Ordering::Relaxed);

maintain_handle.await??;
// maintain_handle.await??;

Ok(success_replace_stmts)
}
Expand All @@ -119,7 +117,7 @@ async fn exec_replace_conflict(dsn: &str, batch_ids: &[u32]) -> Result<bool> {
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(",");
info!("executing replace (with conflict) : [{}]", ids);
info!("executing merge-into (with conflict) : [{}]", ids);

// generate a sub-query which combines all the data generated by the historical batch ids

Expand All @@ -133,17 +131,23 @@ async fn exec_replace_conflict(dsn: &str, batch_ids: &[u32]) -> Result<bool> {

// replace these history data into the table (itself). while table being compacted and re-clustered
// this may lead to partial and total block update.
let sql = format!("replace into test_order on(id, insert_time) ({sub_query})");

let sql = format!(
"merge into test_order as t
using ({sub_query}) as s
on t.id = s.id and t.insert_time = s.insert_time
when matched then delete
when not matched then insert *
"
);

match conn.exec(&sql).await {
Ok(_) => {
info!("Ok. replace batch (with conflict) : [{}]", ids);
info!("Ok. merge-into batch (with conflict) : [{}]", ids);
Ok(true)
}
Err(e) => {
// replace may fail due to concurrent mutations (compact, purge, recluster)
info!("Err. replace batch (with conflict) : [{}]. {e}", ids);
info!("Err. merge-into batch (with conflict) : [{}]. {e}", ids);
Ok(false)
}
}
Expand All @@ -152,61 +156,88 @@ async fn exec_replace_conflict(dsn: &str, batch_ids: &[u32]) -> Result<bool> {
/// Runs one batch against `test_order` over a connection created from `dsn`.
///
/// NOTE(review): this span looks like a rendered diff in which both the removed
/// ("replace") and the added ("merge-into") lines are present -- confirm against the
/// real `src/main.rs` before relying on it. As shown, every log message appears twice
/// and the final `format!` literal holds a `replace into ... select` immediately
/// followed by a `merge into ...` statement.
///
/// Steps as written:
/// 1. truncate `random_source_store` (panics if that statement fails);
/// 2. refill it from `random_source` (`limit 1000`), with `id1` set to `batch_id` and
///    `id2` set to `batch_id * 7` (panics if that statement fails);
/// 3. execute the final statement against `test_order`.
///
/// Returns `Ok(true)` when the final statement succeeds and `Ok(false)` when it fails
/// (the error is only logged). `Err` is produced only when `new_connection` fails.
async fn exec_replace(dsn: &str, batch_id: u32) -> Result<bool> {
let conn = new_connection(dsn)?;

info!("executing replace batch : {}", batch_id);
info!("executing merge-into batch : {}", batch_id);
// value stored in `id2`; always seven times the batch id stored in `id1`
let batch_correlated_value = batch_id * 7;
// empty the staging table before loading this batch into it
let truncate_sql = "truncate table random_source_store";
match conn.exec(&truncate_sql).await {
Ok(_) => {}
Err(e) => {
// a failed truncate aborts via panic instead of being returned as `Err`
panic!("{:?}", e);
}
};

// load this batch's rows into the staging table, stamping them with the batch ids
let insert_sql = format!(
"insert into random_source_store (select
id,
{batch_id} as id1,
{batch_correlated_value} as id2,
id3, id4, id5, id6, id7,
s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12, s13,
d1, d2, d3, d4, d5, d6, d7, d8, d9, d10,
insert_time,
insert_time1,
insert_time2,
insert_time3,
i
from random_source limit 1000)"
);

match conn.exec(&insert_sql).await {
Ok(_) => {}
Err(e) => {
// a failed staging insert aborts via panic instead of being returned as `Err`
panic!("{:?}", e);
}
};
//on(id, insert_time)
// NOTE(review): the literal below contains both a `replace into` and a `merge into`
// statement back to back -- presumably old and new diff lines; verify.
let sql = format!(
"
replace into test_order on(id, insert_time)
select
id,
{batch_id} as id1,
{batch_correlated_value} as id2,
id3, id4, id5, id6, id7,
s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12, s13,
d1, d2, d3, d4, d5, d6, d7, d8, d9, d10,
insert_time,
insert_time1,
insert_time2,
insert_time3,
i
from random_source limit 1000
merge into test_order as t
using (
select * from random_source_store
) as s

on t.id = s.id and t.insert_time = s.insert_time

when matched then update *
when not matched then insert *
"
);

match conn.exec(&sql).await {
Ok(_) => {
info!("Ok. replace batch : {}", batch_id);
info!("Ok. merge-into batch : {}", batch_id);
Ok(true)
}
Err(e) => {
// the statement may fail due to concurrent mutations (compact, purge, recluster);
// the failure is logged and reported to the caller as `Ok(false)`, not as `Err`
info!("Err. replace batch : {}. {e}", batch_id);
info!("Err. merge-into batch : {}. {e}", batch_id);
Ok(false)
}
}
}

/// Runs one round of maintenance statements against `test_order`.
///
/// A connection is created from `dsn` after the start-of-batch log line; the four
/// statements (compact segment, compact, purge, recluster) are then executed in that
/// order. The outcome of each statement is only logged together with `batch_id`, so a
/// failing statement does not stop the remaining ones.
///
/// Returns `Err` only when `new_connection` fails; otherwise `Ok(())`.
async fn exec_table_maintenance(dsn: &str, batch_id: i32) -> Result<()> {
    info!("executing table maintenance batch : {}", batch_id);
    let conn = new_connection(dsn)?;

    // fixed list of statements, executed front to back
    let maintenance_stmts = [
        //"select * from test_order ignore_result",
        "optimize table test_order compact segment",
        "optimize table test_order compact",
        "optimize table test_order purge",
        "alter table test_order recluster",
    ];

    for stmt in maintenance_stmts {
        let outcome = conn.exec(stmt).await;
        if let Err(e) = outcome {
            // failures are logged and otherwise ignored
            info!("Err. maintenance batch : {}. {e}", batch_id);
        } else {
            info!("Ok. maintenance batch : {}", batch_id);
        }
    }

    Ok(())
}
// async fn exec_table_maintenance(dsn: &str, batch_id: i32) -> Result<()> {
// info!("executing table maintenance batch : {}", batch_id);
// let conn = new_connection(dsn)?;
// let sqls = vec![
// //"select * from test_order ignore_result",
// "optimize table test_order compact segment",
// "optimize table test_order compact",
// "optimize table test_order purge",
// "alter table test_order recluster",
// ];
// for sql in sqls {
// match conn.exec(sql).await {
// Ok(_) => {
// info!("Ok. maintenance batch : {}", batch_id);
// }
// Err(e) => {
// info!("Err. maintenance batch : {}. {e}", batch_id);
// }
// }
// }
// Ok(())
// }

async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {
info!("==========================");
Expand All @@ -217,15 +248,15 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {
info!(" ");
info!(" ");
info!(
"number of successfully executed replace-into statements : {}",
"number of successfully executed merge-into statements : {}",
success_replace_stmts
);
info!(" ");
info!(" ");

// - check the table data match the number of successfully executed replace into statements
{
info!("CHECK: value of successfully executed replace into statements");
info!("CHECK: value of successfully executed merge-into statements");

// For most of the cases, there should be 1000 * success_replace_stmts rows
//
Expand All @@ -236,9 +267,9 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {

let mut rows = conn.query_iter("select count() from test_order").await?;
let r = rows.next().await.unwrap().unwrap();
let count: (u32, ) = r.try_into()?;
let count: (u32,) = r.try_into()?;
info!(
"CHECK: value of successfully executed replace into statements: client {}, server {}",
"CHECK: value of successfully executed merge-into statements: client {}, server {}",
success_replace_stmts * 1000,
count.0
);
Expand All @@ -249,43 +280,43 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {
let mut rows = conn
.query_iter(
"
select count() from
(select count() a, id1 from test_order group by id1)
where a != 1000",
select count() from test_order
",
)
.await?;
let r = rows.next().await.unwrap().unwrap();
let count: (u32, ) = r.try_into()?;
let count: (u32,) = r.try_into()?;
// conflict test deleted all data
assert_eq!(0, count.0);

// show the number of distinct value of id2
// not required to be equal, since there might be communication failures
let mut rows = conn
.query_iter("select count(distinct(id2)) from test_order")
.await?;
let r = rows.next().await.unwrap().unwrap();
let count: (u32, ) = r.try_into()?;

assert_eq!(success_replace_stmts, count.0);
info!(
"CHECK: distinct ids: client {}, server {}",
success_replace_stmts, count.0
);
// let mut rows = conn
// .query_iter("select count(distinct(id2)) from test_order")
// .await?;
// let r = rows.next().await.unwrap().unwrap();
// let count: (u32,) = r.try_into()?;

// assert_eq!(success_replace_stmts, count.0);
// info!(
// "CHECK: distinct ids: client {}, server {}",
// success_replace_stmts, count.0
// );
}

// - check the value of correlated column
// for all the rows, id2 should be equal to id1 * 7
{
let mut rows = conn
.query_iter("select count() from test_order where id2 != id1 * 7")
.await?;
let r = rows.next().await.unwrap().unwrap();
let count: (i64, ) = r.try_into()?;
// {
// let mut rows = conn
// .query_iter("select count() from test_order where id2 != id1 * 7")
// .await?;
// let r = rows.next().await.unwrap().unwrap();
// let count: (i64,) = r.try_into()?;

info!("CHECK: value of correlated column");
// info!("CHECK: value of correlated column");

assert_eq!(0, count.0);
}
// assert_eq!(0, count.0);
// }

// - full table scan, ensure that the table data is not damaged
info!("CHECK: full table scanning");
Expand All @@ -306,7 +337,7 @@ async fn verify(dsn: &str, success_replace_stmts: u32) -> Result<()> {
info!(" ");

info!("========METRICS============");
let mut rows = conn.query_iter("select metric, value from system.metrics where metric like '%replace%' or metric like '%conflict%' order by metric")
let mut rows = conn.query_iter("select metric, value from system.metrics where metric like '%merge%' or metric like '%conflict%' order by metric")
.await?;
while let Some(r) = rows.next().await {
let (metric, value): (String, String) = r.unwrap().try_into()?;
Expand Down
Loading