diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 49db8157fc..549c035101 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -213,6 +213,10 @@ jobs: working-directory: integration_tests run: | make run-java + - name: Run Rust SDK tests + working-directory: integration_tests + run: | + make run-rust - name: Upload Logs if: always() uses: actions/upload-artifact@v3 diff --git a/integration_tests/Makefile b/integration_tests/Makefile index 0f6c1689a3..b48ac49ebc 100644 --- a/integration_tests/Makefile +++ b/integration_tests/Makefile @@ -49,5 +49,9 @@ run-java: java -version cd sdk/java && MAVEN_OPTS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn clean compile exec:java + run-go: cd sdk/go && go run . + +run-rust: + cd sdk/rust && cargo run diff --git a/integration_tests/sdk/rust/src/main.rs b/integration_tests/sdk/rust/src/main.rs index 5dde491132..8aade47087 100644 --- a/integration_tests/sdk/rust/src/main.rs +++ b/integration_tests/sdk/rust/src/main.rs @@ -15,9 +15,13 @@ use ceresdb_client::{ RpcContext, }; -type TestDatas = (Vec>, Vec>); const ENDPOINT: &str = "127.0.0.1:8831"; +struct TestDatas { + col_names: Vec, + rows: Vec>, +} + #[tokio::main] async fn main() { println!("Begin test, endpoint:{ENDPOINT}"); @@ -31,6 +35,7 @@ async fn main() { test_auto_create_table(&client, &rpc_ctx, now, &test_datas).await; test_add_column(&client, &rpc_ctx, now, &test_datas).await; + drop_table_if_exists(&client, &rpc_ctx, now).await; print!("Test done") } @@ -44,14 +49,8 @@ async fn test_auto_create_table( drop_table_if_exists(client, rpc_ctx, timestamp).await; - write(client, rpc_ctx, timestamp, &test_datas.0, false).await; - - let mut query_data = Vec::new(); - for data in &test_datas.0 { - let one_query_data = data.clone().into_iter().take(4).collect(); - query_data.push(one_query_data); - } - sql_query(client, rpc_ctx, timestamp, &query_data).await; + write(client, rpc_ctx, timestamp, test_datas, false).await; + sql_query(client, rpc_ctx, timestamp, test_datas, false).await; } async fn test_add_column( @@ -62,11 +61,8 @@ async fn test_add_column( ) { println!("Test add column"); - write(client, rpc_ctx, timestamp, &test_datas.1, true).await; - - let mut query_data = test_datas.0.clone(); - query_data.extend(test_datas.1.clone()); - sql_query(client, rpc_ctx, timestamp, &query_data).await; + write(client, rpc_ctx, timestamp, test_datas, true).await; + sql_query(client, rpc_ctx, timestamp, test_datas, true).await; } async fn drop_table_if_exists(client: &Arc, rpc_ctx: &RpcContext, timestamp: i64) { @@ -82,7 +78,8 @@ async fn sql_query( client: &Arc, rpc_ctx: &RpcContext, timestamp: i64, - test_data: &Vec>, + test_data: &TestDatas, + new_column: bool, ) { let test_table = format!("test_table_{timestamp}"); let query_req = SqlQueryRequest { @@ -90,10 +87,33 @@ async fn sql_query( sql: format!("SELECT * from {test_table}"), }; let resp = client.sql_query(rpc_ctx, &query_req).await.unwrap(); - let raw_rows = extract_raw_rows_from_sql_query(&resp); + assert_eq!(resp.affected_rows, 0); + let resp_rows = extract_rows_from_sql_query(&resp); + let mut expected_rows = Vec::new(); + if !new_column { + let rows = test_data + .rows + .iter() + .take(2) + .map(|row| row.iter().take(4).cloned().collect::>()); + + for row in rows { + let col_names = test_data.col_names.iter().take(4).cloned(); + let row = col_names.zip(row.into_iter()).collect::>(); + expected_rows.push(row); + } + } else { + let rows = test_data.rows.iter().cloned(); + + for row in rows { + let col_names = test_data.col_names.iter().cloned(); + let row = col_names.zip(row.into_iter()).collect::>(); + expected_rows.push(row); + } + } - let expected = format_rows(test_data); - let actual = format_rows(&raw_rows); + let expected = format_rows(&expected_rows); + let actual = format_rows(&resp_rows); assert_eq!(expected, actual); } @@ -101,28 +121,35 @@ async fn write( client: &Arc, rpc_ctx: &RpcContext, timestamp: i64, - test_data: &Vec>, + test_data: &TestDatas, new_column: bool, ) { let mut write_req = WriteRequest::default(); let test_table = format!("test_table_{timestamp}"); - let mut test_points = Vec::with_capacity(test_data.len()); - for test_row in test_data { + let mut test_points = Vec::new(); + + let rows = if !new_column { + test_data.rows.iter().take(2).cloned().collect::>() + } else { + vec![test_data.rows[2].clone(), test_data.rows[3].clone()] + }; + + for row in rows { let point = { - let timestamp_val = match &test_row[0] { + let timestamp_val = match &row[0] { Value::Timestamp(val) => *val, _ => unreachable!(), }; let builder = PointBuilder::new(test_table.clone()) .timestamp(timestamp_val) - .field("old-field0".to_string(), test_row[1].clone()) - .field("old-field1".to_string(), test_row[2].clone()) - .tag("old-tagk1".to_string(), test_row[3].clone()); + .tag(test_data.col_names[1].clone(), row[1].clone()) + .field(test_data.col_names[2].clone(), row[2].clone()) + .field(test_data.col_names[3].clone(), row[3].clone()); if new_column { builder - .tag("new-tag".to_string(), test_row[4].clone()) - .field("new-field".to_string(), test_row[5].clone()) + .tag(test_data.col_names[4].clone(), row[4].clone()) + .field(test_data.col_names[5].clone(), row[5].clone()) .build() .unwrap() } else { @@ -138,42 +165,52 @@ async fn write( assert_eq!(resp.failed, 0); } -fn generate_test_datas(timestamp: i64) -> (Vec>, Vec>) { - let col0 = vec![ - Value::Timestamp(timestamp), - Value::String("old-tagv0".to_string()), - Value::Int64(123), - Value::UInt64(1222223333334), - Value::String("".to_string()), - Value::UInt64(0), - ]; - let col1 = vec![ - Value::Timestamp(timestamp), - Value::String("old-tagv1".to_string()), - Value::Int64(124), - Value::UInt64(1222223333335), - Value::String("".to_string()), - Value::UInt64(0), +fn generate_test_datas(timestamp: i64) -> TestDatas { + let col_names = vec![ + "timestamp".to_string(), + "old-tag".to_string(), + "old-field0".to_string(), + "old-field1".to_string(), + "new-tag".to_string(), + "new-field".to_string(), ]; - let new_col0 = vec![ - Value::Timestamp(timestamp), - Value::String("old-tagv0".to_string()), - Value::Int64(123), - Value::UInt64(1222223333334), - Value::String("new-tagv0".to_string()), - Value::UInt64(666666), - ]; - let new_col1 = vec![ - Value::Timestamp(timestamp), - Value::String("old-tagv1".to_string()), - Value::Int64(124), - Value::UInt64(1222223333335), - Value::String("new-tagv1".to_string()), - Value::UInt64(88888888), + let rows = vec![ + vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv0".to_string()), + Value::Int64(123), + Value::UInt64(1222223333334), + Value::String("".to_string()), + Value::UInt64(0), + ], + vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv1".to_string()), + Value::Int64(124), + Value::UInt64(1222223333335), + Value::String("".to_string()), + Value::UInt64(0), + ], + vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv0".to_string()), + Value::Int64(123), + Value::UInt64(1222223333334), + Value::String("new-tagv0".to_string()), + Value::UInt64(666666), + ], + vec![ + Value::Timestamp(timestamp), + Value::String("old-tagv1".to_string()), + Value::Int64(124), + Value::UInt64(1222223333335), + Value::String("new-tagv1".to_string()), + Value::UInt64(88888888), + ], ]; - (vec![col0, col1], vec![new_col0, new_col1]) + TestDatas { col_names, rows } } fn current_timestamp_ms() -> i64 { @@ -183,31 +220,34 @@ fn current_timestamp_ms() -> i64 { .unwrap_or(0) } -fn extract_raw_rows_from_sql_query(resp: &SqlQueryResponse) -> Vec> { - let mut raw_rows = Vec::with_capacity(resp.rows.len()); +fn extract_rows_from_sql_query(resp: &SqlQueryResponse) -> Vec> { + let mut rows = Vec::with_capacity(resp.rows.len()); for row in &resp.rows { let mut column_iter = row.columns().iter(); // In the automatically created table schema, `tsid` column will be added by // CeresDB, we just ignore it. column_iter.next(); - let col_vals = column_iter.map(|col| col.value().clone()).collect(); - raw_rows.push(col_vals); + let col_vals = column_iter + .map(|col| (col.name().to_string(), col.value().clone())) + .collect(); + rows.push(col_vals); } - raw_rows + rows } -fn format_rows(rows: &Vec>) -> Vec { - let mut formatted_rows = Vec::new(); - for row in rows { - let mut row_str = row - .iter() - .map(|col| format!("{col:?}")) - .collect::>(); - row_str.sort(); - formatted_rows.push(format!("{row_str:?}")); - } +fn format_rows(rows: &[Vec<(String, Value)>]) -> Vec { + let mut sorted_row_strs = rows + .iter() + .map(|row| { + let mut sorted_row = row.clone(); + sorted_row.sort_by(|col1, col2| col1.0.cmp(&col2.0)); + let sorted_row = sorted_row.into_iter().map(|col| col.1).collect::>(); + + format!("{sorted_row:?}") + }) + .collect::>(); + sorted_row_strs.sort(); - formatted_rows.sort(); - formatted_rows + sorted_row_strs }