Skip to content

Implementing create and delete superstream commands #229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions protocol/src/commands/create_super_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use std::collections::HashMap;
use std::io::Write;

#[cfg(test)]
use fake::Fake;

use crate::{
codec::{Decoder, Encoder},
error::{DecodeError, EncodeError},
protocol::commands::COMMAND_CREATE_SUPER_STREAM,
};

use super::Command;

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct CreateSuperStreamCommand {
correlation_id: u32,
super_stream_name: String,
partitions: Vec<String>,
binding_keys: Vec<String>,
args: HashMap<String, String>,
}

impl CreateSuperStreamCommand {
pub fn new(
correlation_id: u32,
super_stream_name: String,
partitions: Vec<String>,
binding_keys: Vec<String>,
args: HashMap<String, String>,
) -> Self {
Self {
correlation_id,
super_stream_name,
partitions,
binding_keys,
args,
}
}
}

impl Encoder for CreateSuperStreamCommand {
fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size()
+ self.super_stream_name.as_str().encoded_size()
+ self.partitions.encoded_size()
+ self.binding_keys.encoded_size()
+ self.args.encoded_size()
}

fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.super_stream_name.as_str().encode(writer)?;
self.partitions.encode(writer)?;
self.binding_keys.encode(writer)?;
self.args.encode(writer)?;
Ok(())
}
}

impl Decoder for CreateSuperStreamCommand {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, super_stream_name) = Option::decode(input)?;
let (input, partitions) = <Vec<String>>::decode(input)?;
let (input, binding_keys) = <Vec<String>>::decode(input)?;
let (input, args) = HashMap::decode(input)?;

Ok((
input,
CreateSuperStreamCommand {
correlation_id,
super_stream_name: super_stream_name.unwrap(),
partitions,
binding_keys,
args,
},
))
}
}

impl Command for CreateSuperStreamCommand {
fn key(&self) -> u16 {
COMMAND_CREATE_SUPER_STREAM
}
}

#[cfg(test)]
mod tests {
use crate::commands::create_stream::CreateStreamCommand;
use crate::commands::tests::command_encode_decode_test;

use super::CreateSuperStreamCommand;

#[test]
fn create_super_stream_request_test() {
command_encode_decode_test::<CreateSuperStreamCommand>();
}
}
74 changes: 74 additions & 0 deletions protocol/src/commands/delete_super_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::io::Write;

#[cfg(test)]
use fake::Fake;

use crate::{
codec::{Decoder, Encoder},
error::{DecodeError, EncodeError},
protocol::commands::COMMAND_DELETE_SUPER_STREAM,
};

use super::Command;

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct DeleteSuperStreamCommand {
correlation_id: u32,
super_stream_name: String,
}

impl DeleteSuperStreamCommand {
pub fn new(correlation_id: u32, super_stream_name: String) -> Self {
Self {
correlation_id,
super_stream_name,
}
}
}

impl Encoder for DeleteSuperStreamCommand {
fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size() + self.super_stream_name.as_str().encoded_size()
}

fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.super_stream_name.as_str().encode(writer)?;
Ok(())
}
}

impl Decoder for DeleteSuperStreamCommand {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, super_stream_name) = Option::decode(input)?;

Ok((
input,
DeleteSuperStreamCommand {
correlation_id,
super_stream_name: super_stream_name.unwrap(),
},
))
}
}

impl Command for DeleteSuperStreamCommand {
fn key(&self) -> u16 {
COMMAND_DELETE_SUPER_STREAM
}
}

#[cfg(test)]
mod tests {
use crate::commands::create_super_stream::CreateSuperStreamCommand;
use crate::commands::tests::command_encode_decode_test;

use super::DeleteSuperStreamCommand;

#[test]
fn delete_super_stream_request_test() {
command_encode_decode_test::<DeleteSuperStreamCommand>();
}
}
2 changes: 2 additions & 0 deletions protocol/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use crate::protocol::version::PROTOCOL_VERSION;

pub mod close;
pub mod create_stream;
pub mod create_super_stream;
pub mod credit;
pub mod declare_publisher;
pub mod delete;
pub mod delete_publisher;
pub mod delete_super_stream;
pub mod deliver;
pub mod exchange_command_versions;
pub mod generic;
Expand Down
2 changes: 2 additions & 0 deletions protocol/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub mod commands {
pub const COMMAND_CONSUMER_UPDATE: u16 = 26;
pub const COMMAND_EXCHANGE_COMMAND_VERSIONS: u16 = 27;
pub const COMMAND_STREAMS_STATS: u16 = 28;
pub const COMMAND_CREATE_SUPER_STREAM: u16 = 29;
pub const COMMAND_DELETE_SUPER_STREAM: u16 = 30;
}

// server responses
Expand Down
43 changes: 38 additions & 5 deletions protocol/src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use std::io::Write;
use crate::{
codec::{decoder::read_u32, Decoder, Encoder},
commands::{
close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand,
close::CloseRequest, create_stream::CreateStreamCommand,
create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
declare_publisher::DeclarePublisherCommand, delete::Delete,
delete_publisher::DeletePublisherCommand,
delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand,
exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand,
metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand,
publish::PublishCommand, query_offset::QueryOffsetRequest,
Expand All @@ -20,6 +21,7 @@ use crate::{
};

use byteorder::{BigEndian, WriteBytesExt};

mod shims;
#[derive(Debug, PartialEq, Eq)]
pub struct Request {
Expand Down Expand Up @@ -60,6 +62,8 @@ pub enum RequestKind {
StoreOffset(StoreOffset),
Unsubscribe(UnSubscribeCommand),
ExchangeCommandVersions(ExchangeCommandVersionsRequest),
CreateSuperStream(CreateSuperStreamCommand),
DeleteSuperStream(DeleteSuperStreamCommand),
}

impl Encoder for RequestKind {
Expand Down Expand Up @@ -87,6 +91,12 @@ impl Encoder for RequestKind {
RequestKind::ExchangeCommandVersions(exchange_command_versions) => {
exchange_command_versions.encoded_size()
}
RequestKind::CreateSuperStream(create_super_stream) => {
create_super_stream.encoded_size()
}
RequestKind::DeleteSuperStream(delete_super_stream) => {
delete_super_stream.encoded_size()
}
}
}

Expand Down Expand Up @@ -114,6 +124,12 @@ impl Encoder for RequestKind {
RequestKind::ExchangeCommandVersions(exchange_command_versions) => {
exchange_command_versions.encode(writer)
}
RequestKind::CreateSuperStream(create_super_stream) => {
create_super_stream.encode(writer)
}
RequestKind::DeleteSuperStream(delete_super_stream) => {
delete_super_stream.encode(writer)
}
}
}
}
Expand Down Expand Up @@ -182,6 +198,12 @@ impl Decoder for Request {
COMMAND_EXCHANGE_COMMAND_VERSIONS => {
ExchangeCommandVersionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
}
COMMAND_CREATE_SUPER_STREAM => {
CreateSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
}
COMMAND_DELETE_SUPER_STREAM => {
DeleteSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
}
n => return Err(DecodeError::UnsupportedResponseType(n)),
};
Ok((input, Request { header, kind: cmd }))
Expand All @@ -194,9 +216,11 @@ mod tests {
use crate::{
codec::{Decoder, Encoder},
commands::{
close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand,
close::CloseRequest, create_stream::CreateStreamCommand,
create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
declare_publisher::DeclarePublisherCommand, delete::Delete,
delete_publisher::DeletePublisherCommand,
delete_super_stream::DeleteSuperStreamCommand,
exchange_command_versions::ExchangeCommandVersionsRequest,
heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::OpenCommand,
peer_properties::PeerPropertiesCommand, publish::PublishCommand,
Expand All @@ -209,9 +233,8 @@ mod tests {

use std::fmt::Debug;

use fake::{Dummy, Fake, Faker};

use super::Request;
use fake::{Dummy, Fake, Faker};

#[test]
fn request_open_test() {
Expand Down Expand Up @@ -324,4 +347,14 @@ mod tests {
fn request_exchange_command_versions_test() {
request_encode_decode_test::<ExchangeCommandVersionsRequest>()
}

#[test]
fn request_create_super_stream_test() {
request_encode_decode_test::<CreateSuperStreamCommand>()
}

#[test]
fn request_delete_super_stream_test() {
request_encode_decode_test::<DeleteSuperStreamCommand>()
}
}
15 changes: 15 additions & 0 deletions protocol/src/request/shims.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::commands::create_super_stream::CreateSuperStreamCommand;
use crate::commands::delete_super_stream::DeleteSuperStreamCommand;
use crate::{
commands::{
close::CloseRequest, create_stream::CreateStreamCommand, credit::CreditCommand,
Expand All @@ -14,6 +16,7 @@ use crate::{
types::Header,
Request, RequestKind,
};

impl<T> From<T> for Request
where
T: Into<RequestKind> + Command,
Expand Down Expand Up @@ -135,3 +138,15 @@ impl From<ExchangeCommandVersionsRequest> for RequestKind {
RequestKind::ExchangeCommandVersions(cmd)
}
}

impl From<CreateSuperStreamCommand> for RequestKind {
fn from(cmd: CreateSuperStreamCommand) -> Self {
RequestKind::CreateSuperStream(cmd)
}
}

impl From<DeleteSuperStreamCommand> for RequestKind {
fn from(cmd: DeleteSuperStreamCommand) -> Self {
RequestKind::DeleteSuperStream(cmd)
}
}
2 changes: 2 additions & 0 deletions protocol/src/response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ impl Decoder for Response {
| COMMAND_SUBSCRIBE
| COMMAND_UNSUBSCRIBE
| COMMAND_CREATE_STREAM
| COMMAND_CREATE_SUPER_STREAM
| COMMAND_DELETE_SUPER_STREAM
| COMMAND_DELETE_STREAM => {
GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))?
}
Expand Down
Loading
Loading