Skip to content

Commit e64e870

Browse files
authored
Merge pull request redis-rs#446 from doyshinda/cmd_routing
Support for requesting `Cmds` directly from a `Connection`
2 parents 711c61b + 5a95fe7 commit e64e870

File tree

4 files changed

+196
-55
lines changed

4 files changed

+196
-55
lines changed

src/cluster.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ use rand::{
5050
};
5151

5252
use super::{
53-
cmd, Cmd, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike, ErrorKind,
54-
IntoConnectionInfo, RedisError, RedisResult, Value,
53+
cmd, parse_redis_value, Cmd, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike,
54+
ErrorKind, IntoConnectionInfo, RedisError, RedisResult, Value,
5555
};
5656

5757
pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder};
58-
use crate::cluster_routing::{RoutingInfo, Slot, SLOT_SIZE};
58+
use crate::cluster_routing::{Routable, RoutingInfo, Slot, SLOT_SIZE};
5959

6060
type SlotMap = BTreeMap<u16, String>;
6161

@@ -326,12 +326,13 @@ impl ClusterConnection {
326326
}
327327

328328
#[allow(clippy::unnecessary_unwrap)]
329-
fn request<T, F>(&self, cmd: &[u8], mut func: F) -> RedisResult<T>
329+
fn request<R, T, F>(&self, cmd: &R, mut func: F) -> RedisResult<T>
330330
where
331+
R: ?Sized + Routable,
331332
T: MergeResults + std::fmt::Debug,
332333
F: FnMut(&mut Connection) -> RedisResult<T>,
333334
{
334-
let slot = match RoutingInfo::for_packed_command(cmd) {
335+
let slot = match RoutingInfo::for_routable(cmd) {
335336
Some(RoutingInfo::Random) => None,
336337
Some(RoutingInfo::Slot(slot)) => Some(slot),
337338
Some(RoutingInfo::AllNodes) | Some(RoutingInfo::AllMasters) => {
@@ -461,8 +462,13 @@ impl ConnectionLike for ClusterConnection {
461462
false
462463
}
463464

465+
fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
466+
self.request(cmd, move |conn| conn.req_command(cmd))
467+
}
468+
464469
fn req_packed_command(&mut self, cmd: &[u8]) -> RedisResult<Value> {
465-
self.request(cmd, move |conn| conn.req_packed_command(cmd))
470+
let value = parse_redis_value(cmd)?;
471+
self.request(&value, move |conn| conn.req_packed_command(cmd))
466472
}
467473

468474
fn req_packed_commands(
@@ -471,7 +477,8 @@ impl ConnectionLike for ClusterConnection {
471477
offset: usize,
472478
count: usize,
473479
) -> RedisResult<Vec<Value>> {
474-
self.request(cmd, move |conn| {
480+
let value = parse_redis_value(cmd)?;
481+
self.request(&value, move |conn| {
475482
conn.req_packed_commands(cmd, offset, count)
476483
})
477484
}

src/cluster_routing.rs

Lines changed: 126 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use super::{parse_redis_value, Value};
1+
use std::iter::Iterator;
2+
3+
use crate::cmd::{Arg, Cmd};
4+
use crate::types::Value;
25

36
pub(crate) const SLOT_SIZE: usize = 16384;
47

@@ -10,58 +13,36 @@ pub(crate) enum RoutingInfo {
1013
Slot(u16),
1114
}
1215

13-
fn get_arg(values: &[Value], idx: usize) -> Option<&[u8]> {
14-
match values.get(idx) {
15-
Some(Value::Data(ref data)) => Some(&data[..]),
16-
_ => None,
17-
}
18-
}
19-
20-
fn get_command_arg(values: &[Value], idx: usize) -> Option<Vec<u8>> {
21-
get_arg(values, idx).map(|x| x.to_ascii_uppercase())
22-
}
23-
24-
fn get_u64_arg(values: &[Value], idx: usize) -> Option<u64> {
25-
get_arg(values, idx)
26-
.and_then(|x| std::str::from_utf8(x).ok())
27-
.and_then(|x| x.parse().ok())
28-
}
29-
3016
impl RoutingInfo {
31-
pub fn for_packed_command(cmd: &[u8]) -> Option<RoutingInfo> {
32-
parse_redis_value(cmd).ok().and_then(RoutingInfo::for_value)
33-
}
34-
35-
pub fn for_value(value: Value) -> Option<RoutingInfo> {
36-
let args = match value {
37-
Value::Bulk(args) => args,
38-
_ => return None,
39-
};
40-
41-
match &get_command_arg(&args, 0)?[..] {
17+
pub(crate) fn for_routable<R>(r: &R) -> Option<RoutingInfo>
18+
where
19+
R: Routable + ?Sized,
20+
{
21+
match &r.command()?[..] {
4222
b"FLUSHALL" | b"FLUSHDB" | b"SCRIPT" => Some(RoutingInfo::AllMasters),
4323
b"ECHO" | b"CONFIG" | b"CLIENT" | b"SLOWLOG" | b"DBSIZE" | b"LASTSAVE" | b"PING"
4424
| b"INFO" | b"BGREWRITEAOF" | b"BGSAVE" | b"CLIENT LIST" | b"SAVE" | b"TIME"
4525
| b"KEYS" => Some(RoutingInfo::AllNodes),
4626
b"SCAN" | b"CLIENT SETNAME" | b"SHUTDOWN" | b"SLAVEOF" | b"REPLICAOF"
4727
| b"SCRIPT KILL" | b"MOVE" | b"BITOP" => None,
4828
b"EVALSHA" | b"EVAL" => {
49-
let key_count = get_u64_arg(&args, 2)?;
29+
let key_count = r
30+
.arg_idx(2)
31+
.and_then(|x| std::str::from_utf8(x).ok())
32+
.and_then(|x| x.parse::<u64>().ok())?;
5033
if key_count == 0 {
5134
Some(RoutingInfo::Random)
5235
} else {
53-
get_arg(&args, 3).and_then(RoutingInfo::for_key)
36+
r.arg_idx(3).and_then(RoutingInfo::for_key)
5437
}
5538
}
56-
b"XGROUP" | b"XINFO" => get_arg(&args, 2).and_then(RoutingInfo::for_key),
39+
b"XGROUP" | b"XINFO" => r.arg_idx(2).and_then(RoutingInfo::for_key),
5740
b"XREAD" | b"XREADGROUP" => {
58-
let streams_position = args.iter().position(|a| match a {
59-
Value::Data(a) => a.eq_ignore_ascii_case(b"STREAMS"),
60-
_ => false,
61-
})?;
62-
get_arg(&args, streams_position + 1).and_then(RoutingInfo::for_key)
41+
let streams_position = r.position(b"STREAMS")?;
42+
r.arg_idx(streams_position + 1)
43+
.and_then(RoutingInfo::for_key)
6344
}
64-
_ => match get_arg(&args, 1) {
45+
_ => match r.arg_idx(1) {
6546
Some(key) => RoutingInfo::for_key(key),
6647
None => Some(RoutingInfo::Random),
6748
},
@@ -79,6 +60,55 @@ impl RoutingInfo {
7960
}
8061
}
8162

63+
pub(crate) trait Routable {
64+
// Convenience function to return ascii uppercase version of the
65+
// the first argument (i.e., the command).
66+
fn command(&self) -> Option<Vec<u8>> {
67+
self.arg_idx(0).map(|x| x.to_ascii_uppercase())
68+
}
69+
70+
// Returns a reference to the data for the argument at `idx`.
71+
fn arg_idx(&self, idx: usize) -> Option<&[u8]>;
72+
73+
// Returns index of argument that matches `candidate`, if it exists
74+
fn position(&self, candidate: &[u8]) -> Option<usize>;
75+
}
76+
77+
impl Routable for Cmd {
78+
fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
79+
self.arg_idx(idx)
80+
}
81+
82+
fn position(&self, candidate: &[u8]) -> Option<usize> {
83+
self.args_iter().position(|a| match a {
84+
Arg::Simple(d) => d.eq_ignore_ascii_case(candidate),
85+
_ => false,
86+
})
87+
}
88+
}
89+
90+
impl Routable for Value {
91+
fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
92+
match self {
93+
Value::Bulk(args) => match args.get(idx) {
94+
Some(Value::Data(ref data)) => Some(&data[..]),
95+
_ => None,
96+
},
97+
_ => None,
98+
}
99+
}
100+
101+
fn position(&self, candidate: &[u8]) -> Option<usize> {
102+
match self {
103+
Value::Bulk(args) => args.iter().position(|a| match a {
104+
Value::Data(d) => d.eq_ignore_ascii_case(candidate),
105+
_ => false,
106+
}),
107+
_ => None,
108+
}
109+
}
110+
}
111+
82112
#[derive(Debug)]
83113
pub(crate) struct Slot {
84114
start: u16,
@@ -139,7 +169,7 @@ fn get_hashtag(key: &[u8]) -> Option<&[u8]> {
139169
#[cfg(test)]
140170
mod tests {
141171
use super::{get_hashtag, RoutingInfo};
142-
use crate::cmd;
172+
use crate::{cmd, parser::parse_redis_value};
143173

144174
#[test]
145175
fn test_get_hashtag() {
@@ -157,16 +187,69 @@ mod tests {
157187
lower.arg("streams").arg("foo").arg(0);
158188

159189
assert_eq!(
160-
RoutingInfo::for_packed_command(&upper.get_packed_command()).unwrap(),
161-
RoutingInfo::for_packed_command(&lower.get_packed_command()).unwrap()
190+
RoutingInfo::for_routable(&upper).unwrap(),
191+
RoutingInfo::for_routable(&lower).unwrap()
162192
);
163193

164194
let mut mixed = cmd("xReAd");
165195
mixed.arg("StReAmS").arg("foo").arg(0);
166196

167197
assert_eq!(
168-
RoutingInfo::for_packed_command(&lower.get_packed_command()).unwrap(),
169-
RoutingInfo::for_packed_command(&mixed.get_packed_command()).unwrap()
198+
RoutingInfo::for_routable(&lower).unwrap(),
199+
RoutingInfo::for_routable(&mixed).unwrap()
170200
);
171201
}
202+
203+
#[test]
204+
fn test_routing_info() {
205+
let mut test_cmds = vec![];
206+
207+
// RoutingInfo::AllMasters
208+
let mut test_cmd = cmd("FLUSHALL");
209+
test_cmd.arg("");
210+
test_cmds.push(test_cmd);
211+
212+
// RoutingInfo::AllNodes
213+
test_cmd = cmd("ECHO");
214+
test_cmd.arg("");
215+
test_cmds.push(test_cmd);
216+
217+
// Routing key is 2nd arg ("42")
218+
test_cmd = cmd("SET");
219+
test_cmd.arg("42");
220+
test_cmds.push(test_cmd);
221+
222+
// Routing key is 3rd arg ("FOOBAR")
223+
test_cmd = cmd("XINFO");
224+
test_cmd.arg("GROUPS").arg("FOOBAR");
225+
test_cmds.push(test_cmd);
226+
227+
// Routing key is 3rd or 4th arg (3rd = "0" == RoutingInfo::Random)
228+
test_cmd = cmd("EVAL");
229+
test_cmd.arg("FOO").arg("0").arg("BAR");
230+
test_cmds.push(test_cmd);
231+
232+
// Routing key is 3rd or 4th arg (3rd != "0" == RoutingInfo::Slot)
233+
test_cmd = cmd("EVAL");
234+
test_cmd.arg("FOO").arg("4").arg("BAR");
235+
test_cmds.push(test_cmd);
236+
237+
// Routing key position is variable, 3rd arg
238+
test_cmd = cmd("XREAD");
239+
test_cmd.arg("STREAMS").arg("4");
240+
test_cmds.push(test_cmd);
241+
242+
// Routing key position is variable, 4th arg
243+
test_cmd = cmd("XREAD");
244+
test_cmd.arg("FOO").arg("STREAMS").arg("4");
245+
test_cmds.push(test_cmd);
246+
247+
for cmd in test_cmds {
248+
let value = parse_redis_value(&cmd.get_packed_command()).unwrap();
249+
assert_eq!(
250+
RoutingInfo::for_routable(&value).unwrap(),
251+
RoutingInfo::for_routable(&cmd).unwrap(),
252+
);
253+
}
254+
}
172255
}

src/cmd.rs

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,7 @@ impl Cmd {
343343
/// you can retrieve data.
344344
#[inline]
345345
pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
346-
let pcmd = self.get_packed_command();
347-
match con.req_packed_command(&pcmd) {
346+
match con.req_command(self) {
348347
Ok(val) => from_redis_value(&val),
349348
Err(e) => Err(e),
350349
}
@@ -377,8 +376,7 @@ impl Cmd {
377376
/// tuple of cursor and list).
378377
#[inline]
379378
pub fn iter<T: FromRedisValue>(self, con: &mut dyn ConnectionLike) -> RedisResult<Iter<'_, T>> {
380-
let pcmd = self.get_packed_command();
381-
let rv = con.req_packed_command(&pcmd)?;
379+
let rv = con.req_command(&self)?;
382380

383381
let (cursor, batch) = if rv.looks_like_cursor() {
384382
from_redis_value::<(u64, Vec<T>)>(&rv)?
@@ -465,6 +463,31 @@ impl Cmd {
465463
Arg::Cursor => Arg::Cursor,
466464
})
467465
}
466+
467+
// Get a reference to the argument at `idx`
468+
#[cfg(feature = "cluster")]
469+
pub(crate) fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
470+
if idx >= self.args.len() {
471+
return None;
472+
}
473+
474+
let start = if idx == 0 {
475+
0
476+
} else {
477+
match self.args[idx - 1] {
478+
Arg::Simple(n) => n,
479+
_ => 0,
480+
}
481+
};
482+
let end = match self.args[idx] {
483+
Arg::Simple(n) => n,
484+
_ => 0,
485+
};
486+
if start == 0 && end == 0 {
487+
return None;
488+
}
489+
Some(&self.data[start..end])
490+
}
468491
}
469492

470493
/// Shortcut function to creating a command with a single argument.
@@ -506,3 +529,25 @@ pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
506529
pub fn pipe() -> Pipeline {
507530
Pipeline::new()
508531
}
532+
533+
#[cfg(test)]
534+
#[cfg(feature = "cluster")]
535+
mod tests {
536+
use super::Cmd;
537+
538+
#[test]
539+
fn test_cmd_arg_idx() {
540+
let mut c = Cmd::new();
541+
assert_eq!(c.arg_idx(0), None);
542+
543+
c.arg("SET");
544+
assert_eq!(c.arg_idx(0), Some(&b"SET"[..]));
545+
assert_eq!(c.arg_idx(1), None);
546+
547+
c.arg("foo").arg("42");
548+
assert_eq!(c.arg_idx(1), Some(&b"foo"[..]));
549+
assert_eq!(c.arg_idx(2), Some(&b"42"[..]));
550+
assert_eq!(c.arg_idx(3), None);
551+
assert_eq!(c.arg_idx(4), None);
552+
}
553+
}

src/connection.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::path::PathBuf;
55
use std::str::{from_utf8, FromStr};
66
use std::time::Duration;
77

8-
use crate::cmd::{cmd, pipe};
8+
use crate::cmd::{cmd, pipe, Cmd};
99
use crate::parser::Parser;
1010
use crate::pipeline::Pipeline;
1111
use crate::types::{
@@ -616,6 +616,12 @@ pub trait ConnectionLike {
616616
count: usize,
617617
) -> RedisResult<Vec<Value>>;
618618

619+
/// Sends a [Cmd](Cmd) into the TCP socket and reads a single response from it.
620+
fn req_command(&mut self, cmd: &Cmd) -> RedisResult<Value> {
621+
let pcmd = cmd.get_packed_command();
622+
self.req_packed_command(&pcmd)
623+
}
624+
619625
/// Returns the database this connection is bound to. Note that this
620626
/// information might be unreliable because it's initially cached and
621627
/// also might be incorrect if the connection like object is not

0 commit comments

Comments
 (0)