Skip to content

Commit

Permalink
support filter by where conditions for snapshot task
Browse files Browse the repository at this point in the history
  • Loading branch information
qianyiwen2019 committed Dec 16, 2024
1 parent 52bbe47 commit 8f61acb
Show file tree
Hide file tree
Showing 26 changed files with 291 additions and 225 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ Refer to [test docs](./dt-tests/README.md) for details.

- Image size

| ape_dts:2.0.3 | debezium/connect:2.7 |
| ape_dts:2.0.14 | debezium/connect:2.7 |
| :-------- | :-------- |
| 86.4 MB | 1.38 GB |

Expand Down Expand Up @@ -144,4 +144,4 @@ Refer to [test docs](./dt-tests/README.md) for details.

Wechat Group

<img src="./docs/img/wechat_group.jpg" width="55%" style="display: block; margin-left: 0;"/>
<img src="./docs/img/wechat_group.png" width="55%" style="display: block; margin-left: 0;"/>
4 changes: 2 additions & 2 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@

- 镜像对比

| ape_dts:2.0.3 | debezium/connect:2.7 |
| ape_dts:2.0.14 | debezium/connect:2.7 |
| :-------- | :-------- |
| 86.4 MB | 1.38 GB |

Expand Down Expand Up @@ -142,4 +142,4 @@

微信交流群

<img src="./docs/img/wechat_group.jpg" width="55%" style="display: block; margin-left: 0;"/>
<img src="./docs/img/wechat_group.png" width="55%" style="display: block; margin-left: 0;"/>
Binary file removed docs/img/wechat_group.jpg
Binary file not shown.
Binary file added docs/img/wechat_group.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions dt-common/src/config/filter_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ pub struct FilterConfig {
pub do_structures: String,
pub do_ddls: String,
pub ignore_cmds: String,
pub where_conditions: String,
}
1 change: 1 addition & 0 deletions dt-common/src/config/task_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ impl TaskConfig {
do_ddls: loader.get_optional(FILTER, "do_ddls"),
do_structures: loader.get_with_default(FILTER, "do_structures", ASTRISK.to_string()),
ignore_cmds: loader.get_optional(FILTER, "ignore_cmds"),
where_conditions: loader.get_optional(FILTER, "where_conditions"),
})
}

Expand Down
28 changes: 28 additions & 0 deletions dt-common/src/rdb_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use regex::Regex;
use serde::{Deserialize, Serialize};

type IgnoreCols = HashMap<(String, String), HashSet<String>>;
type WhereConditions = HashMap<(String, String), String>;

const JSON_PREFIX: &str = "json:";

Expand All @@ -31,6 +32,7 @@ pub struct RdbFilter {
pub do_structures: HashSet<String>,
pub do_ddls: HashSet<String>,
pub ignore_cmds: HashSet<String>,
pub where_conditions: WhereConditions,
pub cache: HashMap<(String, String), bool>,
}

Expand All @@ -47,6 +49,7 @@ impl RdbFilter {
do_structures: Self::parse_single_tokens(&config.do_structures, db_type)?,
do_ddls: Self::parse_single_tokens(&config.do_ddls, db_type)?,
ignore_cmds: Self::parse_single_tokens(&config.ignore_cmds, db_type)?,
where_conditions: Self::parse_where_conditions(&config.where_conditions)?,
cache: HashMap::new(),
})
}
Expand Down Expand Up @@ -129,6 +132,11 @@ impl RdbFilter {
self.do_tbs.insert((schema.into(), tb.into()));
}

/// Look up the snapshot `WHERE` condition configured for a table.
///
/// Returns `None` when no condition was configured for `(schema, tb)`.
pub fn get_where_condition(&self, schema: &str, tb: &str) -> Option<&String> {
    let key = (schema.to_string(), tb.to_string());
    self.where_conditions.get(&key)
}

/// A filter set consisting of the single token "*" means "match everything".
fn match_all(set: &HashSet<String>) -> bool {
    if set.len() != 1 {
        return false;
    }
    set.contains("*")
}
Expand Down Expand Up @@ -225,6 +233,26 @@ impl RdbFilter {
}
Ok(results)
}

/// Parse the `where_conditions` filter config into a per-table lookup map.
///
/// Expected format (the `json:` prefix is optional):
/// `where_conditions=json:[{"db":"test_db","tb":"tb_1","condition":"id > 1"}]`
///
/// An empty/whitespace-only config yields an empty map; malformed JSON is
/// propagated as an error.
fn parse_where_conditions(config_str: &str) -> anyhow::Result<WhereConditions> {
    if config_str.trim().is_empty() {
        return Ok(WhereConditions::new());
    }

    #[derive(Serialize, Deserialize)]
    struct Condition {
        db: String,
        tb: String,
        condition: String,
    }

    let entries: Vec<Condition> =
        serde_json::from_str(config_str.trim_start_matches(JSON_PREFIX))?;
    Ok(entries
        .into_iter()
        .map(|c| ((c.db, c.tb), c.condition))
        .collect())
}
}

#[cfg(test)]
Expand Down
18 changes: 18 additions & 0 deletions dt-connector/src/extractor/base_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use dt_common::{
error::Error,
log_error, log_info, log_warn,
meta::{ddl_meta::ddl_data::DdlData, dt_queue::DtQueue, struct_meta::struct_data::StructData},
rdb_filter::RdbFilter,
utils::{sql_util::SqlUtil, time_util::TimeUtil},
};
use dt_common::{
Expand Down Expand Up @@ -140,6 +141,23 @@ impl BaseExtractor {
Ok(ddl_data)
}

/// Build the `WHERE` clause for a snapshot extract query.
///
/// Combines the user-configured per-table condition (from the
/// `where_conditions` filter config) with an extractor-supplied paging
/// condition such as "`id` > ?". Returns an empty string when neither
/// exists, so callers can always splice the result into their SQL.
pub fn get_where_sql(filter: &RdbFilter, schema: &str, tb: &str, condition: &str) -> String {
    let configured = filter.get_where_condition(schema, tb);
    match (configured, condition.is_empty()) {
        (None, true) => String::new(),
        (None, false) => format!("WHERE {}", condition),
        (Some(c), true) => format!("WHERE {}", c),
        // Parenthesize both sides when combining: the configured condition
        // may contain `OR`, and since `AND` binds tighter than `OR` in SQL,
        // joining without parentheses would change which rows are extracted.
        (Some(c), false) => format!("WHERE ({}) AND ({})", c, condition),
    }
}

pub fn precheck_heartbeat(
&self,
heartbeat_interval_secs: u64,
Expand Down
60 changes: 39 additions & 21 deletions dt-connector/src/extractor/mysql/mysql_snapshot_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ impl MysqlSnapshotExtractor {

let ignore_cols = self.filter.get_ignore_cols(&self.db, &self.tb);
let cols_str = self.build_extract_cols_str(tb_meta)?;
let sql = format!("SELECT {} FROM `{}`.`{}`", cols_str, self.db, self.tb);
let where_sql = BaseExtractor::get_where_sql(&self.filter, &self.db, &self.tb, "");
let sql = format!(
"SELECT {} FROM `{}`.`{}` {}",
cols_str, self.db, self.tb, where_sql
);

let mut rows = sqlx::query(&sql).fetch(&self.conn_pool);
while let Some(row) = rows.try_next().await.unwrap() {
Expand All @@ -164,21 +168,26 @@ impl MysqlSnapshotExtractor {
let ignore_cols = self.filter.get_ignore_cols(&self.db, &self.tb);
let cols_str = self.build_extract_cols_str(tb_meta)?;

let sql1 = format!(
"SELECT {} FROM `{}`.`{}` ORDER BY `{}` ASC LIMIT {}",
cols_str, self.db, self.tb, order_col, self.batch_size
let where_sql_1 = BaseExtractor::get_where_sql(&self.filter, &self.db, &self.tb, "");
let sql_1 = format!(
"SELECT {} FROM `{}`.`{}` {} ORDER BY `{}` ASC LIMIT {}",
cols_str, self.db, self.tb, where_sql_1, order_col, self.batch_size
);
let sql2 = format!(
"SELECT {} FROM `{}`.`{}` WHERE `{}` > ? ORDER BY `{}` ASC LIMIT {}",
cols_str, self.db, self.tb, order_col, order_col, self.batch_size

let condition_2 = format!("`{}` > ?", order_col);
let where_sql_2 =
BaseExtractor::get_where_sql(&self.filter, &self.db, &self.tb, &condition_2);
let sql_2 = format!(
"SELECT {} FROM `{}`.`{}` {} ORDER BY `{}` ASC LIMIT {}",
cols_str, self.db, self.tb, where_sql_2, order_col, self.batch_size
);

loop {
let start_value_for_bind = start_value.clone();
let query = if let ColValue::None = start_value {
sqlx::query(&sql1)
sqlx::query(&sql_1)
} else {
sqlx::query(&sql2).bind_col_value(Some(&start_value_for_bind), order_col_type)
sqlx::query(&sql_2).bind_col_value(Some(&start_value_for_bind), order_col_type)
};

let mut rows = query.fetch(&self.conn_pool);
Expand Down Expand Up @@ -234,17 +243,26 @@ impl MysqlSnapshotExtractor {
let mut start_value = resume_value;
let cols_str = self.build_extract_cols_str(tb_meta)?;

let sql1 = format!(
"SELECT {} FROM `{}`.`{}` ORDER BY `{}` ASC LIMIT {}",
cols_str, self.db, self.tb, order_col, batch_size
let where_sql_1 = BaseExtractor::get_where_sql(&self.filter, &self.db, &self.tb, "");
let sql_1 = format!(
"SELECT {} FROM `{}`.`{}` {} ORDER BY `{}` ASC LIMIT {}",
cols_str, self.db, self.tb, where_sql_1, order_col, batch_size
);
let sql2 = format!(
"SELECT {} FROM `{}`.`{}` WHERE `{}` > ? AND `{}` <= ? ORDER BY `{}` ASC LIMIT {}",
cols_str, self.db, self.tb, order_col, order_col, order_col, batch_size

let condition_2 = format!("`{}` > ? AND `{}` <= ?", order_col, order_col);
let where_sql_2 =
BaseExtractor::get_where_sql(&self.filter, &self.db, &self.tb, &condition_2);
let sql_2 = format!(
"SELECT {} FROM `{}`.`{}` {} ORDER BY `{}` ASC LIMIT {}",
cols_str, self.db, self.tb, where_sql_2, order_col, batch_size
);
let sql3 = format!(
"SELECT {} FROM `{}`.`{}` WHERE `{}` > ? ORDER BY `{}` ASC LIMIT {}",
cols_str, self.db, self.tb, order_col, order_col, batch_size

let condition_3 = format!("`{}` > ?", order_col);
let where_sql_3 =
BaseExtractor::get_where_sql(&self.filter, &self.db, &self.tb, &condition_3);
let sql_3 = format!(
"SELECT {} FROM `{}`.`{}` {} ORDER BY `{}` ASC LIMIT {}",
cols_str, self.db, self.tb, where_sql_3, order_col, batch_size
);

loop {
Expand All @@ -259,7 +277,7 @@ impl MysqlSnapshotExtractor {

if let ColValue::None = start_value {
let mut slice_count = 0;
let query = sqlx::query(&sql1);
let query = sqlx::query(&sql_1);
let mut rows = query.fetch(&self.conn_pool);
while let Some(row) = rows.try_next().await.unwrap() {
start_value =
Expand Down Expand Up @@ -295,9 +313,9 @@ impl MysqlSnapshotExtractor {
Self::get_sub_extractor_range(&start_value, i, batch_size);
let sql = if i == parallel_size - 1 {
// the last extractor
sql3.clone()
sql_3.clone()
} else {
sql2.clone()
sql_2.clone()
};

let future: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
Expand Down
30 changes: 14 additions & 16 deletions dt-connector/src/extractor/pg/pg_snapshot_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ impl PgSnapshotExtractor {

let mut extracted_count = 0;
let mut start_value = resume_value;
let sql1 = self.build_extract_sql(tb_meta, false)?;
let sql2 = self.build_extract_sql(tb_meta, true)?;
let sql_1 = self.build_extract_sql(tb_meta, false)?;
let sql_2 = self.build_extract_sql(tb_meta, true)?;
let ignore_cols = self.filter.get_ignore_cols(&self.schema, &self.tb);
loop {
let start_value_for_bind = start_value.clone();
let query = if let ColValue::None = start_value {
sqlx::query(&sql1)
sqlx::query(&sql_1)
} else {
sqlx::query(&sql2).bind_col_value(Some(&start_value_for_bind), order_col_type)
sqlx::query(&sql_2).bind_col_value(Some(&start_value_for_bind), order_col_type)
};

let mut rows = query.fetch(&self.conn_pool);
Expand Down Expand Up @@ -183,31 +183,29 @@ impl PgSnapshotExtractor {
let ignore_cols = self.filter.get_ignore_cols(&self.schema, &self.tb);
let query_builder = RdbQueryBuilder::new_for_pg(tb_meta, ignore_cols);
let cols_str = query_builder.build_extract_cols_str()?;
let where_sql = BaseExtractor::get_where_sql(&self.filter, &self.schema, &self.tb, "");

// SELECT col_1, col_2::text FROM tb_1 WHERE col_1 > $1 ORDER BY col_1;
if let Some(order_col) = &tb_meta.basic.order_col {
if has_start_value {
let order_col_type = tb_meta.get_col_type(order_col)?;
let condition = format!(r#""{}" > $1::{}"#, order_col, order_col_type.alias);
let where_sql =
BaseExtractor::get_where_sql(&self.filter, &self.schema, &self.tb, &condition);
Ok(format!(
r#"SELECT {} FROM "{}"."{}" WHERE "{}" > $1::{} ORDER BY "{}" ASC LIMIT {}"#,
cols_str,
self.schema,
self.tb,
order_col,
order_col_type.alias,
order_col,
self.batch_size
r#"SELECT {} FROM "{}"."{}" {} ORDER BY "{}" ASC LIMIT {}"#,
cols_str, self.schema, self.tb, where_sql, order_col, self.batch_size
))
} else {
Ok(format!(
r#"SELECT {} FROM "{}"."{}" ORDER BY "{}" ASC LIMIT {}"#,
cols_str, self.schema, self.tb, order_col, self.batch_size
r#"SELECT {} FROM "{}"."{}" {} ORDER BY "{}" ASC LIMIT {}"#,
cols_str, self.schema, self.tb, where_sql, order_col, self.batch_size
))
}
} else {
Ok(format!(
r#"SELECT {} FROM "{}"."{}""#,
cols_str, self.schema, self.tb
r#"SELECT {} FROM "{}"."{}" {}"#,
cols_str, self.schema, self.tb, where_sql
))
}
}
Expand Down
2 changes: 0 additions & 2 deletions dt-task/src/task_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ impl TaskUtil {
max_connections: u32,
enable_sqlx_log: bool,
) -> anyhow::Result<Pool<MySql>> {
log_info!("mysql url: {}", url);
let mut conn_options = MySqlConnectOptions::from_str(url)?;
// The default character set is `utf8mb4`
conn_options
Expand All @@ -55,7 +54,6 @@ impl TaskUtil {
max_connections: u32,
enable_sqlx_log: bool,
) -> anyhow::Result<Pool<Postgres>> {
log_info!("pg url: {}", url);
let mut conn_options = PgConnectOptions::from_str(url)?;
conn_options
.log_statements(log::LevelFilter::Info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ CREATE TABLE Upper_Case_DB.Upper_Case_TB (
PRIMARY KEY(Id),
UNIQUE KEY(FIELD_1, field_2, Field_3)
);
```
```

CREATE TABLE test_db_1.where_condition_1 ( f_0 int, f_1 int );
CREATE TABLE test_db_1.where_condition_2 ( f_0 int, f_1 int, PRIMARY KEY (f_0) );
CREATE TABLE test_db_1.where_condition_3 ( f_0 int, f_1 int );
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ CREATE TABLE Upper_Case_DB.Upper_Case_TB (
PRIMARY KEY(Id),
UNIQUE KEY(FIELD_1, field_2, Field_3)
);
```
```

CREATE TABLE test_db_1.where_condition_1 ( f_0 int, f_1 int );
CREATE TABLE test_db_1.where_condition_2 ( f_0 int, f_1 int, PRIMARY KEY (f_0) );
CREATE TABLE test_db_1.where_condition_3 ( f_0 int, f_1 int );
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,9 @@ INSERT INTO test_db_1.set_table VALUES(1, ''), (2, 'a'), (3, 'a,b'), (4, 'a,b,c,
INSERT INTO test_db_1.ignore_cols_1 VALUES(1, 1, 1, 1),(2, 2, 2, 2);
INSERT INTO test_db_1.ignore_cols_2 VALUES(1, 1, 1, 1),(2, 2, 2, 2);

INSERT INTO Upper_Case_DB.Upper_Case_TB VALUES(1, 1, 1, 1, 1),(2, 2, 2, 2, 2);
INSERT INTO Upper_Case_DB.Upper_Case_TB VALUES(1, 1, 1, 1, 1),(2, 2, 2, 2, 2);

-- test where condition
INSERT INTO test_db_1.where_condition_1 VALUES(1, 1),(2, 2),(3, 3),(4, 4),(5, 5),(6, 6),(7, 7),(8, 8),(9, 9),(10, 10);
INSERT INTO test_db_1.where_condition_2 VALUES(1, 1),(2, 2),(3, 3),(4, 4),(5, 5),(6, 6),(7, 7),(8, 8),(9, 9),(10, 10);
INSERT INTO test_db_1.where_condition_3 VALUES(1, 1),(2, 2),(3, 3),(4, 4),(5, 5),(6, 6),(7, 7),(8, 8),(9, 9),(10, 10);
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ do_tbs=test_db_1.*,upper_case_db.*
ignore_tbs=
do_events=insert
ignore_cols=json:[{"db":"test_db_1","tb":"ignore_cols_1","ignore_cols":["f_2","f_3"]},{"db":"test_db_1","tb":"ignore_cols_2","ignore_cols":["f_3"]}]
where_conditions=json:[{"db":"test_db_1","tb":"where_condition_1","condition":"f_0 > 1"},{"db":"test_db_1","tb":"where_condition_2","condition":"f_0 > 1 AND f_1 < 9"}]

[router]
db_map=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ CREATE TABLE test_db_1.tb_2 (`id` varchar(255) NOT NULL, `value` int(11) DEFAULT

-- no primary key, cannot be extracted in parallel
CREATE TABLE test_db_1.tb_3 (`id` int(11) NOT NULL, `value` int(11) DEFAULT NULL);

CREATE TABLE test_db_1.where_condition_1 ( f_0 int, f_1 int, PRIMARY KEY (f_0) );
CREATE TABLE test_db_1.where_condition_2 ( f_0 int, f_1 int, PRIMARY KEY (f_0) );
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ CREATE TABLE test_db_1.tb_2 (`id` varchar(255) NOT NULL, `value` int(11) DEFAULT

-- no primary key, cannot be extracted in parallel
CREATE TABLE test_db_1.tb_3 (`id` int(11) NOT NULL, `value` int(11) DEFAULT NULL);

CREATE TABLE test_db_1.where_condition_1 ( f_0 int, f_1 int, PRIMARY KEY (f_0) );
CREATE TABLE test_db_1.where_condition_2 ( f_0 int, f_1 int, PRIMARY KEY (f_0) );
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ INSERT INTO test_db_1.tb_1 VALUES (1,1),(2,2),(3,3),(7,7),(9,9),(10,10),(11,11),
INSERT INTO test_db_1.tb_2 VALUES ("1",1),("2",2),("3",3),("7",7),("9",9),("10",10),("11",11),("12",12),("14",14),("16",16),("17",17),("18",18),("19",19);

INSERT INTO test_db_1.tb_3 VALUES (1,1),(2,2),(3,3),(7,7),(9,9),(10,10),(11,11),(12,12),(14,14),(16,16),(17,17),(18,18),(19,19);

-- test where condition
INSERT INTO test_db_1.where_condition_1 VALUES(1, 1),(2, 2),(3, 3),(4, 4),(5, 5),(6, 6),(7, 7),(8, 8),(9, 9),(10, 10);
INSERT INTO test_db_1.where_condition_2 VALUES(1, 1),(2, 2),(3, 3),(4, 4),(5, 5),(6, 6),(7, 7),(8, 8),(9, 9),(10, 10);
Loading

0 comments on commit 8f61acb

Please sign in to comment.