Skip to content

Commit

Permalink
add miniresp
Browse files Browse the repository at this point in the history
  • Loading branch information
iwanbk committed Apr 20, 2024
1 parent 174aea3 commit cbeb7b9
Show file tree
Hide file tree
Showing 10 changed files with 659 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/conn/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# conn module

This module was extracted from mini-redis.
It is only for temporary workaround
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ pub mod server;
mod proksis;
mod conn;
mod cmd;
mod miniresp;

15 changes: 15 additions & 0 deletions src/miniresp/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# miniresp

A very simple RESP2 parser & writter

## Features

### Parser

Parse redis request and get following data:

- command: to define the next action
- key: to define the responsible nodes
- raw command: to be forwarded to the respected node

### Writter
23 changes: 23 additions & 0 deletions src/miniresp/command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use bytes::Bytes;

pub enum CommandName {
Get,
Set,
Unknown,
}

pub struct Command {
pub name: String,
pub key: String,
pub raw: Bytes,
}

impl Command {
pub fn new(name: String, key: String, raw: Bytes) -> Command {
Command {
name,
key,
raw,
}
}
}
128 changes: 128 additions & 0 deletions src/miniresp/conn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use std::io::Cursor;

use bytes::{Buf, BytesMut};
use tokio::io::{AsyncReadExt, BufReader};
use tokio::net::TcpStream;

use crate::miniresp::command::Command;
use crate::miniresp::frame::Frame;
use crate::miniresp::parse::Parse;

pub struct Conn {
stream: BufReader<TcpStream>,
// The buffer for reading frames.
buffer: BytesMut,
}

impl Conn {
pub fn new(stream: BufReader<TcpStream>) -> Conn {
Conn {
stream,
buffer: BytesMut::with_capacity(4 * 1024),
}
}

// read a command
pub async fn read_command(&mut self) -> Result<Command, anyhow::Error> {
// get one frame
let frame = self.read_frame().await.unwrap().unwrap();
// get the command
let mut parse = Parse::new(frame)?;
let command_name = parse.next_string().unwrap().to_lowercase();
let key = parse.next_string().unwrap();
let raw = self.buffer.clone().freeze();
Ok(Command::new(command_name, key, raw))
}

async fn read_frame(&mut self) -> crate::miniresp::Result<Option<Frame>> {
loop {
// Attempt to parse a frame from the buffered data. If enough data
// has been buffered, the frame is returned.
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}

// There is not enough buffered data to read a frame. Attempt to
// read more data from the socket.
//
// On success, the number of bytes is returned. `0` indicates "end
// of stream".
if 0 == self.stream.read_buf(&mut self.buffer).await? {
// The remote closed the connection. For this to be a clean
// shutdown, there should be no data in the read buffer. If
// there is, this means that the peer closed the socket while
// sending a frame.
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
}
}
}

/// Tries to parse a frame from the buffer. If the buffer contains enough
/// data, the frame is returned and the data removed from the buffer. If not
/// enough data has been buffered yet, `Ok(None)` is returned. If the
/// buffered data does not represent a valid frame, `Err` is returned.
fn parse_frame(&mut self) -> crate::miniresp::Result<Option<Frame>> {
use crate::miniresp::frame::Error::Incomplete;

// Cursor is used to track the "current" location in the
// buffer. Cursor also implements `Buf` from the `bytes` crate
// which provides a number of helpful utilities for working
// with bytes.
let mut buf = Cursor::new(&self.buffer[..]);

// The first step is to check if enough data has been buffered to parse
// a single frame. This step is usually much faster than doing a full
// parse of the frame, and allows us to skip allocating data structures
// to hold the frame data unless we know the full frame has been
// received.
match Frame::check(&mut buf) {
Ok(_) => {
// The `check` function will have advanced the cursor until the
// end of the frame. Since the cursor had position set to zero
// before `Frame::check` was called, we obtain the length of the
// frame by checking the cursor position.
let len = buf.position() as usize;

// Reset the position to zero before passing the cursor to
// `Frame::parse`.
buf.set_position(0);

// Parse the frame from the buffer. This allocates the necessary
// structures to represent the frame and returns the frame
// value.
//
// If the encoded frame representation is invalid, an error is
// returned. This should terminate the **current** connection
// but should not impact any other connected client.
let frame = Frame::parse(&mut buf)?;

// Discard the parsed data from the read buffer.
//
// When `advance` is called on the read buffer, all of the data
// up to `len` is discarded. The details of how this works is
// left to `BytesMut`. This is often done by moving an internal
// cursor, but it may be done by reallocating and copying data.
self.buffer.advance(len);

// Return the parsed frame to the caller.
Ok(Some(frame))
}
// There is not enough data present in the read buffer to parse a
// single frame. We must wait for more data to be received from the
// socket. Reading from the socket will be done in the statement
// after this `match`.
//
// We do not want to return `Err` from here as this "error" is an
// expected runtime condition.
Err(Incomplete) => Ok(None),
// An error was encountered while parsing the frame. The connection
// is now in an invalid state. Returning `Err` from here will result
// in the connection being closed.
Err(e) => Err(e.into()),
}
}
}
Loading

0 comments on commit cbeb7b9

Please sign in to comment.