Skip to content

Commit 7a067b8

Browse files
authored
Merge pull request redis-rs#445 from doyshinda/refactor/cluster_routing
Refactor cluster routing logic into its own file
2 parents 1bb7df3 + ea6b1f5 commit 7a067b8

File tree

3 files changed

+186
-177
lines changed

3 files changed

+186
-177
lines changed

src/cluster.rs

Lines changed: 11 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,12 @@ use rand::{
5050
};
5151

5252
use super::{
53-
cmd, parse_redis_value, Cmd, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike,
54-
ErrorKind, IntoConnectionInfo, RedisError, RedisResult, Value,
53+
cmd, Cmd, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike, ErrorKind,
54+
IntoConnectionInfo, RedisError, RedisResult, Value,
5555
};
5656

5757
pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder};
58-
59-
const SLOT_SIZE: usize = 16384;
58+
use crate::cluster_routing::{RoutingInfo, Slot, SLOT_SIZE};
6059

6160
type SlotMap = BTreeMap<u16, String>;
6261

@@ -128,7 +127,7 @@ impl ClusterConnection {
128127
pub fn check_connection(&mut self) -> bool {
129128
let mut connections = self.connections.borrow_mut();
130129
for conn in connections.values_mut() {
131-
if !check_connection(conn) {
130+
if !conn.check_connection() {
132131
return false;
133132
}
134133
}
@@ -157,7 +156,7 @@ impl ClusterConnection {
157156
};
158157

159158
if let Ok(mut conn) = connect(info.clone(), readonly, password.clone()) {
160-
if check_connection(&mut conn) {
159+
if conn.check_connection() {
161160
connections.insert(addr, conn);
162161
break;
163162
}
@@ -199,7 +198,7 @@ impl ClusterConnection {
199198
if !new_connections.contains_key(addr) {
200199
if connections.contains_key(addr) {
201200
let mut conn = connections.remove(addr).unwrap();
202-
if check_connection(&mut conn) {
201+
if conn.check_connection() {
203202
new_connections.insert(addr.to_string(), conn);
204203
continue;
205204
}
@@ -208,7 +207,7 @@ impl ClusterConnection {
208207
if let Ok(mut conn) =
209208
connect(addr.as_ref(), self.readonly, self.password.clone())
210209
{
211-
if check_connection(&mut conn) {
210+
if conn.check_connection() {
212211
new_connections.insert(addr.to_string(), conn);
213212
}
214213
}
@@ -240,7 +239,9 @@ impl ClusterConnection {
240239
"Slot refresh error.",
241240
format!(
242241
"Received overlapping slots {} and {}..{}",
243-
prev_end, slot_data.start, slot_data.end
242+
prev_end,
243+
slot_data.start(),
244+
slot_data.end()
244245
),
245246
)));
246247
}
@@ -519,12 +520,6 @@ where
519520
Ok(con)
520521
}
521522

522-
fn check_connection(conn: &mut Connection) -> bool {
523-
let mut cmd = Cmd::new();
524-
cmd.arg("PING");
525-
cmd.query::<String>(conn).is_ok()
526-
}
527-
528523
fn get_random_connection<'a>(
529524
connections: &'a mut HashMap<String, Connection>,
530525
excludes: Option<&'a HashSet<String>>,
@@ -544,128 +539,6 @@ fn get_random_connection<'a>(
544539
(addr, con)
545540
}
546541

547-
fn get_hashtag(key: &[u8]) -> Option<&[u8]> {
548-
let open = key.iter().position(|v| *v == b'{');
549-
let open = match open {
550-
Some(open) => open,
551-
None => return None,
552-
};
553-
554-
let close = key[open..].iter().position(|v| *v == b'}');
555-
let close = match close {
556-
Some(close) => close,
557-
None => return None,
558-
};
559-
560-
let rv = &key[open + 1..open + close];
561-
if rv.is_empty() {
562-
None
563-
} else {
564-
Some(rv)
565-
}
566-
}
567-
568-
#[derive(Debug, Clone, Copy, PartialEq)]
569-
enum RoutingInfo {
570-
AllNodes,
571-
AllMasters,
572-
Random,
573-
Slot(u16),
574-
}
575-
576-
fn get_arg(values: &[Value], idx: usize) -> Option<&[u8]> {
577-
match values.get(idx) {
578-
Some(Value::Data(ref data)) => Some(&data[..]),
579-
_ => None,
580-
}
581-
}
582-
583-
fn get_command_arg(values: &[Value], idx: usize) -> Option<Vec<u8>> {
584-
get_arg(values, idx).map(|x| x.to_ascii_uppercase())
585-
}
586-
587-
fn get_u64_arg(values: &[Value], idx: usize) -> Option<u64> {
588-
get_arg(values, idx)
589-
.and_then(|x| std::str::from_utf8(x).ok())
590-
.and_then(|x| x.parse().ok())
591-
}
592-
593-
impl RoutingInfo {
594-
pub fn for_packed_command(cmd: &[u8]) -> Option<RoutingInfo> {
595-
parse_redis_value(cmd).ok().and_then(RoutingInfo::for_value)
596-
}
597-
598-
pub fn for_value(value: Value) -> Option<RoutingInfo> {
599-
let args = match value {
600-
Value::Bulk(args) => args,
601-
_ => return None,
602-
};
603-
604-
match &get_command_arg(&args, 0)?[..] {
605-
b"FLUSHALL" | b"FLUSHDB" | b"SCRIPT" => Some(RoutingInfo::AllMasters),
606-
b"ECHO" | b"CONFIG" | b"CLIENT" | b"SLOWLOG" | b"DBSIZE" | b"LASTSAVE" | b"PING"
607-
| b"INFO" | b"BGREWRITEAOF" | b"BGSAVE" | b"CLIENT LIST" | b"SAVE" | b"TIME"
608-
| b"KEYS" => Some(RoutingInfo::AllNodes),
609-
b"SCAN" | b"CLIENT SETNAME" | b"SHUTDOWN" | b"SLAVEOF" | b"REPLICAOF"
610-
| b"SCRIPT KILL" | b"MOVE" | b"BITOP" => None,
611-
b"EVALSHA" | b"EVAL" => {
612-
let key_count = get_u64_arg(&args, 2)?;
613-
if key_count == 0 {
614-
Some(RoutingInfo::Random)
615-
} else {
616-
get_arg(&args, 3).and_then(RoutingInfo::for_key)
617-
}
618-
}
619-
b"XGROUP" | b"XINFO" => get_arg(&args, 2).and_then(RoutingInfo::for_key),
620-
b"XREAD" | b"XREADGROUP" => {
621-
let streams_position = args.iter().position(|a| match a {
622-
Value::Data(a) => a.eq_ignore_ascii_case(b"STREAMS"),
623-
_ => false,
624-
})?;
625-
get_arg(&args, streams_position + 1).and_then(RoutingInfo::for_key)
626-
}
627-
_ => match get_arg(&args, 1) {
628-
Some(key) => RoutingInfo::for_key(key),
629-
None => Some(RoutingInfo::Random),
630-
},
631-
}
632-
}
633-
634-
pub fn for_key(key: &[u8]) -> Option<RoutingInfo> {
635-
let key = match get_hashtag(&key) {
636-
Some(tag) => tag,
637-
None => &key,
638-
};
639-
Some(RoutingInfo::Slot(
640-
crc16::State::<crc16::XMODEM>::calculate(key) % SLOT_SIZE as u16,
641-
))
642-
}
643-
}
644-
645-
#[derive(Debug)]
646-
struct Slot {
647-
start: u16,
648-
end: u16,
649-
master: String,
650-
replicas: Vec<String>,
651-
}
652-
653-
impl Slot {
654-
pub fn start(&self) -> u16 {
655-
self.start
656-
}
657-
pub fn end(&self) -> u16 {
658-
self.end
659-
}
660-
pub fn master(&self) -> &str {
661-
&self.master
662-
}
663-
#[allow(dead_code)]
664-
pub fn replicas(&self) -> &Vec<String> {
665-
&self.replicas
666-
}
667-
}
668-
669542
// Get slot data from connection.
670543
fn get_slots(connection: &mut Connection) -> RedisResult<Vec<Slot>> {
671544
let mut cmd = Cmd::new();
@@ -730,48 +603,9 @@ fn get_slots(connection: &mut Connection) -> RedisResult<Vec<Slot>> {
730603
}
731604

732605
let replicas = nodes.split_off(1);
733-
result.push(Slot {
734-
start,
735-
end,
736-
master: nodes.pop().unwrap(),
737-
replicas,
738-
});
606+
result.push(Slot::new(start, end, nodes.pop().unwrap(), replicas));
739607
}
740608
}
741609

742610
Ok(result)
743611
}
744-
745-
#[cfg(test)]
746-
mod tests {
747-
use super::{cmd, get_hashtag, RoutingInfo};
748-
749-
#[test]
750-
fn test_get_hashtag() {
751-
assert_eq!(get_hashtag(&b"foo{bar}baz"[..]), Some(&b"bar"[..]));
752-
assert_eq!(get_hashtag(&b"foo{}{baz}"[..]), None);
753-
assert_eq!(get_hashtag(&b"foo{{bar}}zap"[..]), Some(&b"{bar"[..]));
754-
}
755-
756-
#[test]
757-
fn test_routing_info_mixed_capatalization() {
758-
let mut upper = cmd("XREAD");
759-
upper.arg("STREAMS").arg("foo").arg(0);
760-
761-
let mut lower = cmd("xread");
762-
lower.arg("streams").arg("foo").arg(0);
763-
764-
assert_eq!(
765-
RoutingInfo::for_packed_command(&upper.get_packed_command()).unwrap(),
766-
RoutingInfo::for_packed_command(&lower.get_packed_command()).unwrap()
767-
);
768-
769-
let mut mixed = cmd("xReAd");
770-
mixed.arg("StReAmS").arg("foo").arg(0);
771-
772-
assert_eq!(
773-
RoutingInfo::for_packed_command(&lower.get_packed_command()).unwrap(),
774-
RoutingInfo::for_packed_command(&mixed.get_packed_command()).unwrap()
775-
);
776-
}
777-
}

0 commit comments

Comments
 (0)