Skip to content

Commit

Permalink
add redis-2-redis cdc and snapshot tests for redis versions: 2.8/4.0/…
Browse files Browse the repository at this point in the history
…5.0/6.0/6.2/7.0/rejson/rebloom
  • Loading branch information
qianyiwen2019 committed Aug 6, 2023
1 parent aa4809f commit 71477b4
Show file tree
Hide file tree
Showing 275 changed files with 8,658 additions and 286 deletions.
3 changes: 2 additions & 1 deletion dt-connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ url = { workspace = true }
log = { workspace = true }
log4rs = { workspace = true }
redis = { workspace = true }
thiserror = "1.0"
thiserror = "1.0"
async-std = "1.12.0"
18 changes: 0 additions & 18 deletions dt-connector/src/extractor/redis/rdb/entry_parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,3 @@ const RDB_MODULE_OPCODE_UINT: u8 = 2;
const RDB_MODULE_OPCODE_FLOAT: u8 = 3;
const RDB_MODULE_OPCODE_DOUBLE: u8 = 4;
const RDB_MODULE_OPCODE_STRING: u8 = 5;

// const STRING_TYPE: &str = "string";
// const LIST_TYPE: &str = "list";
// const SET_TYPE: &str = "set";
// const HASH_TYPE: &str = "hash";
// const ZSET_TYPE: &str = "zset";
// const AUX_TYPE: &str = "aux";
// const DB_SIZE_TYPE: &str = "dbsize";

// pub trait RedisObject {
// fn load_from_buffer(
// &mut self,
// reader: &mut RdbReader,
// key: &str,
// type_byte: u8,
// ) -> Result<(), Error>;
// fn rewrite(&self) -> Result<Vec<RedisCmd>, Error>;
// }
11 changes: 8 additions & 3 deletions dt-connector/src/extractor/redis/rdb/entry_parser/set_parser.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use byteorder::{ByteOrder, LittleEndian};
use std::io::Cursor;

use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use dt_common::error::Error;
use dt_meta::redis::redis_object::{RedisString, SetObject};

Expand Down Expand Up @@ -36,8 +38,11 @@ impl SetLoader {
}

pub fn read_int_set(obj: &mut SetObject, reader: &mut RdbReader) -> Result<(), Error> {
let encoding_type = reader.read_u32()? as usize;
let size = reader.read_u32()?;
let buf = reader.read_string()?;
let mut reader = Cursor::new(buf.as_bytes());

let encoding_type = reader.read_u32::<LittleEndian>()? as usize;
let size = reader.read_u32::<LittleEndian>()?;
for _ in 0..size {
let buf = reader.read_raw(encoding_type)?;
let int_str = match encoding_type {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ impl StreamLoader {
let n_list_pack = reader.read_length()?;
for _ in 0..n_list_pack {
// Load key
let key = reader.read_string()?;

// key is streamId, like: 1612181627287-0
let key = reader.read_string()?;
let master_ms = BigEndian::read_i64(&key.as_bytes()[..8]); // ms
let master_seq = BigEndian::read_i64(&key.as_bytes()[8..]);

Expand Down Expand Up @@ -165,8 +164,8 @@ impl StreamLoader {

for _ in 0..n_pel {
// Load streamId
let ms = reader.read_u64()?;
let seq = reader.read_u64()?;
let ms = reader.read_be_u64()?;
let seq = reader.read_be_u64()?;
let stream_id = format!("{}-{}", ms, seq);

// Load deliveryTime
Expand Down Expand Up @@ -194,8 +193,8 @@ impl StreamLoader {
let n_pel = reader.read_length()?;
for _i in 0..n_pel {
// Load streamId
let ms = reader.read_u64()?;
let seq = reader.read_u64()?;
let ms = reader.read_be_u64()?;
let seq = reader.read_be_u64()?;
let stream_id = format!("{}-{}", ms, seq);

/* Send */
Expand Down
45 changes: 30 additions & 15 deletions dt-connector/src/extractor/redis/rdb/rdb_loader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use dt_common::{error::Error, log_debug, log_info};
use dt_meta::redis::redis_entry::RedisEntry;
use dt_common::{error::Error, log_info};
use dt_meta::redis::{redis_entry::RedisEntry, redis_object::RedisCmd};
use sqlx::types::chrono;

use crate::extractor::redis::RawByteReader;
Expand Down Expand Up @@ -69,9 +69,17 @@ impl RdbLoader<'_> {
}

"lua" => {
// let e = Entry::new_base_entry(vec!["script", "load", &value]);
// ch.send(e).await?;
// log::info!("LUA script: {}", value);
let mut cmd = RedisCmd::new();
cmd.add_str_arg("script");
cmd.add_str_arg("load");
cmd.add_redis_arg(&value);
log_info!("LUA script: {:?}", value);

let mut entry = RedisEntry::new();
entry.is_base = true;
entry.db_id = self.now_db_id;
entry.cmd = cmd;
return Ok(Some(entry));
}

_ => {
Expand Down Expand Up @@ -121,18 +129,25 @@ impl RdbLoader<'_> {
_ => {
let key = self.reader.read_string()?;
self.reader.copy_raw = true;
let value =
EntryParser::parse_object(&mut self.reader, type_byte, key.clone()).unwrap();
let value = EntryParser::parse_object(&mut self.reader, type_byte, key.clone());
self.reader.copy_raw = false;

let mut entry = RedisEntry::new();
entry.is_base = true;
entry.db_id = self.now_db_id;
entry.raw_bytes = self.reader.drain_raw_bytes();
entry.key = key;
entry.value = value;
entry.value_type_byte = type_byte;
return Ok(Some(entry));
if let Err(error) = value {
panic!(
"parsing rdb failed, key: {:?}, error: {:?}",
String::from(key),
error
);
} else {
let mut entry = RedisEntry::new();
entry.is_base = true;
entry.db_id = self.now_db_id;
entry.raw_bytes = self.reader.drain_raw_bytes();
entry.key = key;
entry.value = value.unwrap();
entry.value_type_byte = type_byte;
return Ok(Some(entry));
}
}
}

Expand Down
12 changes: 11 additions & 1 deletion dt-connector/src/extractor/redis/rdb/reader/int.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::extractor::redis::RawByteReader;

use super::rdb_reader::RdbReader;
use byteorder::{ByteOrder, LittleEndian};
use byteorder::{BigEndian, ByteOrder, LittleEndian};
use dt_common::error::Error;

impl RdbReader<'_> {
Expand Down Expand Up @@ -29,6 +29,11 @@ impl RdbReader<'_> {
Ok(LittleEndian::read_u64(&buf))
}

/// Reads the next 8 raw bytes and decodes them as a big-endian `u64`.
///
/// # Errors
///
/// Propagates any error returned by `read_raw`.
pub fn read_be_u64(&mut self) -> Result<u64, Error> {
    let raw = self.read_raw(8)?;
    let value = BigEndian::read_u64(&raw);
    Ok(value)
}

/// Reads a single byte via `read_byte` and casts it to `i8`.
///
/// # Errors
///
/// Propagates any error returned by `read_byte`.
pub fn read_i8(&mut self) -> Result<i8, Error> {
    let byte = self.read_byte()?;
    Ok(byte as i8)
}
Expand All @@ -52,4 +57,9 @@ impl RdbReader<'_> {
let buf = self.read_raw(8)?;
Ok(LittleEndian::read_i64(&buf))
}

/// Reads the next 8 raw bytes and decodes them as a big-endian `i64`.
///
/// # Errors
///
/// Propagates any error returned by `read_raw`.
pub fn read_be_i64(&mut self) -> Result<i64, Error> {
    let raw = self.read_raw(8)?;
    let value = BigEndian::read_i64(&raw);
    Ok(value)
}
}
36 changes: 0 additions & 36 deletions dt-connector/src/extractor/redis/rdb/reader/int_set.rs

This file was deleted.

8 changes: 4 additions & 4 deletions dt-connector/src/extractor/redis/rdb/reader/length.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::extractor::redis::RawByteReader;

use super::rdb_reader::RdbReader;
use byteorder::{ByteOrder, LittleEndian};
use byteorder::{BigEndian, ByteOrder};
use dt_common::error::Error;

const RDB_6_BIT_LEN: u8 = 0;
Expand Down Expand Up @@ -34,20 +34,20 @@ impl RdbReader<'_> {

RDB_14_BIT_LEN => {
let next_byte = self.read_byte()?;
let len = u64::from(first_byte) & 0x3f << 8 | u64::from(next_byte);
let len = (u64::from(first_byte) & 0x3f) << 8 | u64::from(next_byte);
Ok((len, false))
}

RDB_32_OR_64_BIT_LEN => match first_byte {
RDB_32_BIT_LEN => {
let next_bytes = self.read_raw(4)?;
let len = LittleEndian::read_u32(&next_bytes) as u64;
let len = BigEndian::read_u32(&next_bytes) as u64;
Ok((len, false))
}

RDB_64_BIT_LEN => {
let next_bytes = self.read_raw(8)?;
let len = LittleEndian::read_u64(&next_bytes) as u64;
let len = BigEndian::read_u64(&next_bytes) as u64;
Ok((len, false))
}

Expand Down
41 changes: 22 additions & 19 deletions dt-connector/src/extractor/redis/rdb/reader/list_pack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,64 +58,68 @@ impl RdbReader<'_> {
Ok(elements)
}

// https://github.com/redis/redis/blob/unstable/src/listpack.c lpGetWithSize
fn read_listpack_entry(reader: &mut Cursor<&[u8]>) -> Result<RedisString, Error> {
let mut val: i64;
let mut uval: u64;
let negstart: u64;
let negmax: u64;

let fire_byte = reader.read_u8()?;
if (fire_byte & LP_ENCODING_7BIT_UINT_MASK) == LP_ENCODING_7BIT_UINT {
let first_byte = reader.read_u8()?;
if (first_byte & LP_ENCODING_7BIT_UINT_MASK) == LP_ENCODING_7BIT_UINT {
// 7bit uint
uval = u64::from(fire_byte & 0x7f); // 0x7f is 01111111
uval = u64::from(first_byte & 0x7f); // 0x7f is 01111111
negmax = 0;
negstart = u64::MAX; // uint
negstart = u64::MAX; // 7 bit ints are always positive
let _ = reader.read_raw(Self::lp_encode_backlen(1))?; // encode: 1 byte
} else if (fire_byte & LP_ENCODING_6BIT_STR_MASK) == LP_ENCODING_6BIT_STR {
} else if (first_byte & LP_ENCODING_6BIT_STR_MASK) == LP_ENCODING_6BIT_STR {
// 6bit length str
let length = usize::from(fire_byte & 0x3f); // 0x3f is 00111111
let length = usize::from(first_byte & 0x3f); // 0x3f is 00111111
let ele = reader.read_raw(length)?;
let _ = reader.read_raw(Self::lp_encode_backlen(1 + length)); // encode: 1byte, str: length
return Ok(RedisString::from(ele));
} else if (fire_byte & LP_ENCODING_13BIT_INT_MASK) == LP_ENCODING_13BIT_INT {

let ele = RedisString::from(ele);
return Ok(ele);
// return Ok(RedisString::from(ele));
} else if (first_byte & LP_ENCODING_13BIT_INT_MASK) == LP_ENCODING_13BIT_INT {
// 13bit int
let second_byte = reader.read_u8()?;
uval = (u64::from(fire_byte & 0x1f) << 8) + u64::from(second_byte); // 5bit + 8bit, 0x1f is 00011111
uval = (u64::from(first_byte & 0x1f) << 8) | u64::from(second_byte); // 5bit + 8bit, 0x1f is 00011111
negstart = (1 as u64) << 12;
negmax = 8191; // uint13_max
let _ = reader.read_raw(Self::lp_encode_backlen(2));
} else if (fire_byte & LP_ENCODING_16BIT_INT_MASK) == LP_ENCODING_16BIT_INT {
} else if (first_byte & LP_ENCODING_16BIT_INT_MASK) == LP_ENCODING_16BIT_INT {
// 16bit int
uval = reader.read_u16::<LittleEndian>()? as u64;
negstart = (1 as u64) << 15;
negmax = 65535; // uint16_max
negmax = u16::MAX as u64;
let _ = reader.read_raw(Self::lp_encode_backlen(2)); // encode: 1byte, int: 2
} else if (fire_byte & LP_ENCODING_24BIT_INT_MASK) == LP_ENCODING_24BIT_INT {
} else if (first_byte & LP_ENCODING_24BIT_INT_MASK) == LP_ENCODING_24BIT_INT {
// 24bit int
uval = reader.read_u24::<LittleEndian>()? as u64;
negstart = (1 as u64) << 23;
negmax = (u32::MAX >> 8) as u64; // uint24_max
let _ = reader.read_raw(Self::lp_encode_backlen(1 + 3)); // encode: 1byte, int: 3byte
} else if (fire_byte & LP_ENCODING_32BIT_INT_MASK) == LP_ENCODING_32BIT_INT {
} else if (first_byte & LP_ENCODING_32BIT_INT_MASK) == LP_ENCODING_32BIT_INT {
// 32bit int
uval = reader.read_u32::<LittleEndian>()? as u64;
negstart = (1 as u64) << 31;
negmax = u32::MAX as u64; // uint32_max
let _ = reader.read_raw(Self::lp_encode_backlen(1 + 4)); // encode: 1byte, int: 4byte
} else if (fire_byte & LP_ENCODING_64BIT_INT_MASK) == LP_ENCODING_64BIT_INT {
} else if (first_byte & LP_ENCODING_64BIT_INT_MASK) == LP_ENCODING_64BIT_INT {
// 64bit int
uval = reader.read_u64::<LittleEndian>()?;
negstart = (1 as u64) << 63;
negmax = u64::MAX; // uint64_max
let _ = reader.read_raw(Self::lp_encode_backlen(1 + 8)); // encode: 1byte, int: 8byte
} else if (fire_byte & LP_ENCODING_12BIT_STR_MASK) == LP_ENCODING_12BIT_STR {
} else if (first_byte & LP_ENCODING_12BIT_STR_MASK) == LP_ENCODING_12BIT_STR {
// 12bit length str
let second_byte = reader.read_u8()?;
let length = (((fire_byte as usize) & 0x0f) << 8) + second_byte as usize; // 4bit + 8bit
let length = (((first_byte as usize) & 0x0f) << 8) + second_byte as usize; // 4bit + 8bit
let ele = reader.read_raw(length)?;
let _ = reader.read_raw(Self::lp_encode_backlen(2 + length)); // encode: 2byte, str: length
return Ok(RedisString::from(ele));
} else if (fire_byte & LP_ENCODING_32BIT_STR_MASK) == LP_ENCODING_32BIT_STR {
} else if (first_byte & LP_ENCODING_32BIT_STR_MASK) == LP_ENCODING_32BIT_STR {
// 32bit length str
let length = reader.read_u32::<LittleEndian>()? as usize;
let ele = reader.read_raw(length)?;
Expand All @@ -127,7 +131,7 @@ impl RdbReader<'_> {
// negstart = math.MaxUint64
// negmax = 0
return Err(Error::Unexpected {
error: format!("unknown encoding: {}", fire_byte).to_string(),
error: format!("unknown encoding: {}", first_byte).to_string(),
});
}

Expand All @@ -143,7 +147,6 @@ impl RdbReader<'_> {
} else {
val = uval as i64;
}

Ok(RedisString::from(val.to_string()))
}

Expand Down
2 changes: 1 addition & 1 deletion dt-connector/src/extractor/redis/rdb/reader/rdb_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl RdbReader<'_> {

impl RawByteReader for RdbReader<'_> {
fn read_raw(&mut self, length: usize) -> Result<Vec<u8>, Error> {
let (buf, _n) = block_on(self.conn.recv_raw(length)).unwrap();
let buf = block_on(self.conn.read_raw(length)).unwrap();
self.position += length;
if self.copy_raw {
self.raw_bytes.extend_from_slice(&buf);
Expand Down
1 change: 0 additions & 1 deletion dt-connector/src/extractor/redis/rdb/reader/string.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use dt_common::error::Error;
use dt_meta::redis::redis_object::RedisString;
use rdkafka::message::ToBytes;

use crate::extractor::redis::RawByteReader;

Expand Down
9 changes: 5 additions & 4 deletions dt-connector/src/extractor/redis/rdb/reader/zip_list.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::io::Cursor;

use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
use byteorder::{BigEndian, ByteOrder, LittleEndian, ReadBytesExt};
use dt_common::error::Error;
use dt_meta::redis::redis_object::RedisString;

Expand Down Expand Up @@ -71,8 +71,9 @@ impl RdbReader<'_> {
}

// read encoding
let first2bits = (first_byte & 0xc0) >> 6; // first 2 bits of encoding
match first2bits {
let first_byte = reader.read_raw(1)?[0];
let first_2_bits = (first_byte & 0xc0) >> 6; // first 2 bits of encoding
match first_2_bits {
ZIP_STR_06B => {
let length = (first_byte & 0x3f) as usize; // 0x3f = 00111111
let buf = reader.read_raw(length)?;
Expand All @@ -88,7 +89,7 @@ impl RdbReader<'_> {

ZIP_STR_32B => {
let mut buf = reader.read_raw(4)?;
let length = LittleEndian::read_u32(&buf);
let length = BigEndian::read_u32(&buf);
buf = reader.read_raw(length as usize)?;
return Ok(RedisString::from(buf));
}
Expand Down
Loading

0 comments on commit 71477b4

Please sign in to comment.