Skip to content

Commit

Permalink
Merge pull request #306 from apecloud/fix/redis_two_way
Browse files Browse the repository at this point in the history
support two-way data sync for redis cluster
  • Loading branch information
qianyiwen2019 authored Jan 2, 2025
2 parents 8aaf1ea + f6bec07 commit 49976bd
Show file tree
Hide file tree
Showing 14 changed files with 1,260 additions and 106 deletions.
1 change: 1 addition & 0 deletions dt-common/src/meta/dt_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub enum DtData {
Commit {
xid: String,
},
Heartbeat {},
#[serde(skip)]
Redis {
entry: RedisEntry,
Expand Down
3 changes: 3 additions & 0 deletions dt-common/src/meta/redis/cluster_node.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

#[derive(Clone)]
pub struct ClusterNode {
pub is_master: bool,
Expand All @@ -8,4 +10,5 @@ pub struct ClusterNode {
pub port: String,
pub address: String,
pub slots: Vec<u16>,
pub slot_hash_tag_map: HashMap<u16, String>,
}
29 changes: 28 additions & 1 deletion dt-common/src/utils/redis_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::error::Error;
use crate::log_info;
use crate::meta::redis::cluster_node::ClusterNode;
use crate::meta::redis::command::cmd_encoder::CmdEncoder;
use crate::meta::redis::command::key_parser::KeyParser;
use crate::meta::redis::redis_object::RedisCmd;
use anyhow::{bail, Context};
use redis::{Connection, ConnectionLike, Value};
Expand Down Expand Up @@ -122,17 +123,34 @@ impl RedisUtil {

_ => {
bail! {Error::RedisResultError(
"redis result type can not be parsed as string".into(),
format!("redis result type can not be parsed as string, value: {:?}", value),
)}
}
}
Ok(results)
}

/// Builds a lookup table mapping every redis cluster slot (0..=16383) to the
/// smallest non-negative integer key (rendered as a string) that hashes to it.
/// The per-node `slot_hash_tag_map` is later derived from this table so that
/// marker keys can be routed to a specific node's slots.
fn get_slot_hash_tag_map() -> HashMap<u16, String> {
    // All SLOTS_COUNT entries will be filled, so reserve up front.
    let mut res: HashMap<u16, String> = HashMap::with_capacity(SLOTS_COUNT);
    for i in 0.. {
        let key = i.to_string();
        let slot = KeyParser::calc_slot(key.as_bytes());
        // Valid slots are 0 to 16383; guard before recording.
        if (slot as usize) < SLOTS_COUNT {
            // entry API: one hash lookup instead of contains_key + insert,
            // and it keeps the first (smallest) key found for each slot.
            res.entry(slot).or_insert(key);
        }
        if res.len() >= SLOTS_COUNT {
            break;
        }
    }
    res
}

fn parse_cluster_nodes(nodes_str: &str) -> anyhow::Result<Vec<ClusterNode>> {
// refer: https://github.com/tair-opensource/RedisShake/blob/v4/internal/utils/cluster_nodes.go
let mut all_slots_count = 0;
let mut parsed_nodes = Vec::new();
let all_slot_hash_tag_map = Self::get_slot_hash_tag_map();

log_info!("cluster nodes: {}", nodes_str);
// 5bafc7277da3038a8fbf01873179260351ed0a0a 172.28.0.13:6379@16379 master - 0 1712124938134 3 connected 12589-15758 15760-16383
Expand Down Expand Up @@ -172,6 +190,7 @@ impl RedisUtil {
host,
address,
slots: Vec::new(),
slot_hash_tag_map: HashMap::new(),
};

if !is_master {
Expand Down Expand Up @@ -211,6 +230,14 @@ impl RedisUtil {
}

all_slots_count += slots.len();
if !slots.is_empty() {
let mut node_slot_hash_tag_map = HashMap::with_capacity(slots.len());
for i in slots.iter() {
node_slot_hash_tag_map
.insert(*i, all_slot_hash_tag_map.get(i).unwrap().to_owned());
}
node.slot_hash_tag_map = node_slot_hash_tag_map;
}
node.slots = slots;
parsed_nodes.push(node);
}
Expand Down
24 changes: 15 additions & 9 deletions dt-connector/src/data_marker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashSet;

use dt_common::{
config::{config_enums::DbType, data_marker_config::DataMarkerConfig},
meta::dt_data::DtData,
meta::{dt_data::DtData, redis::redis_entry::RedisEntry},
};

#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -74,18 +74,24 @@ impl DataMarker {

pub fn is_marker_info(&self, dt_data: &DtData) -> bool {
match dt_data {
DtData::Dml { row_data } => self.is_marker_info_2(&row_data.schema, &row_data.tb),

DtData::Redis { entry } => {
let entry_key = entry.cmd.get_str_arg(1);
entry_key == self.marker
}

DtData::Dml { row_data } => self.is_rdb_marker_info(&row_data.schema, &row_data.tb),
DtData::Redis { entry } => self.is_redis_marker_info(entry),
_ => false,
}
}

pub fn is_marker_info_2(&self, schema: &str, tb: &str) -> bool {
/// Returns true when the given redis entry is the data-marker record itself
/// (rather than user data that should be replicated).
pub fn is_redis_marker_info(&self, entry: &RedisEntry) -> bool {
    // Raw (RDB) entries carry the key directly; command entries keep it as arg 1.
    let entry_key = match entry.is_raw() {
        true => entry.key.to_string(),
        false => entry.cmd.get_str_arg(1),
    };
    // Exact match: e.g. marker "data_marker_topo1_test".
    if entry_key == self.marker {
        return true;
    }
    // Also accept the marker with a cluster hash tag appended,
    // e.g. "data_marker_topo1_test{b}".
    let tagged_prefix = format!("{}{{", self.marker);
    entry_key.starts_with(&tagged_prefix)
}

/// Returns true when the given schema/table pair names the relational
/// data-marker table configured for this topology.
pub fn is_rdb_marker_info(&self, schema: &str, tb: &str) -> bool {
    schema == self.marker_schema && tb == self.marker_tb
}

Expand Down
47 changes: 27 additions & 20 deletions dt-connector/src/extractor/base_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct BaseExtractor {
impl BaseExtractor {
pub fn is_data_marker_info(&self, schema: &str, tb: &str) -> bool {
if let Some(data_marker) = &self.data_marker {
return data_marker.is_marker_info_2(schema, tb);
return data_marker.is_rdb_marker_info(schema, tb);
}
false
}
Expand All @@ -53,6 +53,29 @@ impl BaseExtractor {
return Ok(());
}

if self.refresh_and_check_data_marker(&dt_data) {
return Ok(());
}

self.monitor.counters.record_count += 1;
self.monitor.counters.data_size += dt_data.get_data_size();
self.monitor.try_flush(false);

let data_origin_node = if let Some(data_marker) = &mut self.data_marker {
data_marker.data_origin_node.clone()
} else {
String::new()
};

let item = DtItem {
dt_data,
position,
data_origin_node,
};
self.buffer.push(item).await
}

pub fn refresh_and_check_data_marker(&mut self, dt_data: &DtData) -> bool {
// data_marker does not support DDL event yet.
// user needs to ensure only one-way DDL replication exists in the topology
if let Some(data_marker) = &mut self.data_marker {
Expand All @@ -62,7 +85,7 @@ impl BaseExtractor {
if data_marker.is_marker_info(&dt_data) {
data_marker.refresh(&dt_data);
// after data_marker refreshed, discard the marker data itself
return Ok(());
return true;
} else {
// the first dml/ddl after the last transaction commit is NOT marker_info,
// then current transaction should NOT be filtered by default.
Expand All @@ -75,26 +98,10 @@ impl BaseExtractor {

// data from origin node are filtered
if data_marker.filter {
return Ok(());
return true;
}
}

self.monitor.counters.record_count += 1;
self.monitor.counters.data_size += dt_data.get_data_size();
self.monitor.try_flush(false);

let data_origin_node = if let Some(data_marker) = &mut self.data_marker {
data_marker.data_origin_node.clone()
} else {
String::new()
};

let item = DtItem {
dt_data,
position,
data_origin_node,
};
self.buffer.push(item).await
false
}

pub async fn push_row(&mut self, row_data: RowData, position: Position) -> anyhow::Result<()> {
Expand Down
28 changes: 21 additions & 7 deletions dt-connector/src/extractor/redis/redis_psync_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use dt_common::rdb_filter::RdbFilter;
use dt_common::utils::sql_util::SqlUtil;
use dt_common::utils::time_util::TimeUtil;
use dt_common::{error::Error, log_info};
use dt_common::{log_error, log_position, log_warn};
use dt_common::{log_debug, log_error, log_position, log_warn};

use crate::extractor::base_extractor::BaseExtractor;
use crate::extractor::redis::rdb::rdb_parser::RdbParser;
Expand Down Expand Up @@ -180,6 +180,12 @@ impl RedisPsyncExtractor {
self.extract_type,
ExtractType::Snapshot | ExtractType::SnapshotAndCdc
) {
if let Some(data_marker) = &self.base_extractor.data_marker {
if data_marker.is_redis_marker_info(&entry) {
continue;
}
}

Self::push_to_buf(
&mut self.base_extractor,
&mut self.filter,
Expand Down Expand Up @@ -246,11 +252,13 @@ impl RedisPsyncExtractor {

let (value, n) = self.conn.read_with_len().await?;
if Value::Nil == value {
TimeUtil::sleep_millis(1).await;
continue;
}

self.repl_offset += n as u64;
let cmd = self.handle_redis_value(value).await?;
log_debug!("received cmd: [{}]", cmd);

if !cmd.args.is_empty() {
let cmd_name = cmd.get_name().to_ascii_lowercase();
Expand Down Expand Up @@ -290,16 +298,22 @@ impl RedisPsyncExtractor {
// 1, only the first command following MULTI be considered as data marker info.
// 2, data_marker will be reset following EXEC.
self.base_extractor
.push_dt_data(DtData::Begin {}, position)
.await?;
// ignore MULTI & EXEC, otherwise we may get error: "MULTI calls can not be nested"
.refresh_and_check_data_marker(&DtData::Begin {});
// ignore MULTI & EXEC
continue;
}

// transaction end
if cmd_name == "exec" {
self.base_extractor
.refresh_and_check_data_marker(&DtData::Commit { xid: String::new() });
continue;
}

// transaction end or a single ping(should NOT be in a transaction)
if cmd_name == "exec" || cmd_name == "ping" {
// a single ping(should NOT be in a transaction)
if cmd_name == "ping" {
self.base_extractor
.push_dt_data(DtData::Commit { xid: String::new() }, position)
.push_dt_data(DtData::Heartbeat {}, position)
.await?;
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion dt-connector/src/extractor/redis/redis_resp_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl RedisRespReader {
let len = res.len();
self.read_len += len;

if len == 1 {
if len <= 1 {
return Ok(Value::Nil);
}
if len < 3 {
Expand Down
Loading

0 comments on commit 49976bd

Please sign in to comment.