add rust sdk test to workflows.
Rachelint committed Mar 29, 2023
1 parent 69ad079 commit 270c6ca
Showing 3 changed files with 124 additions and 76 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
@@ -213,6 +213,10 @@ jobs:
working-directory: integration_tests
run: |
make run-java
- name: Run Rust SDK tests
working-directory: integration_tests
run: |
make run-rust
- name: Upload Logs
if: always()
uses: actions/upload-artifact@v3
4 changes: 4 additions & 0 deletions integration_tests/Makefile
@@ -49,5 +49,9 @@ run-java:
java -version
cd sdk/java && MAVEN_OPTS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn clean compile exec:java


run-go:
cd sdk/go && go run .

run-rust:
cd sdk/rust && cargo run
192 changes: 116 additions & 76 deletions integration_tests/sdk/rust/src/main.rs
@@ -15,9 +15,13 @@ use ceresdb_client::{
RpcContext,
};

type TestDatas = (Vec<Vec<Value>>, Vec<Vec<Value>>);
const ENDPOINT: &str = "127.0.0.1:8831";

struct TestDatas {
col_names: Vec<String>,
rows: Vec<Vec<Value>>,
}

#[tokio::main]
async fn main() {
println!("Begin test, endpoint:{ENDPOINT}");
@@ -31,6 +35,7 @@ async fn main() {
test_auto_create_table(&client, &rpc_ctx, now, &test_datas).await;
test_add_column(&client, &rpc_ctx, now, &test_datas).await;

drop_table_if_exists(&client, &rpc_ctx, now).await;
print!("Test done")
}

@@ -44,14 +49,8 @@ async fn test_auto_create_table(

drop_table_if_exists(client, rpc_ctx, timestamp).await;

write(client, rpc_ctx, timestamp, &test_datas.0, false).await;

let mut query_data = Vec::new();
for data in &test_datas.0 {
let one_query_data = data.clone().into_iter().take(4).collect();
query_data.push(one_query_data);
}
sql_query(client, rpc_ctx, timestamp, &query_data).await;
write(client, rpc_ctx, timestamp, test_datas, false).await;
sql_query(client, rpc_ctx, timestamp, test_datas, false).await;
}

async fn test_add_column(
@@ -62,11 +61,8 @@ async fn test_add_column(
) {
println!("Test add column");

write(client, rpc_ctx, timestamp, &test_datas.1, true).await;

let mut query_data = test_datas.0.clone();
query_data.extend(test_datas.1.clone());
sql_query(client, rpc_ctx, timestamp, &query_data).await;
write(client, rpc_ctx, timestamp, test_datas, true).await;
sql_query(client, rpc_ctx, timestamp, test_datas, true).await;
}

async fn drop_table_if_exists(client: &Arc<dyn DbClient>, rpc_ctx: &RpcContext, timestamp: i64) {
@@ -82,47 +78,78 @@ async fn sql_query(
client: &Arc<dyn DbClient>,
rpc_ctx: &RpcContext,
timestamp: i64,
test_data: &Vec<Vec<Value>>,
test_data: &TestDatas,
new_column: bool,
) {
let test_table = format!("test_table_{timestamp}");
let query_req = SqlQueryRequest {
tables: vec![test_table.clone()],
sql: format!("SELECT * from {test_table}"),
};
let resp = client.sql_query(rpc_ctx, &query_req).await.unwrap();
let raw_rows = extract_raw_rows_from_sql_query(&resp);
assert_eq!(resp.affected_rows, 0);
let resp_rows = extract_rows_from_sql_query(&resp);
let mut expected_rows = Vec::new();
if !new_column {
let rows = test_data
.rows
.iter()
.take(2)
.map(|row| row.iter().take(4).cloned().collect::<Vec<_>>());

for row in rows {
let col_names = test_data.col_names.iter().take(4).cloned();
let row = col_names.zip(row.into_iter()).collect::<Vec<_>>();
expected_rows.push(row);
}
} else {
let rows = test_data.rows.iter().cloned();

for row in rows {
let col_names = test_data.col_names.iter().cloned();
let row = col_names.zip(row.into_iter()).collect::<Vec<_>>();
expected_rows.push(row);
}
}

let expected = format_rows(test_data);
let actual = format_rows(&raw_rows);
let expected = format_rows(&expected_rows);
let actual = format_rows(&resp_rows);
assert_eq!(expected, actual);
}

async fn write(
client: &Arc<dyn DbClient>,
rpc_ctx: &RpcContext,
timestamp: i64,
test_data: &Vec<Vec<Value>>,
test_data: &TestDatas,
new_column: bool,
) {
let mut write_req = WriteRequest::default();
let test_table = format!("test_table_{timestamp}");
let mut test_points = Vec::with_capacity(test_data.len());
for test_row in test_data {
let mut test_points = Vec::new();

let rows = if !new_column {
test_data.rows.iter().take(2).cloned().collect::<Vec<_>>()
} else {
vec![test_data.rows[2].clone(), test_data.rows[3].clone()]
};

for row in rows {
let point = {
let timestamp_val = match &test_row[0] {
let timestamp_val = match &row[0] {
Value::Timestamp(val) => *val,
_ => unreachable!(),
};
let builder = PointBuilder::new(test_table.clone())
.timestamp(timestamp_val)
.field("old-field0".to_string(), test_row[1].clone())
.field("old-field1".to_string(), test_row[2].clone())
.tag("old-tagk1".to_string(), test_row[3].clone());
.tag(test_data.col_names[1].clone(), row[1].clone())
.field(test_data.col_names[2].clone(), row[2].clone())
.field(test_data.col_names[3].clone(), row[3].clone());

if new_column {
builder
.tag("new-tag".to_string(), test_row[4].clone())
.field("new-field".to_string(), test_row[5].clone())
.tag(test_data.col_names[4].clone(), row[4].clone())
.field(test_data.col_names[5].clone(), row[5].clone())
.build()
.unwrap()
} else {
@@ -138,42 +165,52 @@ async fn write(
assert_eq!(resp.failed, 0);
}

fn generate_test_datas(timestamp: i64) -> (Vec<Vec<Value>>, Vec<Vec<Value>>) {
let col0 = vec![
Value::Timestamp(timestamp),
Value::String("old-tagv0".to_string()),
Value::Int64(123),
Value::UInt64(1222223333334),
Value::String("".to_string()),
Value::UInt64(0),
];
let col1 = vec![
Value::Timestamp(timestamp),
Value::String("old-tagv1".to_string()),
Value::Int64(124),
Value::UInt64(1222223333335),
Value::String("".to_string()),
Value::UInt64(0),
fn generate_test_datas(timestamp: i64) -> TestDatas {
let col_names = vec![
"timestamp".to_string(),
"old-tag".to_string(),
"old-field0".to_string(),
"old-field1".to_string(),
"new-tag".to_string(),
"new-field".to_string(),
];

let new_col0 = vec![
Value::Timestamp(timestamp),
Value::String("old-tagv0".to_string()),
Value::Int64(123),
Value::UInt64(1222223333334),
Value::String("new-tagv0".to_string()),
Value::UInt64(666666),
];
let new_col1 = vec![
Value::Timestamp(timestamp),
Value::String("old-tagv1".to_string()),
Value::Int64(124),
Value::UInt64(1222223333335),
Value::String("new-tagv1".to_string()),
Value::UInt64(88888888),
let rows = vec![
vec![
Value::Timestamp(timestamp),
Value::String("old-tagv0".to_string()),
Value::Int64(123),
Value::UInt64(1222223333334),
Value::String("".to_string()),
Value::UInt64(0),
],
vec![
Value::Timestamp(timestamp),
Value::String("old-tagv1".to_string()),
Value::Int64(124),
Value::UInt64(1222223333335),
Value::String("".to_string()),
Value::UInt64(0),
],
vec![
Value::Timestamp(timestamp),
Value::String("old-tagv0".to_string()),
Value::Int64(123),
Value::UInt64(1222223333334),
Value::String("new-tagv0".to_string()),
Value::UInt64(666666),
],
vec![
Value::Timestamp(timestamp),
Value::String("old-tagv1".to_string()),
Value::Int64(124),
Value::UInt64(1222223333335),
Value::String("new-tagv1".to_string()),
Value::UInt64(88888888),
],
];

(vec![col0, col1], vec![new_col0, new_col1])
TestDatas { col_names, rows }
}

fn current_timestamp_ms() -> i64 {
@@ -183,31 +220,34 @@ fn current_timestamp_ms() -> i64 {
.unwrap_or(0)
}

fn extract_raw_rows_from_sql_query(resp: &SqlQueryResponse) -> Vec<Vec<Value>> {
let mut raw_rows = Vec::with_capacity(resp.rows.len());
fn extract_rows_from_sql_query(resp: &SqlQueryResponse) -> Vec<Vec<(String, Value)>> {
let mut rows = Vec::with_capacity(resp.rows.len());
for row in &resp.rows {
let mut column_iter = row.columns().iter();
// In the automatically created table schema, `tsid` column will be added by
// CeresDB, we just ignore it.
column_iter.next();
let col_vals = column_iter.map(|col| col.value().clone()).collect();
raw_rows.push(col_vals);
let col_vals = column_iter
.map(|col| (col.name().to_string(), col.value().clone()))
.collect();
rows.push(col_vals);
}

raw_rows
rows
}

fn format_rows(rows: &Vec<Vec<Value>>) -> Vec<String> {
let mut formatted_rows = Vec::new();
for row in rows {
let mut row_str = row
.iter()
.map(|col| format!("{col:?}"))
.collect::<Vec<_>>();
row_str.sort();
formatted_rows.push(format!("{row_str:?}"));
}
fn format_rows(rows: &[Vec<(String, Value)>]) -> Vec<String> {
let mut sorted_row_strs = rows
.iter()
.map(|row| {
let mut sorted_row = row.clone();
sorted_row.sort_by(|col1, col2| col1.0.cmp(&col2.0));
let sorted_row = sorted_row.into_iter().map(|col| col.1).collect::<Vec<_>>();

format!("{sorted_row:?}")
})
.collect::<Vec<_>>();
sorted_row_strs.sort();

formatted_rows.sort();
formatted_rows
sorted_row_strs
}
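
As a side note, the new `format_rows` makes the query check independent of both column order and row order: it sorts each row's (name, value) pairs by column name, keeps only the values when formatting, and then sorts the formatted row strings. A minimal standalone sketch of the same idea, using a hypothetical `normalize` helper and plain `(String, i64)` pairs in place of `ceresdb_client::Value` (std only):

// Illustrative sketch of the order-insensitive comparison idea behind the
// new `format_rows`, with plain (column name, value) pairs.
fn normalize(rows: &[Vec<(String, i64)>]) -> Vec<String> {
    let mut formatted = rows
        .iter()
        .map(|row| {
            // Sort columns by name so column order no longer matters.
            let mut cols = row.clone();
            cols.sort_by(|a, b| a.0.cmp(&b.0));
            let vals = cols.into_iter().map(|c| c.1).collect::<Vec<_>>();
            format!("{vals:?}")
        })
        .collect::<Vec<_>>();
    // Sort the formatted rows so row order no longer matters either.
    formatted.sort();
    formatted
}

fn main() {
    let left = vec![vec![("a".to_string(), 1), ("b".to_string(), 2)]];
    let right = vec![vec![("b".to_string(), 2), ("a".to_string(), 1)]];
    // Same logical rows, different column order: both normalize identically.
    assert_eq!(normalize(&left), normalize(&right));
}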
