Skip to content

Commit

Permalink
Added new routing option: SingleNodeRoutingInfo::RandomPrimary (#194)
Browse files Browse the repository at this point in the history
* Added new routing option: `SingleNodeRoutingInfo::RandomPrimary`

Signed-off-by: Eran Ifrah <eifrah@amazon.com>
  • Loading branch information
eifrah-aws authored Oct 7, 2024
1 parent 8e89ad8 commit 41de670
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 11 deletions.
18 changes: 9 additions & 9 deletions redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ use std::str::FromStr;
use std::thread;
use std::time::Duration;

use rand::{seq::IteratorRandom, thread_rng, Rng};
use rand::{seq::IteratorRandom, thread_rng};

use crate::cluster_pipeline::UNROUTABLE_ERROR;
use crate::cluster_routing::{
MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, SlotAddr,
MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo,
};
use crate::cluster_slotmap::SlotMap;
use crate::cluster_topology::{parse_and_count_slots, SLOT_SIZE};
use crate::cluster_topology::parse_and_count_slots;
use crate::cmd::{cmd, Cmd};
use crate::connection::{
connect, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike, RedisConnectionInfo,
Expand Down Expand Up @@ -459,12 +459,9 @@ where
};

match RoutingInfo::for_routable(cmd) {
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => {
let mut rng = thread_rng();
Ok(addr_for_slot(Route::new(
rng.gen_range(0..SLOT_SIZE),
SlotAddr::Master,
))?)
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
| Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::RandomPrimary)) => {
Ok(addr_for_slot(Route::new_random_primary())?)
}
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(route))) => {
Ok(addr_for_slot(route)?)
Expand Down Expand Up @@ -730,6 +727,9 @@ where
SingleNodeRoutingInfo::SpecificNode(route) => {
self.get_connection(&mut connections, route)?
}
SingleNodeRoutingInfo::RandomPrimary => {
self.get_connection(&mut connections, &Route::new_random_primary())?
}
SingleNodeRoutingInfo::ByAddress { host, port } => {
let address = format!("{host}:{port}");
let conn = self.get_connection_by_addr(&mut connections, &address)?;
Expand Down
6 changes: 6 additions & 0 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ impl<C> From<SingleNodeRoutingInfo> for InternalSingleNodeRouting<C> {
SingleNodeRoutingInfo::SpecificNode(route) => {
InternalSingleNodeRouting::SpecificNode(route)
}
SingleNodeRoutingInfo::RandomPrimary => {
InternalSingleNodeRouting::SpecificNode(Route::new_random_primary())
}
SingleNodeRoutingInfo::ByAddress { host, port } => {
InternalSingleNodeRouting::ByAddress(format!("{host}:{port}"))
}
Expand Down Expand Up @@ -620,6 +623,9 @@ fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult<Option<Route>>
Some(cluster_routing::RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(route),
)) => Some(route),
Some(cluster_routing::RoutingInfo::SingleNode(
SingleNodeRoutingInfo::RandomPrimary,
)) => Some(Route::new_random_primary()),
Some(cluster_routing::RoutingInfo::MultiNode(_)) => None,
Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
..
Expand Down
43 changes: 41 additions & 2 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use rand::Rng;
use std::cmp::min;
use std::collections::HashMap;

Expand Down Expand Up @@ -66,6 +67,8 @@ pub enum RoutingInfo {
pub enum SingleNodeRoutingInfo {
/// Route to any node at random
Random,
/// Route to any *primary* node
RandomPrimary,
/// Route to the node that matches the [Route]
SpecificNode(Route),
/// Route to the node with the given address.
Expand Down Expand Up @@ -610,7 +613,13 @@ impl RoutingInfo {
.and_then(|x| std::str::from_utf8(x).ok())
.and_then(|x| x.parse::<u64>().ok())?;
if key_count == 0 {
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
if is_readonly_cmd(cmd) {
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
} else {
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::RandomPrimary,
))
}
} else {
r.arg_idx(3).map(|key| RoutingInfo::for_key(cmd, key))
}
Expand Down Expand Up @@ -949,6 +958,17 @@ impl Route {
pub fn slot_addr(&self) -> SlotAddr {
self.1
}

/// Returns a new Route for a random primary node
pub fn new_random_primary() -> Self {
Self::new(random_slot(), SlotAddr::Master)
}
}

/// Choose a random slot from `0..SLOT_SIZE` (excluding)
fn random_slot() -> u16 {
let mut rng = rand::thread_rng();
rng.gen_range(0..crate::cluster_topology::SLOT_SIZE)
}

#[cfg(test)]
Expand Down Expand Up @@ -1096,12 +1116,31 @@ mod tests {
cmd("EVAL").arg(r#"redis.call("PING");"#).arg(0),
cmd("EVALSHA").arg(r#"redis.call("PING");"#).arg(0),
] {
// EVAL / EVALSHA are expected to be routed to a RandomPrimary
assert_eq!(
RoutingInfo::for_routable(cmd),
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::RandomPrimary
))
);
}

// FCALL (with 0 keys) is expected to be routed to a random primary node
assert_eq!(
RoutingInfo::for_routable(cmd("FCALL").arg("foo").arg(0)),
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::RandomPrimary
))
);

// While FCALL with N keys is expected to be routed to a specific node
assert_eq!(
RoutingInfo::for_routable(cmd("FCALL").arg("foo").arg(1).arg("mykey")),
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route::new(slot(b"mykey"), SlotAddr::Master))
))
);

for (cmd, expected) in [
(
cmd("EVAL")
Expand Down

0 comments on commit 41de670

Please sign in to comment.