Skip to content

Commit

Permalink
Updates bridge for RB integration (#4362)
Browse files Browse the repository at this point in the history
This sets up the bridge code for easier integration later with the RingBuffer code. 

One thing that I think can be thought a bit more about is whether we want both local and remote pumps to have the same storage settings or if they can/should be different.
  • Loading branch information
nyanzebra authored Feb 23, 2021
1 parent 9414793 commit 295cfed
Show file tree
Hide file tree
Showing 15 changed files with 199 additions and 195 deletions.
8 changes: 4 additions & 4 deletions mqtt/mqtt-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Bridge<WakingMemoryStore> {

debug!("creating bridge {}...", settings.name());

let (local_pump, remote_pump) = Builder::default()
let (local_pump, remote_pump) = Builder::<WakingMemoryStore>::default()
.with_local(|pump| {
pump.with_config(MqttClientConfig::new(
system_address,
Expand Down Expand Up @@ -133,9 +133,9 @@ impl Bridge<RingBuffer> {
const BATCH_SIZE: usize = 10;

debug!("creating bridge {}...", settings.name());
let device_id = String::from(device_id);
let bridge_name = String::from(settings.name());

let (local_pump, remote_pump) = Builder::default()
let (local_pump, remote_pump) = Builder::<RingBuffer>::default()
.with_local(|pump| {
pump.with_config(MqttClientConfig::new(
system_address,
Expand All @@ -158,7 +158,7 @@ impl Bridge<RingBuffer> {
PublicationStore::new_ring_buffer(
NonZeroUsize::new(BATCH_SIZE).unwrap(),
&ring_buffer_settings,
device_id.clone(),
&bridge_name,
suffix,
)
})
Expand Down
12 changes: 8 additions & 4 deletions mqtt/mqtt-bridge/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::client::UpdateSubscriptionHandle;
use crate::{
bridge::BridgeError,
client::{Handled, MqttEventHandler},
persist::{PublicationStore, StreamWakeableState},
persist::{PersistError, PublicationStore, RingBufferError, StreamWakeableState},
pump::TopicMapperUpdates,
settings::TopicRule,
};
Expand Down Expand Up @@ -147,9 +147,13 @@ where

if let Some(publication) = forward_publication {
debug!("saving message to store");
self.store.push(&publication).map_err(BridgeError::Store)?;

return Ok(Handled::Fully);
return match self.store.push(&publication) {
Ok(_) => Ok(Handled::Fully),
// If we are full we are dropping the message on ground.
Err(PersistError::RingBuffer(RingBufferError::Full)) => Ok(Handled::Fully),
Err(err) => Err(BridgeError::Store(err)),
};
}
warn!("no topic matched");
}
Expand Down Expand Up @@ -263,7 +267,7 @@ mod tests {
let result = PublicationStore::new_ring_buffer(
BATCH_SIZE,
&RingBufferSettings::new(MAX_FILE_SIZE, dir_path, FLUSH_OPTIONS),
"test".to_owned(),
"test",
"local",
);
assert!(result.is_ok());
Expand Down
25 changes: 13 additions & 12 deletions mqtt/mqtt-bridge/src/persist/loader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::VecDeque,
num::NonZeroUsize,
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand All @@ -9,7 +10,7 @@ use futures_util::stream::Stream;
use mqtt3::proto::Publication;
use parking_lot::Mutex;

use crate::persist::{waking_state::StreamWakeableState, Key, PersistError};
use crate::persist::{waking_state::StreamWakeableState, Key, PersistResult};

/// Pattern allows for the wrapping `MessageLoader` to be cloned and have non mutable methods
/// This facilitates sharing between multiple futures in a single threaded environment
Expand All @@ -32,20 +33,20 @@ impl<S> MessageLoader<S>
where
S: StreamWakeableState,
{
pub fn new(state: Arc<Mutex<S>>, batch_size: usize) -> Self {
pub fn new(state: Arc<Mutex<S>>, batch_size: NonZeroUsize) -> Self {
let batch = VecDeque::new();

let inner = MessageLoaderInner {
state,
batch,
batch_size,
batch_size: batch_size.get(),
};
let inner = Arc::new(Mutex::new(inner));

Self(inner)
}

fn next_batch(&mut self) -> Result<VecDeque<(Key, Publication)>, PersistError> {
fn next_batch(&mut self) -> PersistResult<VecDeque<(Key, Publication)>> {
let inner = self.0.lock();
let mut state_lock = inner.state.lock();
let batch = state_lock.batch(inner.batch_size)?;
Expand All @@ -64,7 +65,7 @@ impl<S> Stream for MessageLoader<S>
where
S: StreamWakeableState,
{
type Item = Result<(Key, Publication), PersistError>;
type Item = PersistResult<(Key, Publication)>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut inner = self.0.lock();
Expand Down Expand Up @@ -96,7 +97,7 @@ where
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use std::{num::NonZeroUsize, sync::Arc, time::Duration};

use bytes::Bytes;
use futures_util::{future::join, stream::TryStreamExt};
Expand Down Expand Up @@ -142,7 +143,7 @@ mod tests {
let _key2 = borrowed_state.insert(&pub2).unwrap();
}
// get batch size elements
let mut loader = MessageLoader::new(state, 1);
let mut loader = MessageLoader::new(state, NonZeroUsize::new(1).unwrap());
let mut elements = loader.next_batch().unwrap();

// verify
Expand Down Expand Up @@ -182,7 +183,7 @@ mod tests {
}

// get batch size elements
let mut loader = MessageLoader::new(state, BATCH_SIZE);
let mut loader = MessageLoader::new(state, NonZeroUsize::new(BATCH_SIZE).unwrap());
let mut elements = loader.next_batch().unwrap();

// verify
Expand Down Expand Up @@ -219,7 +220,7 @@ mod tests {
}

// verify insertion order
let mut loader = MessageLoader::new(state, num_elements);
let mut loader = MessageLoader::new(state, NonZeroUsize::new(num_elements).unwrap());
let mut elements = loader.next_batch().unwrap();

for key in keys {
Expand Down Expand Up @@ -257,7 +258,7 @@ mod tests {
key2 = borrowed_state.insert(&pub2).unwrap();
}
// get loader
let mut loader = MessageLoader::new(state.clone(), BATCH_SIZE);
let mut loader = MessageLoader::new(state.clone(), NonZeroUsize::new(BATCH_SIZE).unwrap());

// make sure same publications come out in correct order
let extracted1 = loader.try_next().await.unwrap().unwrap();
Expand Down Expand Up @@ -299,7 +300,7 @@ mod tests {
}

// get loader
let mut loader = MessageLoader::new(state.clone(), BATCH_SIZE);
let mut loader = MessageLoader::new(state.clone(), NonZeroUsize::new(BATCH_SIZE).unwrap());

// process inserted messages
loader.try_next().await.unwrap().unwrap();
Expand Down Expand Up @@ -346,7 +347,7 @@ mod tests {
};

// get loader
let mut loader = MessageLoader::new(state.clone(), BATCH_SIZE);
let mut loader = MessageLoader::new(state.clone(), NonZeroUsize::new(BATCH_SIZE).unwrap());

// async function that waits for a message to enter the state
let pub_copy = pub1.clone();
Expand Down
5 changes: 3 additions & 2 deletions mqtt/mqtt-bridge/src/persist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use serde::{Deserialize, Serialize};

pub use loader::MessageLoader;
pub use publication_store::PublicationStore;
use waking_state::{memory::error::MemoryError, ring_buffer::error::RingBufferError};
use waking_state::memory::error::MemoryError;
pub use waking_state::{
memory::WakingMemoryStore, ring_buffer::flush::FlushOptions, ring_buffer::RingBuffer,
memory::WakingMemoryStore,
ring_buffer::{error::RingBufferError, flush::FlushOptions, RingBuffer},
StreamWakeableState,
};

Expand Down
6 changes: 3 additions & 3 deletions mqtt/mqtt-bridge/src/persist/publication_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ impl PublicationStore<RingBuffer> {
pub fn new_ring_buffer(
batch_size: NonZeroUsize,
ring_buffer_settings: &RingBufferSettings,
device_id: String,
bridge_name: &str,
suffix: &str,
) -> PersistResult<Self> {
let mut file_path = ring_buffer_settings.directory().clone();
file_path.push(device_id);
file_path.push(bridge_name);
file_path.push(suffix);
let max_file_size = ring_buffer_settings.max_file_size();
let flush_options = ring_buffer_settings.flush_options();
Expand All @@ -61,7 +61,7 @@ where
{
pub fn new(state: S, batch_size: NonZeroUsize) -> Self {
let state = Arc::new(Mutex::new(state));
let loader = MessageLoader::new(state.clone(), batch_size.get());
let loader = MessageLoader::new(state.clone(), batch_size);

let inner = PublicationStoreInner { state, loader };
let inner = Arc::new(Mutex::new(inner));
Expand Down
4 changes: 1 addition & 3 deletions mqtt/mqtt-bridge/src/persist/waking_state/memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use std::{cmp::min, collections::VecDeque, num::NonZeroUsize, task::Waker};
use mqtt3::proto::Publication;
use tracing::debug;

use crate::persist::{
waking_state::memory::error::MemoryError, Key, PersistResult, StreamWakeableState,
};
use crate::persist::{Key, MemoryError, PersistResult, StreamWakeableState};

pub mod error;
#[cfg(test)]
Expand Down
10 changes: 3 additions & 7 deletions mqtt/mqtt-bridge/src/persist/waking_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ use crate::persist::{Key, PersistResult};
pub mod memory;
pub mod ring_buffer;

// TODO: Currently rocksdb does not compile on musl.
// Once we fix compilation we can add this module back.
// If we decide fixing compilation is not efficient, we can reuse code in rocksdb.rs by substituting rocksdb wrapping abstraction.
// pub mod rocksdb;

/// Responsible for waking waiting streams when new elements are added.
/// Exposes a get method for retrieving a count of elements in order of insertion.
///
Expand All @@ -33,7 +28,7 @@ pub trait StreamWakeableState {

#[cfg(test)]
mod tests {
use std::{pin::Pin, sync::Arc, task::Context, task::Poll};
use std::{num::NonZeroUsize, pin::Pin, sync::Arc, task::Context, task::Poll};

use bytes::Bytes;
use futures_util::stream::{Stream, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -119,7 +114,8 @@ mod tests {

// extract some, check that they are in order
let state = Arc::new(Mutex::new(state));
let mut loader = MessageLoader::new(state.clone(), num_elements);
let mut loader =
MessageLoader::new(state.clone(), NonZeroUsize::new(num_elements).unwrap());
let (key1, _) = loader.try_next().await.unwrap().unwrap();
let (key2, _) = loader.try_next().await.unwrap().unwrap();
assert_eq!(key1, keys[0]);
Expand Down
4 changes: 2 additions & 2 deletions mqtt/mqtt-bridge/src/persist/waking_state/ring_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::{
time::{Duration, Instant},
};

use block::{calculate_crc_over_bytes, BlockHeaderV1};
use mqtt3::proto::Publication;

use bincode::Result as BincodeResult;
Expand All @@ -26,7 +25,8 @@ use crate::persist::{
waking_state::{
ring_buffer::{
block::{
validate, BlockHeaderWithCrc, BlockVersion, BLOCK_HINT, SERIALIZED_BLOCK_SIZE,
calculate_crc_over_bytes, validate, BlockHeaderV1, BlockHeaderWithCrc,
BlockVersion, BLOCK_HINT, SERIALIZED_BLOCK_SIZE,
},
error::{BlockError, RingBufferError},
flush::{FlushOptions, FlushState},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ impl StreamWakeableState for TestRingBuffer {
}

fn remove(&mut self, key: Key) -> PersistResult<()> {
#[allow(clippy::cast_possible_truncation)]
self.0.remove(key)
}

Expand Down
Loading

0 comments on commit 295cfed

Please sign in to comment.