Skip to content

Commit

Permalink
feat(batch): support mysql_query for mysql batch ingestion (#19071)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Nov 1, 2024
1 parent 9671fa3 commit 3b8b913
Show file tree
Hide file tree
Showing 29 changed files with 1,050 additions and 113 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ deltalake = { version = "0.20.1", features = [
itertools = "0.13.0"
jsonbb = "0.1.4"
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" }
mysql_async = { version = "0.34", default-features = false, features = [
"default",
] }
parquet = { version = "53", features = ["async"] }
thiserror-ext = "0.1.2"
tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
Expand Down
73 changes: 73 additions & 0 deletions e2e_test/source_inline/tvf/mysql_query.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
control substitution on

system ok
mysql -e "DROP DATABASE IF EXISTS tvf; CREATE DATABASE tvf;"

system ok
mysql -e "
USE tvf;
CREATE TABLE test (
id bigint primary key,
v0 bit,
v1 bool,
v2 tinyint(1),
v3 tinyint(2),
v4 smallint,
v5 mediumint,
v6 integer,
v7 bigint,
v8 float,
v9 double,
v10 numeric(4, 2),
v11 decimal(4, 2),
v12 char(255),
v13 varchar(255),
v14 bit(10),
v15 tinyblob,
v16 blob,
v17 mediumblob,
v18 longblob,
v19 date,
v20 time,
v21 timestamp,
v22 json,
v23 int
);
INSERT INTO test SELECT
1 as id,
true as v0,
true as v1,
2 as v2,
3 as v3,
4 as v4,
5 as v5,
6 as v6,
7 as v7,
1.08 as v8,
1.09 as v9,
1.10 as v10,
1.11 as v11,
'char' as v12,
'varchar' as v13,
b'1010' as v14,
x'16' as v15,
x'17' as v16,
x'18' as v17,
x'19' as v18,
'2021-01-01' as v19,
'12:34:56' as v20,
'2021-01-01 12:34:56' as v21,
JSON_OBJECT('key1', 1, 'key2', 'abc') as v22,
null as v23;
"

query
select * from mysql_query('$MYSQL_HOST', '$MYSQL_TCP_PORT', '$RISEDEV_MYSQL_USER', '$MYSQL_PWD', 'tvf', 'select * from test;');
----
1 t 1 2 3 4 5 6 7 1.08 1.09 1.10 1.11 char varchar \x000a \x16 \x17 \x18 \x19 2021-01-01 12:34:56 2021-01-01 12:34:56+00:00 {"key1": 1, "key2": "abc"} NULL

system ok
mysql -e "
USE tvf;
DROP DATABASE tvf;
"
4 changes: 2 additions & 2 deletions e2e_test/source_legacy/cdc_inline/auto_schema_map_mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ SELECT
c_binary_255
FROM rw_mysql_types_test order by c_boolean;
----
0 NULL NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
1 NULL -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
0 f NULL NULL -8388608 -2147483647 9223372036854775806 -10 -10000 -10000 c d \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
1 t -128 -32767 -8388608 -2147483647 -9223372036854775807 -10 -10000 -10000 a b \x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000

query TTTTTTTT
SELECT
Expand Down
12 changes: 12 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ message PostgresQueryNode {
string query = 7;
}

// NOTE(kwannoel): This will only be used in batch mode. We can change the definition as needed.
message MySqlQueryNode {
repeated plan_common.ColumnDesc columns = 1;
string hostname = 2;
string port = 3;
string username = 4;
string password = 5;
string database = 6;
string query = 7;
}

message ProjectNode {
repeated expr.ExprNode select_list = 1;
}
Expand Down Expand Up @@ -386,6 +397,7 @@ message PlanNode {
FileScanNode file_scan = 38;
IcebergScanNode iceberg_scan = 39;
PostgresQueryNode postgres_query = 40;
MySqlQueryNode mysql_query = 41;
// The following nodes are used for testing.
bool block_executor = 100;
bool busy_loop_executor = 101;
Expand Down
2 changes: 2 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ message TableFunction {
FILE_SCAN = 19;
// postgres query
POSTGRES_QUERY = 20;
// mysql query
MYSQL_QUERY = 21;
// User defined table function
USER_DEFINED = 100;
}
Expand Down
19 changes: 19 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,25 @@ profile:
address: schemaregistry
port: 8082

local-inline-source-test:
config-path: src/config/ci-recovery.toml
steps:
- use: minio
- use: sqlite
- use: meta-node
meta-backend: sqlite
- use: compute-node
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: pubsub
persist-data: true
- use: kafka
persist-data: true
- use: schema-registry
- use: mysql
- use: postgres

ci-inline-source-test:
config-path: src/config/ci-recovery.toml
steps:
Expand Down
3 changes: 3 additions & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ assert_matches = "1"
async-recursion = "1"
async-trait = "0.1"
bytes = "1"
chrono = "0.4"
either = "1"
foyer = { workspace = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
Expand All @@ -29,6 +30,7 @@ hytra = "0.1.2"
iceberg = { workspace = true }
itertools = { workspace = true }
memcomparable = "0.2"
mysql_async = { workspace = true }
opendal = { workspace = true }
parking_lot = { workspace = true }
parquet = { workspace = true }
Expand All @@ -45,6 +47,7 @@ risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_storage = { workspace = true }
rust_decimal = "1"
rw_futures_util = { workspace = true }
scopeguard = "1"
serde_json = "1"
Expand Down
9 changes: 6 additions & 3 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
use std::sync::Arc;

pub use anyhow::anyhow;
use iceberg::Error as IcebergError;
use mysql_async::Error as MySqlError;
use parquet::errors::ParquetError;
use risingwave_common::array::ArrayError;
use risingwave_common::error::{def_anyhow_newtype, def_anyhow_variant, BoxedError};
Expand All @@ -29,7 +31,7 @@ use risingwave_rpc_client::error::{RpcError, ToTonicStatus};
use risingwave_storage::error::StorageError;
use thiserror::Error;
use thiserror_ext::Construct;
use tokio_postgres;
use tokio_postgres::Error as PostgresError;
use tonic::Status;

use crate::worker_manager::worker_node_manager::FragmentId;
Expand Down Expand Up @@ -192,7 +194,8 @@ def_anyhow_variant! {
pub BatchExternalSystemError,
BatchError ExternalSystemError,

tokio_postgres::Error => "Postgres error",
iceberg::Error => "Iceberg error",
PostgresError => "Postgres error",
IcebergError => "Iceberg error",
ParquetError => "Parquet error",
MySqlError => "MySQL error",
}
3 changes: 3 additions & 0 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod managed;
mod max_one_row;
mod merge_sort;
mod merge_sort_exchange;
mod mysql_query;
mod order_by;
mod postgres_query;
mod project;
Expand Down Expand Up @@ -65,6 +66,7 @@ pub use managed::*;
pub use max_one_row::*;
pub use merge_sort::*;
pub use merge_sort_exchange::*;
pub use mysql_query::*;
pub use order_by::*;
pub use postgres_query::*;
pub use project::*;
Expand Down Expand Up @@ -247,6 +249,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
NodeBody::FileScan => FileScanExecutorBuilder,
NodeBody::IcebergScan => IcebergScanExecutorBuilder,
NodeBody::PostgresQuery => PostgresQueryExecutorBuilder,
NodeBody::MysqlQuery => MySqlQueryExecutorBuilder,
// Follow NodeBody only used for test
NodeBody::BlockExecutor => BlockExecutorBuilder,
NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder,
Expand Down
Loading

0 comments on commit 3b8b913

Please sign in to comment.