From 69ad079c98fa1a46395e34a2477562bdd5c80356 Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 29 Mar 2023 18:02:23 +0800 Subject: [PATCH 1/3] add rust sdk test. --- Cargo.lock | 8 + Cargo.toml | 1 + integration_tests/sdk/rust/Cargo.toml | 10 ++ integration_tests/sdk/rust/src/main.rs | 213 +++++++++++++++++++++++++ 4 files changed, 232 insertions(+) create mode 100644 integration_tests/sdk/rust/Cargo.toml create mode 100644 integration_tests/sdk/rust/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 43893a822b..a8a61327f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5310,6 +5310,14 @@ dependencies = [ "syn", ] +[[package]] +name = "rust-sdk-test" +version = "0.1.0" +dependencies = [ + "ceresdb-client", + "tokio", +] + [[package]] name = "rust_decimal" version = "1.26.0" diff --git a/Cargo.toml b/Cargo.toml index a27cc61222..38bc437d4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ members = [ "components/tracing_util", "df_operator", "integration_tests", + "integration_tests/sdk/rust", "interpreters", "meta_client", "query_engine", diff --git a/integration_tests/sdk/rust/Cargo.toml b/integration_tests/sdk/rust/Cargo.toml new file mode 100644 index 0000000000..187d254ef9 --- /dev/null +++ b/integration_tests/sdk/rust/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "rust-sdk-test" +version = "0.1.0" + +[package.edition] +workspace = true + +[dependencies] +ceresdb-client = "1.0.0" +tokio = { version = "1.25", features = ["full"] } diff --git a/integration_tests/sdk/rust/src/main.rs b/integration_tests/sdk/rust/src/main.rs new file mode 100644 index 0000000000..5dde491132 --- /dev/null +++ b/integration_tests/sdk/rust/src/main.rs @@ -0,0 +1,213 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::{ + sync::Arc, + time::{self, SystemTime}, +}; + +use ceresdb_client::{ + db_client::{Builder, DbClient, Mode}, + model::{ + sql_query::{Request as SqlQueryRequest, Response as SqlQueryResponse}, + value::Value, + write::{point::PointBuilder, Request as WriteRequest}, + }, + RpcContext, +}; + +type TestDatas = (Vec>, Vec>); +const ENDPOINT: &str = "127.0.0.1:8831"; + +#[tokio::main] +async fn main() { + println!("Begin test, endpoint:{ENDPOINT}"); + + let client = Builder::new(ENDPOINT.to_string(), Mode::Direct).build(); + let rpc_ctx = RpcContext::default().database("public".to_string()); + let now = current_timestamp_ms(); + + let test_datas = generate_test_datas(now); + + test_auto_create_table(&client, &rpc_ctx, now, &test_datas).await; + test_add_column(&client, &rpc_ctx, now, &test_datas).await; + + print!("Test done") +} + +async fn test_auto_create_table( + client: &Arc, + rpc_ctx: &RpcContext, + timestamp: i64, + test_datas: &TestDatas, +) { + println!("Test auto create table"); + + drop_table_if_exists(client, rpc_ctx, timestamp).await; + + write(client, rpc_ctx, timestamp, &test_datas.0, false).await; + + let mut query_data = Vec::new(); + for data in &test_datas.0 { + let one_query_data = data.clone().into_iter().take(4).collect(); + query_data.push(one_query_data); + } + sql_query(client, rpc_ctx, timestamp, &query_data).await; +} + +async fn test_add_column( + client: &Arc, + rpc_ctx: &RpcContext, + timestamp: i64, + test_datas: &TestDatas, +) { + println!("Test add column"); + + write(client, rpc_ctx, timestamp, &test_datas.1, true).await; + + let mut query_data = test_datas.0.clone(); + query_data.extend(test_datas.1.clone()); + sql_query(client, rpc_ctx, timestamp, &query_data).await; +} + +async fn drop_table_if_exists(client: &Arc, rpc_ctx: &RpcContext, timestamp: i64) { + let test_table = format!("test_table_{timestamp}"); + let query_req = SqlQueryRequest { + tables: vec![test_table.clone()], + sql: format!("DROP TABLE IF EXISTS {test_table}"), + }; + let _ = client.sql_query(rpc_ctx, &query_req).await.unwrap(); +} + +async fn sql_query( + client: &Arc, + rpc_ctx: &RpcContext, + timestamp: i64, + test_data: &Vec>, +) { + let test_table = format!("test_table_{timestamp}"); + let query_req = SqlQueryRequest { + tables: vec![test_table.clone()], + sql: format!("SELECT * from {test_table}"), + }; + let resp = client.sql_query(rpc_ctx, &query_req).await.unwrap(); + let raw_rows = extract_raw_rows_from_sql_query(&resp); + + let expected = format_rows(test_data); + let actual = format_rows(&raw_rows); + assert_eq!(expected, actual); +} + +async fn write( + client: &Arc, + rpc_ctx: &RpcContext, + timestamp: i64, + test_data: &Vec>, + new_column: bool, +) { + let mut write_req = WriteRequest::default(); + let test_table = format!("test_table_{timestamp}"); + let mut test_points = Vec::with_capacity(test_data.len()); + for test_row in test_data { + let point = { + let timestamp_val = match &test_row[0] { + Value::Timestamp(val) => *val, + _ => unreachable!(), + }; + let builder = PointBuilder::new(test_table.clone()) + .timestamp(timestamp_val) + .field("old-field0".to_string(), test_row[1].clone()) + .field("old-field1".to_string(), test_row[2].clone()) + .tag("old-tagk1".to_string(), test_row[3].clone()); + + if new_column { + builder + .tag("new-tag".to_string(), test_row[4].clone()) + .field("new-field".to_string(), test_row[5].clone()) + .build() + .unwrap() + } else { + builder.build().unwrap() + } + }; + test_points.push(point); + } + write_req.add_points(test_points); + + let resp = client.write(rpc_ctx, &write_req).await.unwrap(); + assert_eq!(resp.success, 2); + assert_eq!(resp.failed, 0); +} + +fn generate_test_datas(timestamp: i64) -> (Vec>, Vec>) { + let col0 = vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv0".to_string()), + Value::Int64(123), + Value::UInt64(1222223333334), + Value::String("".to_string()), + Value::UInt64(0), + ]; + let col1 = vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv1".to_string()), + Value::Int64(124), + Value::UInt64(1222223333335), + Value::String("".to_string()), + Value::UInt64(0), + ]; + + let new_col0 = vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv0".to_string()), + Value::Int64(123), + Value::UInt64(1222223333334), + Value::String("new-tagv0".to_string()), + Value::UInt64(666666), + ]; + let new_col1 = vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv1".to_string()), + Value::Int64(124), + Value::UInt64(1222223333335), + Value::String("new-tagv1".to_string()), + Value::UInt64(88888888), + ]; + + (vec![col0, col1], vec![new_col0, new_col1]) +} + +fn current_timestamp_ms() -> i64 { + SystemTime::now() + .duration_since(time::UNIX_EPOCH) + .map(|duration| duration.as_millis() as i64) + .unwrap_or(0) +} + +fn extract_raw_rows_from_sql_query(resp: &SqlQueryResponse) -> Vec> { + let mut raw_rows = Vec::with_capacity(resp.rows.len()); + for row in &resp.rows { + let mut column_iter = row.columns().iter(); + // In the automatically created table schema, `tsid` column will be added by + // CeresDB, we just ignore it. + column_iter.next(); + let col_vals = column_iter.map(|col| col.value().clone()).collect(); + raw_rows.push(col_vals); + } + + raw_rows +} + +fn format_rows(rows: &Vec>) -> Vec { + let mut formatted_rows = Vec::new(); + for row in rows { + let mut row_str = row + .iter() + .map(|col| format!("{col:?}")) + .collect::>(); + row_str.sort(); + formatted_rows.push(format!("{row_str:?}")); + } + + formatted_rows.sort(); + formatted_rows +} From 575a8a4581760b0cd57add4e8a11527d0ef2bc62 Mon Sep 17 00:00:00 2001 From: kamille Date: Wed, 29 Mar 2023 18:06:17 +0800 Subject: [PATCH 2/3] add rust sdk test to workflows. --- .github/workflows/ci.yml | 4 + integration_tests/Makefile | 4 + integration_tests/sdk/rust/src/main.rs | 198 +++++++++++++++---------- 3 files changed, 127 insertions(+), 79 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 49db8157fc..549c035101 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -213,6 +213,10 @@ jobs: working-directory: integration_tests run: | make run-java + - name: Run Rust SDK tests + working-directory: integration_tests + run: | + make run-rust - name: Upload Logs if: always() uses: actions/upload-artifact@v3 diff --git a/integration_tests/Makefile b/integration_tests/Makefile index 0f6c1689a3..b48ac49ebc 100644 --- a/integration_tests/Makefile +++ b/integration_tests/Makefile @@ -49,5 +49,9 @@ run-java: java -version cd sdk/java && MAVEN_OPTS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn clean compile exec:java + run-go: cd sdk/go && go run . + +run-rust: + cd sdk/rust && cargo run diff --git a/integration_tests/sdk/rust/src/main.rs b/integration_tests/sdk/rust/src/main.rs index 5dde491132..a7d0e217cf 100644 --- a/integration_tests/sdk/rust/src/main.rs +++ b/integration_tests/sdk/rust/src/main.rs @@ -15,9 +15,13 @@ use ceresdb_client::{ RpcContext, }; -type TestDatas = (Vec>, Vec>); const ENDPOINT: &str = "127.0.0.1:8831"; +struct TestDatas { + col_names: Vec, + rows: Vec>, +} + #[tokio::main] async fn main() { println!("Begin test, endpoint:{ENDPOINT}"); @@ -31,6 +35,7 @@ async fn main() { test_auto_create_table(&client, &rpc_ctx, now, &test_datas).await; test_add_column(&client, &rpc_ctx, now, &test_datas).await; + drop_table_if_exists(&client, &rpc_ctx, now).await; print!("Test done") } @@ -44,14 +49,8 @@ async fn test_auto_create_table( drop_table_if_exists(client, rpc_ctx, timestamp).await; - write(client, rpc_ctx, timestamp, &test_datas.0, false).await; - - let mut query_data = Vec::new(); - for data in &test_datas.0 { - let one_query_data = data.clone().into_iter().take(4).collect(); - query_data.push(one_query_data); - } - sql_query(client, rpc_ctx, timestamp, &query_data).await; + write(client, rpc_ctx, timestamp, test_datas, false).await; + sql_query(client, rpc_ctx, timestamp, test_datas, false).await; } async fn test_add_column( @@ -62,11 +61,8 @@ async fn test_add_column( ) { println!("Test add column"); - write(client, rpc_ctx, timestamp, &test_datas.1, true).await; - - let mut query_data = test_datas.0.clone(); - query_data.extend(test_datas.1.clone()); - sql_query(client, rpc_ctx, timestamp, &query_data).await; + write(client, rpc_ctx, timestamp, test_datas, true).await; + sql_query(client, rpc_ctx, timestamp, test_datas, true).await; } async fn drop_table_if_exists(client: &Arc, rpc_ctx: &RpcContext, timestamp: i64) { @@ -82,7 +78,8 @@ async fn sql_query( client: &Arc, rpc_ctx: &RpcContext, timestamp: i64, - test_data: &Vec>, + test_data: &TestDatas, + new_column: bool, ) { let test_table = format!("test_table_{timestamp}"); let query_req = SqlQueryRequest { @@ -90,10 +87,33 @@ async fn sql_query( sql: format!("SELECT * from {test_table}"), }; let resp = client.sql_query(rpc_ctx, &query_req).await.unwrap(); - let raw_rows = extract_raw_rows_from_sql_query(&resp); + assert_eq!(resp.affected_rows, 0); + let resp_rows = extract_rows_from_sql_query(&resp); + let mut expected_rows = Vec::new(); + if !new_column { + let rows = test_data + .rows + .iter() + .take(2) + .map(|row| row.iter().take(4).cloned().collect::>()); + + for row in rows { + let col_names = test_data.col_names.iter().take(4).cloned(); + let row = col_names.zip(row.into_iter()).collect::>(); + expected_rows.push(row); + } + } else { + let rows = test_data.rows.iter().cloned(); + + for row in rows { + let col_names = test_data.col_names.iter().cloned(); + let row = col_names.zip(row.into_iter()).collect::>(); + expected_rows.push(row); + } + } - let expected = format_rows(test_data); - let actual = format_rows(&raw_rows); + let expected = format_rows(&expected_rows); + let actual = format_rows(&resp_rows); assert_eq!(expected, actual); } @@ -101,79 +121,96 @@ async fn write( client: &Arc, rpc_ctx: &RpcContext, timestamp: i64, - test_data: &Vec>, + test_data: &TestDatas, new_column: bool, ) { - let mut write_req = WriteRequest::default(); let test_table = format!("test_table_{timestamp}"); - let mut test_points = Vec::with_capacity(test_data.len()); - for test_row in test_data { + let mut write_req = WriteRequest::default(); + let mut points = Vec::new(); + + let rows = if !new_column { + test_data.rows.iter().take(2).cloned().collect::>() + } else { + vec![test_data.rows[2].clone(), test_data.rows[3].clone()] + }; + + for row in rows { let point = { - let timestamp_val = match &test_row[0] { + let timestamp_val = match &row[0] { Value::Timestamp(val) => *val, _ => unreachable!(), }; let builder = PointBuilder::new(test_table.clone()) .timestamp(timestamp_val) - .field("old-field0".to_string(), test_row[1].clone()) - .field("old-field1".to_string(), test_row[2].clone()) - .tag("old-tagk1".to_string(), test_row[3].clone()); + .tag(test_data.col_names[1].clone(), row[1].clone()) + .field(test_data.col_names[2].clone(), row[2].clone()) + .field(test_data.col_names[3].clone(), row[3].clone()); if new_column { builder - .tag("new-tag".to_string(), test_row[4].clone()) - .field("new-field".to_string(), test_row[5].clone()) + .tag(test_data.col_names[4].clone(), row[4].clone()) + .field(test_data.col_names[5].clone(), row[5].clone()) .build() .unwrap() } else { builder.build().unwrap() } }; - test_points.push(point); + points.push(point); } - write_req.add_points(test_points); + write_req.add_points(points); let resp = client.write(rpc_ctx, &write_req).await.unwrap(); assert_eq!(resp.success, 2); assert_eq!(resp.failed, 0); } -fn generate_test_datas(timestamp: i64) -> (Vec>, Vec>) { - let col0 = vec![ - Value::Timestamp(timestamp), - Value::String("old-tagv0".to_string()), - Value::Int64(123), - Value::UInt64(1222223333334), - Value::String("".to_string()), - Value::UInt64(0), - ]; - let col1 = vec![ - Value::Timestamp(timestamp), - Value::String("old-tagv1".to_string()), - Value::Int64(124), - Value::UInt64(1222223333335), - Value::String("".to_string()), - Value::UInt64(0), +fn generate_test_datas(timestamp: i64) -> TestDatas { + let col_names = vec![ + "timestamp".to_string(), + "old-tag".to_string(), + "old-field0".to_string(), + "old-field1".to_string(), + "new-tag".to_string(), + "new-field".to_string(), ]; - let new_col0 = vec![ - Value::Timestamp(timestamp), - Value::String("old-tagv0".to_string()), - Value::Int64(123), - Value::UInt64(1222223333334), - Value::String("new-tagv0".to_string()), - Value::UInt64(666666), - ]; - let new_col1 = vec![ - Value::Timestamp(timestamp), - Value::String("old-tagv1".to_string()), - Value::Int64(124), - Value::UInt64(1222223333335), - Value::String("new-tagv1".to_string()), - Value::UInt64(88888888), + let rows = vec![ + vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv0".to_string()), + Value::Int64(123), + Value::UInt64(1222223333334), + Value::String("".to_string()), + Value::UInt64(0), + ], + vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv1".to_string()), + Value::Int64(124), + Value::UInt64(1222223333335), + Value::String("".to_string()), + Value::UInt64(0), + ], + vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv0".to_string()), + Value::Int64(123), + Value::UInt64(1222223333334), + Value::String("new-tagv0".to_string()), + Value::UInt64(666666), + ], + vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv1".to_string()), + Value::Int64(124), + Value::UInt64(1222223333335), + Value::String("new-tagv1".to_string()), + Value::UInt64(88888888), + ], ]; - (vec![col0, col1], vec![new_col0, new_col1]) + TestDatas { col_names, rows } } fn current_timestamp_ms() -> i64 { @@ -183,31 +220,34 @@ fn current_timestamp_ms() -> i64 { .unwrap_or(0) } -fn extract_raw_rows_from_sql_query(resp: &SqlQueryResponse) -> Vec> { - let mut raw_rows = Vec::with_capacity(resp.rows.len()); +fn extract_rows_from_sql_query(resp: &SqlQueryResponse) -> Vec> { + let mut rows = Vec::with_capacity(resp.rows.len()); for row in &resp.rows { let mut column_iter = row.columns().iter(); // In the automatically created table schema, `tsid` column will be added by // CeresDB, we just ignore it. column_iter.next(); - let col_vals = column_iter.map(|col| col.value().clone()).collect(); - raw_rows.push(col_vals); + let col_vals = column_iter + .map(|col| (col.name().to_string(), col.value().clone())) + .collect(); + rows.push(col_vals); } - raw_rows + rows } -fn format_rows(rows: &Vec>) -> Vec { - let mut formatted_rows = Vec::new(); - for row in rows { - let mut row_str = row - .iter() - .map(|col| format!("{col:?}")) - .collect::>(); - row_str.sort(); - formatted_rows.push(format!("{row_str:?}")); - } +fn format_rows(rows: &[Vec<(String, Value)>]) -> Vec { + let mut sorted_row_strs = rows + .iter() + .map(|row| { + let mut sorted_row = row.clone(); + sorted_row.sort_by(|col1, col2| col1.0.cmp(&col2.0)); + let sorted_row = sorted_row.into_iter().map(|col| col.1).collect::>(); + + format!("{sorted_row:?}") + }) + .collect::>(); + sorted_row_strs.sort(); - formatted_rows.sort(); - formatted_rows + sorted_row_strs } From 71a156abc031fe2e69ee5fd35b204e4b88bdc7d4 Mon Sep 17 00:00:00 2001 From: kamille Date: Thu, 30 Mar 2023 14:19:56 +0800 Subject: [PATCH 3/3] refactor the select sql. --- integration_tests/sdk/rust/src/main.rs | 34 +++++++++++++++++--------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/integration_tests/sdk/rust/src/main.rs b/integration_tests/sdk/rust/src/main.rs index a7d0e217cf..dbcfbf0687 100644 --- a/integration_tests/sdk/rust/src/main.rs +++ b/integration_tests/sdk/rust/src/main.rs @@ -81,10 +81,28 @@ async fn sql_query( test_data: &TestDatas, new_column: bool, ) { + let all_columns = test_data.col_names.clone(); + let selections = if !new_column { + format!( + "`{}`,`{}`,`{}`,`{}`", + all_columns[0], all_columns[1], all_columns[2], all_columns[3] + ) + } else { + format!( + "`{}`,`{}`,`{}`,`{}`,`{}`,`{}`", + all_columns[0], + all_columns[1], + all_columns[2], + all_columns[3], + all_columns[4], + all_columns[5] + ) + }; + let test_table = format!("test_table_{timestamp}"); let query_req = SqlQueryRequest { tables: vec![test_table.clone()], - sql: format!("SELECT * from {test_table}"), + sql: format!("SELECT {selections} from {test_table}"), }; let resp = client.sql_query(rpc_ctx, &query_req).await.unwrap(); assert_eq!(resp.affected_rows, 0); @@ -136,12 +154,8 @@ async fn write( for row in rows { let point = { - let timestamp_val = match &row[0] { - Value::Timestamp(val) => *val, - _ => unreachable!(), - }; let builder = PointBuilder::new(test_table.clone()) - .timestamp(timestamp_val) + .timestamp(timestamp) .tag(test_data.col_names[1].clone(), row[1].clone()) .field(test_data.col_names[2].clone(), row[2].clone()) .field(test_data.col_names[3].clone(), row[3].clone()); @@ -223,11 +237,9 @@ fn current_timestamp_ms() -> i64 { fn extract_rows_from_sql_query(resp: &SqlQueryResponse) -> Vec> { let mut rows = Vec::with_capacity(resp.rows.len()); for row in &resp.rows { - let mut column_iter = row.columns().iter(); - // In the automatically created table schema, `tsid` column will be added by - // CeresDB, we just ignore it. - column_iter.next(); - let col_vals = column_iter + let col_vals = row + .columns() + .iter() .map(|col| (col.name().to_string(), col.value().clone())) .collect(); rows.push(col_vals);