Skip to content

Commit

Permalink
Add client management cmd (#54)
Browse files Browse the repository at this point in the history
* Add client id/list/kill

Signed-off-by: Morgan279 <wancheng.long@pingcap.com>

* Fix non-printable string args in eval/evalsha (#51)

Signed-off-by: yongman <yming0221@gmail.com>

* Check gc key in small transaction to avoid confliction (#49)

Signed-off-by: yongman <yming0221@gmail.com>

* Add zremrangebyrank command and test cases (#52)

* Add zremrangebyrank command

Signed-off-by: yongman <yming0221@gmail.com>

* Add e2e test case for zremrangebyrank

Signed-off-by: yongman <yming0221@gmail.com>

* Add zremrangebyrank command to lua and multi support

Signed-off-by: yongman <yming0221@gmail.com>

* Add more e2e test case in lua

Signed-off-by: yongman <yming0221@gmail.com>

* Fix local timestamp and use append mode in log (#53)

Signed-off-by: yongman <yming0221@gmail.com>

* Fix client kill

Signed-off-by: Morgan279 <wancheng.long@pingcap.com>

* Resolve client kill dead lock

Signed-off-by: Morgan279 <wancheng.long@pingcap.com>

* Fix client encode format

Signed-off-by: Morgan279 <wancheng.long@pingcap.com>

* Add client e2e test case

Signed-off-by: Morgan279 <wancheng.long@pingcap.com>

Co-authored-by: yongman <yming0221@gmail.com>
  • Loading branch information
Morgan279 and yongman authored Aug 5, 2022
1 parent 0e1a586 commit 5318ab9
Show file tree
Hide file tree
Showing 9 changed files with 381 additions and 23 deletions.
100 changes: 100 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use async_std::net::TcpStream;
use std::fmt;
use std::fmt::Formatter;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::SystemTime;
use tokio::sync::mpsc::Sender;

// reserve id 0
static COUNTER: AtomicU64 = AtomicU64::new(1);

#[derive(Debug, Clone)]
pub struct Client {
id: u64,
// name here does not constrain to uniqueness
name: String,
fd: RawFd,
// last command played
cmd: String,

local_addr: String,
peer_addr: String,

create_time: SystemTime,
last_interaction: SystemTime,

kill_tx: Sender<()>,
}

impl Client {
pub fn new(socket: TcpStream, kill_tx: Sender<()>) -> Client {
let now = SystemTime::now();
Client {
id: COUNTER.fetch_add(1, Ordering::Relaxed),
name: "".to_owned(),
fd: socket.as_raw_fd(),
cmd: "".to_owned(),
local_addr: (&socket).local_addr().unwrap().to_string(),
peer_addr: (&socket).peer_addr().unwrap().to_string(),
create_time: now,
last_interaction: now,
kill_tx,
}
}

pub fn interact(&mut self, cmd_name: &str) {
self.cmd = cmd_name.to_string();
self.last_interaction = SystemTime::now();
}

pub async fn kill(&self) {
let _ = self.kill_tx.send(()).await;
}

pub fn id(&self) -> u64 {
self.id
}

pub fn name(&self) -> &str {
self.name.as_str()
}

pub fn local_addr(&self) -> &str {
&self.local_addr
}
pub fn peer_addr(&self) -> &str {
&self.peer_addr
}

pub fn age(&self) -> u64 {
self.create_time.elapsed().unwrap().as_secs()
}

pub fn idle(&self) -> u64 {
self.last_interaction.elapsed().unwrap().as_secs()
}

pub fn set_name(&mut self, name: &str) {
self.name = name.to_string();
}
}

impl fmt::Display for Client {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"id={} addr={} laddr={} fd={} name={} age={} idle={} flags=N \
db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 argv-mem=10 obl=0 oll=0 omem=0 \
tot-mem=0 events=r cmd={} user=default redir=-1",
self.id,
self.peer_addr,
self.local_addr,
self.fd,
self.name,
self.age(),
self.idle(),
self.cmd
)
}
}
190 changes: 182 additions & 8 deletions src/cmd/fake.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
use slog::debug;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::client::Client;
use crate::cmd::Invalid;
use crate::tikv::errors::{
REDIS_INVALID_CLIENT_ID_ERR, REDIS_NOT_SUPPORTED_ERR, REDIS_NO_SUCH_CLIENT_ERR,
REDIS_VALUE_IS_NOT_INTEGER_ERR,
};
use crate::utils::{resp_int, resp_str};
use crate::{
config::LOGGER,
tikv::errors::REDIS_UNKNOWN_SUBCOMMAND,
Expand Down Expand Up @@ -29,8 +38,14 @@ impl Fake {
Ok(Fake { args, valid: true })
}

pub(crate) async fn apply(self, command: &str, dst: &mut Connection) -> crate::Result<()> {
let response = self.do_apply(command, dst).await;
pub(crate) async fn apply(
self,
command: &str,
dst: &mut Connection,
cur_client: Arc<Mutex<Client>>,
clients: Arc<Mutex<HashMap<u64, Arc<Mutex<Client>>>>>,
) -> crate::Result<()> {
let response = self.do_apply(command, cur_client, clients).await;

debug!(
LOGGER,
Expand All @@ -45,7 +60,12 @@ impl Fake {
Ok(())
}

async fn do_apply(self, command: &str, dst: &mut Connection) -> Frame {
async fn do_apply(
self,
command: &str,
cur_client: Arc<Mutex<Client>>,
clients: Arc<Mutex<HashMap<u64, Arc<Mutex<Client>>>>>,
) -> Frame {
if !self.valid {
return resp_invalid_arguments();
}
Expand All @@ -55,13 +75,155 @@ impl Fake {
"CLIENT" => {
// TODO client more management will be added later
match self.args[0].clone().to_uppercase().as_str() {
"ID" => resp_int(cur_client.lock().await.id() as i64),
"LIST" => {
let fake_list = format!("id=0 addr={} laddr={} fd=0 name= age=0 idle=0 flags=N \
db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 argv-mem=10 obl=0 oll=0 omem=0 \
tot-mem=0 events=r cmd=client user=default redir=-1\r\n", dst.peer_addr(), dst.local_addr());
resp_bulk(fake_list.into_bytes())
if self.args.len() == 1 {
return resp_bulk(
encode_clients_info(
clients.lock().await.clone().into_values().collect(),
)
.await,
);
}

return match self.args[1].clone().to_uppercase().as_str() {
"ID" => {
let mut match_clients = vec![];
for i in 2..self.args.len() {
match self.args[i].parse::<u64>() {
Ok(client_id) => {
if let Some(client) =
clients.lock().await.get(&client_id)
{
match_clients.push(client.clone());
}
}
Err(_) => return resp_err(REDIS_INVALID_CLIENT_ID_ERR),
}
}

return resp_bulk(encode_clients_info(match_clients).await);
}
_ => resp_err(REDIS_NOT_SUPPORTED_ERR),
};
}
"KILL" => {
if self.args.len() < 2 {
return resp_invalid_arguments();
}

// three arguments format (old format)
if self.args.len() == 2 {
let mut target_client = None;
{
let lk_clients = clients.lock().await;
for client in lk_clients.values() {
let lk_client = client.lock().await;
if lk_client.peer_addr() == self.args[1] {
target_client = Some(client.clone());
break;
}
}
}

return match target_client {
Some(client) => {
let lk_client = client.lock().await;
let mut lk_clients = clients.lock().await;
lk_client.kill().await;
lk_clients.remove(&lk_client.id());
resp_ok()
}
None => resp_err(REDIS_NO_SUCH_CLIENT_ERR),
};
}

// not match <filter> <value> format
if ((self.args.len() - 1) & 1) != 0 {
return resp_invalid_arguments();
}

let mut filter_peer_addr = "".to_owned();
let mut filter_local_addr = "".to_owned();
let mut filter_id = 0;
// skipme is set to yes by default in redis
let mut filter_skipme = true;

for i in (1..self.args.len()).step_by(2) {
let value = self.args[i + 1].clone();
match self.args[i].clone().to_uppercase().as_str() {
"ID" => match value.parse::<u64>() {
Ok(client_id) => filter_id = client_id,
// not REDIS_INVALID_CLIENT_ID_ERR, to be compatible with redis
Err(_) => return resp_err(REDIS_VALUE_IS_NOT_INTEGER_ERR),
},
"ADDR" => filter_peer_addr = value,
"LADDR" => filter_local_addr = value,
"SKIPME" => match value.to_uppercase().as_str() {
"YES" => filter_skipme = true,
"NO" => filter_skipme = false,
_ => return resp_invalid_arguments(),
},
_ => return resp_err(REDIS_NOT_SUPPORTED_ERR),
}
}

// retrieve current client id in advance for preventing dead lock during clients traverse
let cur_client_id = cur_client.lock().await.id();
let mut eligible_clients: Vec<Arc<Mutex<Client>>> = vec![];
{
let lk_clients = clients.lock().await;
for client in lk_clients.values() {
let lk_client = client.lock().await;
if !filter_peer_addr.is_empty()
&& lk_client.peer_addr() != filter_peer_addr
{
continue;
}
if !filter_local_addr.is_empty()
&& lk_client.local_addr() != filter_local_addr
{
continue;
}
if filter_id != 0 && lk_client.id() != filter_id {
continue;
}
if cur_client_id == lk_client.id() && filter_skipme {
continue;
}

eligible_clients.push(client.clone());
}
}

let killed = eligible_clients.len() as i64;
let mut lk_clients = clients.lock().await;
for eligible_client in eligible_clients {
let lk_eligible_client = eligible_client.lock().await;
lk_eligible_client.kill().await;
lk_clients.remove(&lk_eligible_client.id());
}

resp_int(killed)
}
"SETNAME" => {
if self.args.len() != 2 {
return resp_invalid_arguments();
}

let mut w_cur_client = cur_client.lock().await;
w_cur_client.set_name(&self.args[1]);
resp_ok()
}
"GETNAME" => {
let r_cur_client = cur_client.lock().await;
let name = r_cur_client.name();
if name.is_empty() {
return resp_nil();
}

resp_str(name)
}
"SETNAME" => resp_ok(),
_ => resp_err(REDIS_UNKNOWN_SUBCOMMAND),
}
}
Expand All @@ -82,6 +244,18 @@ impl Fake {
}
}

#[inline]
async fn encode_clients_info(clients: Vec<Arc<Mutex<Client>>>) -> Vec<u8> {
let mut resp_list = String::new();
for client in clients {
let r_client = client.lock().await;
resp_list.push_str(&r_client.to_string());
resp_list.push('\n');
}

resp_list.into_bytes()
}

impl Invalid for Fake {
fn new_invalid() -> Fake {
Fake {
Expand Down
15 changes: 11 additions & 4 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ mod get;

use bytes::Bytes;
pub use get::Get;
use std::collections::HashMap;
use std::sync::Arc;

mod del;
pub use del::Del;
Expand All @@ -12,6 +14,7 @@ pub use mget::Mget;
mod mset;
use mlua::Lua;
pub use mset::Mset;
use tokio::sync::Mutex;

mod strlen;
pub use strlen::Strlen;
Expand Down Expand Up @@ -199,6 +202,7 @@ pub use fake::Fake;
mod multi;
pub use multi::Multi;

use crate::client::Client;
use crate::{cluster::Cluster as Topo, Connection, Db, Frame, Parse, ParseError, Shutdown};

/// All commands should be implement new_invalid() for invalid check
Expand Down Expand Up @@ -660,11 +664,14 @@ impl Command {
///
/// The response is written to `dst`. This is called by the server in order
/// to execute a received command.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn apply(
self,
db: &Db,
topo: &Topo,
dst: &mut Connection,
cur_client: Arc<Mutex<Client>>,
clients: Arc<Mutex<HashMap<u64, Arc<Mutex<Client>>>>>,
lua: &mut Option<Lua>,
shutdown: &mut Shutdown,
) -> crate::Result<()> {
Expand Down Expand Up @@ -748,10 +755,10 @@ impl Command {
Debug(cmd) => cmd.apply(dst).await,

Cluster(cmd) => cmd.apply(topo, dst).await,
ReadWrite(cmd) => cmd.apply("readwrite", dst).await,
ReadOnly(cmd) => cmd.apply("readonly", dst).await,
Client(cmd) => cmd.apply("client", dst).await,
Info(cmd) => cmd.apply("info", dst).await,
ReadWrite(cmd) => cmd.apply("readwrite", dst, cur_client, clients).await,
ReadOnly(cmd) => cmd.apply("readonly", dst, cur_client, clients).await,
Client(cmd) => cmd.apply("client", dst, cur_client, clients).await,
Info(cmd) => cmd.apply("info", dst, cur_client, clients).await,

Unknown(cmd) => cmd.apply(dst).await,
// `Unsubscribe` cannot be applied. It may only be received from the
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub use tikv::set_instance_id;

pub mod cluster;

pub mod client;

pub mod utils;

pub mod config;
Expand Down
Loading

0 comments on commit 5318ab9

Please sign in to comment.