Skip to content

Superstream: Create/Delete superstream and Partition and route commands #230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM rabbitmq:3.13-rc-management
FROM rabbitmq:4.0.1-management

COPY .ci/conf/rabbitmq.conf /etc/rabbitmq/rabbitmq.conf
COPY .ci/conf/enabled_plugins /etc/rabbitmq/enabled_plugins

COPY .ci/certs /etc/rabbitmq/certs
COPY .ci/certs /etc/rabbitmq/certs
100 changes: 100 additions & 0 deletions protocol/src/commands/create_super_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use std::collections::HashMap;
use std::io::Write;

#[cfg(test)]
use fake::Fake;

use crate::{
codec::{Decoder, Encoder},
error::{DecodeError, EncodeError},
protocol::commands::COMMAND_CREATE_SUPER_STREAM,
};

use super::Command;

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct CreateSuperStreamCommand {
correlation_id: u32,
super_stream_name: String,
partitions: Vec<String>,
binding_keys: Vec<String>,
args: HashMap<String, String>,
}

impl CreateSuperStreamCommand {
pub fn new(
correlation_id: u32,
super_stream_name: String,
partitions: Vec<String>,
binding_keys: Vec<String>,
args: HashMap<String, String>,
) -> Self {
Self {
correlation_id,
super_stream_name,
partitions,
binding_keys,
args,
}
}
}

impl Encoder for CreateSuperStreamCommand {
fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size()
+ self.super_stream_name.as_str().encoded_size()
+ self.partitions.encoded_size()
+ self.binding_keys.encoded_size()
+ self.args.encoded_size()
}

fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.super_stream_name.as_str().encode(writer)?;
self.partitions.encode(writer)?;
self.binding_keys.encode(writer)?;
self.args.encode(writer)?;
Ok(())
}
}

impl Decoder for CreateSuperStreamCommand {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, super_stream_name) = Option::decode(input)?;
let (input, partitions) = <Vec<String>>::decode(input)?;
let (input, binding_keys) = <Vec<String>>::decode(input)?;
let (input, args) = HashMap::decode(input)?;

Ok((
input,
CreateSuperStreamCommand {
correlation_id,
super_stream_name: super_stream_name.unwrap(),
partitions,
binding_keys,
args,
},
))
}
}

impl Command for CreateSuperStreamCommand {
fn key(&self) -> u16 {
COMMAND_CREATE_SUPER_STREAM
}
}

#[cfg(test)]
mod tests {

use crate::commands::tests::command_encode_decode_test;

use super::CreateSuperStreamCommand;

#[test]
fn create_super_stream_request_test() {
command_encode_decode_test::<CreateSuperStreamCommand>();
}
}
74 changes: 74 additions & 0 deletions protocol/src/commands/delete_super_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::io::Write;

#[cfg(test)]
use fake::Fake;

use crate::{
codec::{Decoder, Encoder},
error::{DecodeError, EncodeError},
protocol::commands::COMMAND_DELETE_SUPER_STREAM,
};

use super::Command;

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct DeleteSuperStreamCommand {
correlation_id: u32,
super_stream_name: String,
}

impl DeleteSuperStreamCommand {
pub fn new(correlation_id: u32, super_stream_name: String) -> Self {
Self {
correlation_id,
super_stream_name,
}
}
}

impl Encoder for DeleteSuperStreamCommand {
fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size() + self.super_stream_name.as_str().encoded_size()
}

fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.super_stream_name.as_str().encode(writer)?;
Ok(())
}
}

impl Decoder for DeleteSuperStreamCommand {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, super_stream_name) = Option::decode(input)?;

Ok((
input,
DeleteSuperStreamCommand {
correlation_id,
super_stream_name: super_stream_name.unwrap(),
},
))
}
}

impl Command for DeleteSuperStreamCommand {
fn key(&self) -> u16 {
COMMAND_DELETE_SUPER_STREAM
}
}

#[cfg(test)]
mod tests {
use crate::commands::create_super_stream::CreateSuperStreamCommand;
use crate::commands::tests::command_encode_decode_test;

use super::DeleteSuperStreamCommand;

#[test]
fn delete_super_stream_request_test() {
command_encode_decode_test::<DeleteSuperStreamCommand>();
}
}
4 changes: 4 additions & 0 deletions protocol/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use crate::protocol::version::PROTOCOL_VERSION;

pub mod close;
pub mod create_stream;
pub mod create_super_stream;
pub mod credit;
pub mod declare_publisher;
pub mod delete;
pub mod delete_publisher;
pub mod delete_super_stream;
pub mod deliver;
pub mod exchange_command_versions;
pub mod generic;
Expand All @@ -23,6 +25,8 @@ pub mod sasl_authenticate;
pub mod sasl_handshake;
pub mod store_offset;
pub mod subscribe;
pub mod superstream_partitions;
pub mod superstream_route;
pub mod tune;
pub mod unsubscribe;

Expand Down
144 changes: 144 additions & 0 deletions protocol/src/commands/superstream_partitions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
use std::io::Write;

#[cfg(test)]
use fake::Fake;

use super::Command;
use crate::{
codec::{Decoder, Encoder},
error::{DecodeError, EncodeError},
protocol::commands::COMMAND_PARTITIONS,
FromResponse, ResponseCode,
};

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct SuperStreamPartitionsRequest {
correlation_id: u32,
super_stream: String,
}

impl SuperStreamPartitionsRequest {
pub fn new(correlation_id: u32, super_stream: String) -> Self {
Self {
correlation_id,
super_stream,
}
}
}

impl Encoder for SuperStreamPartitionsRequest {
fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size() + self.super_stream.as_str().encoded_size()
}

fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.super_stream.as_str().encode(writer)?;
Ok(())
}
}

impl Decoder for SuperStreamPartitionsRequest {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, super_stream) = Option::decode(input)?;

Ok((
input,
SuperStreamPartitionsRequest {
correlation_id,
super_stream: super_stream.unwrap(),
},
))
}
}

impl Command for SuperStreamPartitionsRequest {
fn key(&self) -> u16 {
COMMAND_PARTITIONS
}
}

#[cfg_attr(test, derive(fake::Dummy))]
#[derive(PartialEq, Eq, Debug)]
pub struct SuperStreamPartitionsResponse {
pub(crate) correlation_id: u32,
response_code: ResponseCode,
pub streams: Vec<String>,
}

impl SuperStreamPartitionsResponse {
pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
Self {
correlation_id,
response_code,
streams,
}
}
pub fn is_ok(&self) -> bool {
self.response_code == ResponseCode::Ok
}

Check warning on line 81 in protocol/src/commands/superstream_partitions.rs

View check run for this annotation

Codecov / codecov/patch

protocol/src/commands/superstream_partitions.rs#L72-L81

Added lines #L72 - L81 were not covered by tests
}

impl Encoder for SuperStreamPartitionsResponse {
fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
self.correlation_id.encode(writer)?;
self.response_code.encode(writer)?;
self.streams.encode(writer)?;
Ok(())
}

fn encoded_size(&self) -> u32 {
self.correlation_id.encoded_size()
+ self.response_code.encoded_size()
+ self.streams.encoded_size()
}
}

impl Decoder for SuperStreamPartitionsResponse {
fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
let (input, correlation_id) = u32::decode(input)?;
let (input, response_code) = ResponseCode::decode(input)?;
let (input, streams) = <Vec<String>>::decode(input)?;

Ok((
input,
SuperStreamPartitionsResponse {
correlation_id,
response_code,
streams,
},
))
}
}

impl FromResponse for SuperStreamPartitionsResponse {
fn from_response(response: crate::Response) -> Option<Self> {
match response.kind {
crate::ResponseKind::SuperStreamPartitions(partitions_response) => {
Some(partitions_response)
}
_ => None,

Check warning on line 122 in protocol/src/commands/superstream_partitions.rs

View check run for this annotation

Codecov / codecov/patch

protocol/src/commands/superstream_partitions.rs#L122

Added line #L122 was not covered by tests
}
}
}

#[cfg(test)]
mod tests {

use crate::commands::tests::command_encode_decode_test;

use super::SuperStreamPartitionsRequest;
use super::SuperStreamPartitionsResponse;

#[test]
fn super_stream_partition_request_test() {
command_encode_decode_test::<SuperStreamPartitionsRequest>();
}

#[test]
fn super_stream_partition_response_test() {
command_encode_decode_test::<SuperStreamPartitionsResponse>();
}
}
Loading
Loading