Skip to content

Commit

Permalink
1, sink mysql json columns. 2, sink redis rdb data by rewrite besides…
Browse files Browse the repository at this point in the history
… restore. 3, sink basic ddls for mysql
  • Loading branch information
qianyiwen2019 committed Aug 15, 2023
1 parent 71477b4 commit fdb1c30
Show file tree
Hide file tree
Showing 101 changed files with 2,005 additions and 563 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ uuid = { version = "1.3.1", features = ["v4"] }
nom = "7.1.3"
mongodb = { version = "2.5.0" }
dotenv = "0.15.0"
redis = {git = "https://github.com/qianyiwen2019/redis-rs", features = ["tokio-comp"]}
redis = { version = "0.23.1", features = ["tokio-comp"] }
thiserror = "1.0.44"
3 changes: 2 additions & 1 deletion dt-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ futures = { workspace = true }
serde_yaml = { workspace = true }
regex = { workspace = true }
nom = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true }
thiserror = { workspace = true }
6 changes: 3 additions & 3 deletions dt-common/src/config/config_enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use strum::{Display, EnumString, IntoStaticStr};

use crate::error::Error;

#[derive(Clone, Display, EnumString, IntoStaticStr, Debug)]
#[derive(Clone, Display, EnumString, IntoStaticStr, Debug, PartialEq, Eq)]
pub enum DbType {
#[strum(serialize = "mysql")]
Mysql,
Expand Down Expand Up @@ -36,7 +36,7 @@ pub enum ExtractType {
Basic,
}

#[derive(EnumString, IntoStaticStr)]
#[derive(Display, EnumString, IntoStaticStr)]
pub enum SinkType {
#[strum(serialize = "write")]
Write,
Expand Down Expand Up @@ -73,7 +73,7 @@ pub enum RouteType {
Tb,
}

#[derive(Clone, IntoStaticStr)]
#[derive(Clone, Debug, IntoStaticStr)]
pub enum ConflictPolicyEnum {
#[strum(serialize = "ignore")]
Ignore,
Expand Down
2 changes: 1 addition & 1 deletion dt-common/src/config/extractor_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::config_enums::DbType;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum ExtractorConfig {
Basic {
url: String,
Expand Down
3 changes: 2 additions & 1 deletion dt-common/src/config/sinker_config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::config_enums::{ConflictPolicyEnum, DbType};

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum SinkerConfig {
Basic {
url: String,
Expand Down Expand Up @@ -69,6 +69,7 @@ pub enum SinkerConfig {
Redis {
url: String,
batch_size: usize,
method: String,
},
}

Expand Down
34 changes: 21 additions & 13 deletions dt-common/src/config/task_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ impl TaskConfig {

ExtractType::Basic => Ok(ExtractorConfig::Basic { url, db_type }),

t => Err(Error::Unexpected {
error: format!("extract type: {} not supported", t),
}),
extract_type => Err(Error::ConfigError(format!(
"extract type: {} not supported",
extract_type
))),
},

DbType::Redis => {
Expand All @@ -179,15 +180,17 @@ impl TaskConfig {
now_db_id: ini.getint(EXTRACTOR, "now_db_id").unwrap().unwrap(),
}),

t => Err(Error::Unexpected {
error: format!("extract type: {} not supported", t),
}),
extract_type => Err(Error::ConfigError(format!(
"extract type: {} not supported",
extract_type
))),
}
}

_ => Err(Error::Unexpected {
error: "extractor db type not supported".to_string(),
}),
db_type => Err(Error::ConfigError(format!(
"extractor db type: {} not supported",
db_type
))),
}
}

Expand Down Expand Up @@ -240,9 +243,10 @@ impl TaskConfig {

SinkType::Basic => Ok(SinkerConfig::Basic { url, db_type }),

_ => Err(Error::Unexpected {
error: "sinker db type not supported".to_string(),
}),
db_type => Err(Error::ConfigError(format!(
"sinker db type: {} not supported",
db_type
))),
},

DbType::Kafka => Ok(SinkerConfig::Kafka {
Expand All @@ -267,7 +271,11 @@ impl TaskConfig {
root_dir: ini.get(SINKER, "root_dir").unwrap(),
}),

DbType::Redis => Ok(SinkerConfig::Redis { url, batch_size }),
DbType::Redis => Ok(SinkerConfig::Redis {
url,
batch_size,
method: Self::get_optional_value(ini, SINKER, "method"),
}),
}
}

Expand Down
84 changes: 27 additions & 57 deletions dt-common/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,70 +1,40 @@
#[derive(Debug)]
use thiserror::Error;

#[derive(Error, Debug)]
pub enum Error {
ConfigError {
error: String,
},
#[error("config error: {0}")]
ConfigError(String),

BinlogError {
error: mysql_binlog_connector_rust::binlog_error::BinlogError,
},
#[error("extractor error: {0}")]
ExtractorError(String),

SqlxError {
error: sqlx::Error,
},
#[error("sinker error: {0}")]
SinkerError(String),

Unexpected {
error: String,
},
#[error("pull mysql binlog error: {0}")]
BinlogError(#[from] mysql_binlog_connector_rust::binlog_error::BinlogError),

MetadataError {
error: String,
},
#[error("sqlx error: {0}")]
SqlxError(#[from] sqlx::Error),

IoError {
error: std::io::Error,
},
#[error("unexpected error: {0}")]
Unexpected(String),

YamlError {
error: serde_yaml::Error,
},
#[error("parse redis rdb error: {0}")]
RedisRdbError(String),

EnvVarError {
error: std::env::VarError,
},
#[error("metadata error: {0}")]
MetadataError(String),

StructError {
error: String,
},
#[error("io error: {0}")]
IoError(#[from] std::io::Error),

ColumnNotMatch,
}
#[error("yaml error: {0}")]
YamlError(#[from] serde_yaml::Error),

impl From<mysql_binlog_connector_rust::binlog_error::BinlogError> for Error {
fn from(err: mysql_binlog_connector_rust::binlog_error::BinlogError) -> Self {
Self::BinlogError { error: err }
}
}

impl From<sqlx::Error> for Error {
fn from(err: sqlx::Error) -> Self {
Self::SqlxError { error: err }
}
}

impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::IoError { error: err }
}
}

impl From<serde_yaml::Error> for Error {
fn from(err: serde_yaml::Error) -> Self {
Self::YamlError { error: err }
}
}
#[error("from utf8 error: {0}")]
FromUtf8Error(#[from] std::string::FromUtf8Error),

impl From<std::env::VarError> for Error {
fn from(err: std::env::VarError) -> Self {
Self::EnvVarError { error: err }
}
#[error("struct error: {0}")]
StructError(String),
}
2 changes: 0 additions & 2 deletions dt-common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// extern crate dt-meta;

pub mod config;
pub mod constants;
pub mod error;
Expand Down
19 changes: 15 additions & 4 deletions dt-common/src/utils/rdb_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub struct RdbFilter {
pub cache: HashMap<(String, String), bool>,
}

const DDL: &str = "ddl";

impl RdbFilter {
pub fn from_config(config: &FilterConfig, db_type: DbType) -> Result<Self, Error> {
match config {
Expand Down Expand Up @@ -80,12 +82,20 @@ impl RdbFilter {
}

pub fn filter_event(&mut self, db: &str, tb: &str, row_type: &str) -> bool {
if self.do_events.is_empty() || !self.do_events.contains(&row_type.to_string()) {
if self.do_events.is_empty() || !self.do_events.contains(row_type) {
return false;
}
self.filter_tb(db, tb)
}

pub fn filter_ddl(&mut self) -> bool {
// filter ddl by default
if self.do_events.is_empty() {
return true;
}
!self.do_events.contains(DDL)
}

fn contain_tb(
set: &HashSet<(String, String)>,
db: &str,
Expand Down Expand Up @@ -175,9 +185,10 @@ impl RdbFilter {
let tokens = ConfigTokenParser::parse(config_str, &delimiters, escape_pairs);
for token in tokens.iter() {
if !SqlUtil::is_valid_token(token, db_type, escape_pairs) {
return Err(Error::ConfigError {
error: format!("invalid filter config, check error near: {}", token),
});
return Err(Error::ConfigError(format!(
"invalid filter config, check error near: {}",
token
)));
}
}
Ok(tokens)
Expand Down
4 changes: 0 additions & 4 deletions dt-connector/src/extractor/mongo/mongo_cdc_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ impl Extractor for MongoCdcExtractor {
);
self.extract_internal().await
}

async fn close(&mut self) -> Result<(), Error> {
Ok(())
}
}

impl MongoCdcExtractor {
Expand Down
4 changes: 0 additions & 4 deletions dt-connector/src/extractor/mongo/mongo_snapshot_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ impl Extractor for MongoSnapshotExtractor {
);
self.extract_internal().await
}

async fn close(&mut self) -> Result<(), Error> {
Ok(())
}
}

impl MongoSnapshotExtractor {
Expand Down
Loading

0 comments on commit fdb1c30

Please sign in to comment.