Skip to content

Commit

Permalink
add pub sub client implementation with examples
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Apr 4, 2020
1 parent 2115ebf commit 95569f1
Show file tree
Hide file tree
Showing 6 changed files with 316 additions and 20 deletions.
31 changes: 29 additions & 2 deletions examples/pub.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,31 @@
//! Publish to a redis channel example.
//!
//! A simple client that connects to a mini-redis server, and
//! publishes a message on `foo` channel
//!
//! You can test this out by running:
//!
//! cargo run --bin server
//!
//! Then in another terminal run:
//!
//! cargo run --example sub
//!
//! And then in another terminal run:
//!
//! cargo run --example pub
#![warn(rust_2018_idioms)]

use mini_redis::{client, Result};

#[tokio::main]
async fn main() {
unimplemented!();
async fn main() -> Result<()> {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await?;

// publish message `bar` on channel foo
client.publish("foo", "bar".into()).await?;

Ok(())
}
38 changes: 35 additions & 3 deletions examples/sub.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,38 @@
/// Subscribe to a redis channel
//! Subscribe to a redis channel example.
//!
//! A simple client that connects to a mini-redis server, subscribes to "foo" and "bar" channels
//! and awaits messages published on those channels
//!
//! You can test this out by running:
//!
//! cargo run --bin server
//!
//! Then in another terminal run:
//!
//! cargo run --example sub
//!
//! And then in another terminal run:
//!
//! cargo run --example pub
#![warn(rust_2018_idioms)]

use mini_redis::{client, Result};
use tokio::stream::StreamExt;

#[tokio::main]
async fn main() {
unimplemented!();
pub async fn main() -> Result<()> {
// Open a connection to the mini-redis address.
let client = client::connect("127.0.0.1:6379").await?;


// subscribe to channel foo
let mut result = client.subscribe(vec!["foo".into()]).await?;

// await messages on channel foo
while let Some(Ok(msg)) = result.next().await {
println!("got message from the channel: {}; message = {:?}", msg.channel, msg.content);
}

Ok(())
}
226 changes: 218 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::cmd::{Get, Publish, Set, Subscribe, Unsubscribe};
use crate::{Connection, Frame};
use crate::cmd::{Get, Set};

use bytes::Bytes;
use std::future::Future;
use std::io::{Error, ErrorKind};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::stream::Stream;
use tracing::{debug, instrument};

/// Mini asynchronous Redis client
Expand Down Expand Up @@ -47,7 +51,31 @@ impl Client {
key: key.to_string(),
value: value,
expire: None,
}).await
})
.await
}

/// publish `message` on the `channel`
#[instrument(skip(self))]
pub async fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
self.publish_cmd(Publish {
channel: channel.to_string(),
message: message,
})
.await
}

/// subscribe to the list of channels
/// when client sends `SUBSCRIBE`, server's handle for client start's accepting only
/// `SUBSCRIBE` and `UNSUBSCRIBE` commands so we consume client and return Subscribe
#[instrument(skip(self))]
pub async fn subscribe(mut self, channels: Vec<String>) -> crate::Result<Subscriber> {
let subscribed_channels = self.subscribe_cmd(Subscribe { channels: channels }).await?;

Ok(Subscriber {
conn: self.conn,
subscribed_channels,
})
}

/// Set the value of a key to `value`. The value expires after `expiration`.
Expand All @@ -62,7 +90,8 @@ impl Client {
key: key.to_string(),
value: value.into(),
expire: Some(expiration),
}).await
})
.await
}

async fn set_cmd(&mut self, cmd: Set) -> crate::Result<()> {
Expand All @@ -81,6 +110,52 @@ impl Client {
}
}

async fn publish_cmd(&mut self, cmd: Publish) -> crate::Result<u64> {
// Convert the `Publish` command into a frame
let frame = cmd.into_frame();

debug!(request = ?frame);

// Write the frame to the socket
self.conn.write_frame(&frame).await?;

// Read the response
match self.read_response().await? {
Frame::Integer(response) => Ok(response),
frame => Err(frame.to_error()),
}
}

async fn subscribe_cmd(&mut self, cmd: Subscribe) -> crate::Result<Vec<String>> {
// Convert the `Subscribe` command into a frame
let channels = cmd.channels.clone();
let frame = cmd.into_frame();

debug!(request = ?frame);

// Write the frame to the socket
self.conn.write_frame(&frame).await?;

// Read the response
for channel in &channels {
let response = self.read_response().await?;
match response {
Frame::Array(ref frame) => match frame.as_slice() {
[subscribe, schannel]
if subscribe.to_string() == "subscribe"
&& &schannel.to_string() == channel =>
{
()
}
_ => return Err(response.to_error()),
},
frame => return Err(frame.to_error()),
};
}

Ok(channels)
}

/// Reads a response frame from the socket. If an `Error` frame is read, it
/// is converted to `Err`.
async fn read_response(&mut self) -> crate::Result<Frame> {
Expand All @@ -89,20 +164,155 @@ impl Client {
debug!(?response);

match response {
Some(Frame::Error(msg)) => {
Err(msg.into())
Some(Frame::Error(msg)) => Err(msg.into()),
Some(frame) => Ok(frame),
None => {
// Receiving `None` here indicates the server has closed the
// connection without sending a frame. This is unexpected and is
// represented as a "connection reset by peer" error.
let err = Error::new(ErrorKind::ConnectionReset, "connection reset by server");

Err(err.into())
}
}
}
}

pub struct Subscriber {
conn: Connection,
subscribed_channels: Vec<String>,
}

impl Subscriber {
/// Subscribe to a list of new channels
#[instrument(skip(self))]
pub async fn subscribe(&mut self, channels: Vec<String>) -> crate::Result<()> {
let cmd = Subscribe { channels: channels };

let channels = cmd.channels.clone();
let frame = cmd.into_frame();

debug!(request = ?frame);

// Write the frame to the socket
self.conn.write_frame(&frame).await?;

// Read the response
for channel in &channels {
let response = self.read_response().await?;
match response {
Frame::Array(ref frame) => match frame.as_slice() {
[subscribe, schannel]
if &subscribe.to_string() == "subscribe"
&& &schannel.to_string() == channel =>
{
()
}
_ => return Err(response.to_error()),
},
frame => return Err(frame.to_error()),
};
}

self.subscribed_channels.extend(channels);

Ok(())
}

/// Unsubscribe to a list of new channels
#[instrument(skip(self))]
pub async fn unsubscribe(&mut self, channels: Vec<String>) -> crate::Result<()> {
let cmd = Unsubscribe { channels: channels };

let mut channels = cmd.channels.clone();
let frame = cmd.into_frame();

debug!(request = ?frame);

// Write the frame to the socket
self.conn.write_frame(&frame).await?;

// if the input channel list is empty, server acknowledges as unsubscribing
// from all subscribed channels, so we assert that the unsubscribe list received
// matches the client subscribed one
if channels.is_empty() {
channels = self.subscribed_channels.clone();
}

// Read the response
for channel in &channels {
let response = self.read_response().await?;
match response {
Frame::Array(ref frame) => match frame.as_slice() {
[unsubscribe, uchannel]
if &unsubscribe.to_string() == "unsubscribe"
&& &uchannel.to_string() == channel =>
{
self.subscribed_channels
.retain(|channel| channel != &uchannel.to_string());
}
_ => return Err(response.to_error()),
},
frame => return Err(frame.to_error()),
};
}

Ok(())
}

/// Reads a response frame from the socket. If an `Error` frame is read, it
/// is converted to `Err`.
async fn read_response(&mut self) -> crate::Result<Frame> {
let response = self.conn.read_frame().await?;

debug!(?response);

match response {
Some(Frame::Error(msg)) => Err(msg.into()),
Some(frame) => Ok(frame),
None => {
// Receiving `None` here indicates the server has closed the
// connection without sending a frame. This is unexpected and is
// represented as a "connection reset by peer" error.
let err = Error::new(
ErrorKind::ConnectionReset,
"connection reset by server");
let err = Error::new(ErrorKind::ConnectionReset, "connection reset by server");

Err(err.into())
}
}
}
}

impl Stream for Subscriber {
type Item = crate::Result<Message>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut read_frame = Box::pin(self.conn.read_frame());
match Pin::new(&mut read_frame).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(None)) => Poll::Ready(None),
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err.into()))),
Poll::Ready(Ok(Some(mframe))) => {
debug!(?mframe);
match mframe {
Frame::Array(ref frame) => match frame.as_slice() {
[message, channel, content] if &message.to_string() == "message" => {
Poll::Ready(Some(Ok(Message {
channel: channel.to_string(),
content: Bytes::from(content.to_string()),
})))
}
_ => Poll::Ready(Some(Err(mframe.to_error()))),
},
frame => Poll::Ready(Some(Err(frame.to_error()))),
}
}
}
}
}

/// A message received on a subscribed channel
#[derive(Debug, Clone)]
pub struct Message {
pub channel: String,
pub content: Bytes,
}
13 changes: 11 additions & 2 deletions src/cmd/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use bytes::Bytes;

#[derive(Debug)]
pub struct Publish {
channel: String,
message: Bytes,
pub(crate) channel: String,
pub(crate) message: Bytes,
}

impl Publish {
Expand All @@ -24,4 +24,13 @@ impl Publish {
dst.write_frame(&response).await?;
Ok(())
}

pub(crate) fn into_frame(self) -> Frame {
let mut frame = Frame::array();
frame.push_bulk(Bytes::from("publish".as_bytes()));
frame.push_bulk(Bytes::from(self.channel.into_bytes()));
frame.push_bulk(self.message);

frame
}
}
Loading

0 comments on commit 95569f1

Please sign in to comment.