Skip to content

Commit

Permalink
[mqtt] handle RPC sub requests (Azure#4602)
Browse files Browse the repository at this point in the history
For some reasons when EdgeHub starts up it make several concurrent attempts to subscribe to $iothub topics. It causes first attempt to succeed and the second one to fail with a timeout.

The changes to treat RPC subsctioption commands as individual even for the same topic.
  • Loading branch information
dmolokanov authored and damonbarry committed Apr 14, 2022
1 parent 7250d79 commit 6544515
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 31 deletions.
10 changes: 8 additions & 2 deletions mqtt/mqtt-bridge/src/upstream/events/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ impl RemoteUpstreamPumpEventHandler {
match self.remote_sub_handle.subscribe(subscribe_to).await {
Ok(_) => {
if let Some(existing) = self.subscriptions.insert(&topic_filter, command_id) {
warn!("duplicating sub request found for {}", existing);
warn!(
command_id = ?existing,
"duplicating sub request found for topic {}", topic_filter
);
}
}
Err(e) => {
Expand All @@ -105,7 +108,10 @@ impl RemoteUpstreamPumpEventHandler {
{
Ok(_) => {
if let Some(existing) = self.subscriptions.insert(&topic_filter, command_id) {
warn!("duplicating unsub request found for {}", existing);
warn!(
commands = ?existing,
"duplicating unsub request found for topic {}", topic_filter
);
}
}
Err(e) => {
Expand Down
72 changes: 67 additions & 5 deletions mqtt/mqtt-bridge/src/upstream/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use parking_lot::Mutex;
pub use remote::{RemoteRpcMqttEventHandler, RpcPumpHandle};

use std::{
collections::HashMap, fmt::Display, fmt::Formatter, fmt::Result as FmtResult, sync::Arc,
collections::{HashMap, VecDeque},
fmt::{Display, Formatter, Result as FmtResult},
sync::Arc,
};

use bson::doc;
Expand Down Expand Up @@ -110,17 +112,77 @@ impl Display for RpcCommand {
/// response comes back as `mqtt3::Event` type which handled with the event
/// handler.
#[derive(Debug, Clone, Default)]
pub struct RpcSubscriptions(Arc<Mutex<HashMap<String, CommandId>>>);
pub struct RpcSubscriptions(Arc<Mutex<HashMap<String, VecDeque<CommandId>>>>);

impl RpcSubscriptions {
/// Stores topic filter to command identifier mapping.
pub fn insert(&self, topic_filter: &str, id: CommandId) -> Option<CommandId> {
self.0.lock().insert(topic_filter.into(), id)
pub fn insert(&self, topic_filter: &str, id: CommandId) -> Option<Vec<CommandId>> {
let mut inner = self.0.lock();

let existing = inner
.get(topic_filter)
.map(|ids| ids.iter().cloned().collect());

inner.entry(topic_filter.into()).or_default().push_back(id);

existing
}

/// Removes topic filter to command identifier mapping and returns
/// `CommandId` if exists.
pub fn remove(&self, topic_filter: &str) -> Option<CommandId> {
self.0.lock().remove(topic_filter)
let mut inner = self.0.lock();

inner
.remove_entry(topic_filter)
.and_then(|(topic, mut existing)| {
let id = existing.pop_front();

if !existing.is_empty() {
inner.insert(topic, existing);
}

id
})
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_handles_rpc_subscriptions() {
let subs = RpcSubscriptions::default();
assert!(subs.0.lock().is_empty());

assert_eq!(subs.insert("topic/1", "1".into()), None);
assert_eq!(commands(&subs, "topic/1"), Some(vec!["1".into()]));
assert_eq!(subs.insert("topic/2", "2".into()), None);
assert_eq!(commands(&subs, "topic/2"), Some(vec!["2".into()]));
assert_eq!(subs.insert("topic/1", "3".into()), Some(vec!["1".into()]));
assert_eq!(
commands(&subs, "topic/1"),
Some(vec!["1".into(), "3".into()])
);

assert_eq!(subs.remove("topic/1"), Some("1".into()));
assert_eq!(commands(&subs, "topic/1"), Some(vec!["3".into()]));
assert_eq!(subs.remove("topic/1"), Some("3".into()));
assert_eq!(commands(&subs, "topic/1"), None);
assert_eq!(subs.remove("topic/1"), None);
assert_eq!(commands(&subs, "topic/1"), None);
assert_eq!(subs.remove("topic/2"), Some("2".into()));
assert_eq!(commands(&subs, "topic/2"), None);

assert_eq!(subs.insert("topic/1", "4".into()), None);
assert_eq!(commands(&subs, "topic/1"), Some(vec!["4".into()]));
}

fn commands(subs: &RpcSubscriptions, topic_filter: &str) -> Option<Vec<CommandId>> {
subs.0
.lock()
.get(topic_filter)
.map(|ids| ids.iter().cloned().collect())
}
}
9 changes: 5 additions & 4 deletions mqtt/mqtt-bridge/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,29 +72,30 @@ where
}

pub async fn setup_bridge_controller(
device_id: &str,
local_address: String,
upstream_address: String,
subs: Vec<Direction>,
storage_dir_override: &PathBuf,
) -> (BridgeControllerHandle, JoinHandle<()>) {
let credentials = Credentials::PlainText(AuthenticationSettings::new(
"bridge".into(),
device_id.into(),
format!("{}/edgehub", device_id),
"pass".into(),
"bridge".into(),
Some(CERTIFICATE.into()),
));

let settings = BridgeSettings::from_upstream_details(
upstream_address,
credentials,
subs,
true,
false,
Duration::from_secs(5),
storage_dir_override,
)
.unwrap();

let controller = BridgeController::new(local_address, "bridge".into(), settings);
let controller = BridgeController::new(local_address, device_id.into(), settings);
let controller_handle = controller.handle();
let controller: Box<dyn Sidecar + Send> = Box::new(controller);

Expand Down
9 changes: 9 additions & 0 deletions mqtt/mqtt-bridge/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ async fn send_message_upstream_downstream() {
let storage_dir_override = dir.path().to_path_buf();

let (controller_handle, controller_task) = common::setup_bridge_controller(
"edge-device-1",
local_server_handle.address(),
upstream_server_handle.tls_address().unwrap(),
subs,
Expand Down Expand Up @@ -168,6 +169,7 @@ async fn send_message_upstream_with_crash_is_lossless() {
let storage_dir_override = dir.path().to_path_buf();

let (controller_handle, controller_task) = common::setup_bridge_controller(
"edge-device-2",
local_server_handle.address(),
upstream_server_handle.tls_address().unwrap(),
subs.clone(),
Expand Down Expand Up @@ -218,6 +220,7 @@ async fn send_message_upstream_with_crash_is_lossless() {
.build();

let (controller_handle, controller_task) = common::setup_bridge_controller(
"edge-device-3",
local_server_handle.address(),
upstream_server_handle.tls_address().unwrap(),
subs.clone(),
Expand Down Expand Up @@ -263,6 +266,7 @@ async fn bridge_settings_update() {
let storage_dir_override = dir.path().to_path_buf();

let (mut controller_handle, controller_task) = common::setup_bridge_controller(
"edge-device-4",
local_server_handle.address(),
upstream_server_handle.tls_address().unwrap(),
vec![],
Expand Down Expand Up @@ -384,6 +388,7 @@ async fn subscribe_to_upstream_rejected_should_retry() {
let storage_dir_override = dir.path().to_path_buf();

let (controller_handle, controller_task) = common::setup_bridge_controller(
"edge-device-5",
local_server_handle.address(),
upstream_server_handle.tls_address().unwrap(),
subs,
Expand Down Expand Up @@ -474,6 +479,7 @@ async fn connect_to_upstream_failure_should_retry() {
let storage_dir_override = dir.path().to_path_buf();

let (controller_handle, controller_task) = common::setup_bridge_controller(
"edge-device-6",
local_server_handle.address(),
upstream_tls_address.clone(),
subs,
Expand Down Expand Up @@ -560,6 +566,7 @@ async fn bridge_forwards_messages_after_restart() {
let storage_dir_override = dir.path().to_path_buf();

let (controller_handle, controller_task) = common::setup_bridge_controller(
"edge-device-7",
local_server_handle.address(),
upstream_server_handle.tls_address().unwrap(),
subs.clone(),
Expand Down Expand Up @@ -621,6 +628,7 @@ async fn bridge_forwards_messages_after_restart() {

// restart bridge
let (controller_handle, controller_task) = common::setup_bridge_controller(
"edge-device-8",
local_server_handle.address(),
upstream_server_handle.tls_address().unwrap(),
subs,
Expand Down Expand Up @@ -688,6 +696,7 @@ async fn recreate_upstream_bridge_when_fails() {
let storage_dir_override = dir.path().to_path_buf();

let (mut controller_handle, _) = common::setup_bridge_controller(
"edge-device-9",
local_server_handle.address(),
upstream_server_handle.tls_address().unwrap(),
subs,
Expand Down
69 changes: 67 additions & 2 deletions mqtt/mqtt-bridge/tests/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
mod common;

use std::time::Duration;

use bson::{doc, spec::BinarySubtype};
use bytes::Bytes;
use futures_util::StreamExt;
use matches::assert_matches;
use tokio::time;

use mqtt3::{
proto::{ClientId, QoS},
Expand All @@ -21,16 +24,22 @@ async fn get_twin_update_via_rpc() {
let storage_dir_override = dir.path().to_path_buf();

let (controller_handle, controller_task) = common::setup_bridge_controller(
"edge-device-1",
local_server_handle.address(),
upstream_server_handle.tls_address().unwrap(),
Vec::new(),
&storage_dir_override,
)
.await;

// wait for bridge controller subscribed to all required topics
time::delay_for(Duration::from_millis(100)).await;

// connect to the remote broker to emulate upstream interaction
let mut upstream = TestClientBuilder::new(upstream_server_handle.address())
.with_client_id(ClientId::IdWithExistingSession("upstream".into()))
.with_client_id(ClientId::IdWithExistingSession(
"edge-device-1/upstream/$bridge".into(),
))
.build();
upstream
.subscribe("$iothub/+/twin/get/#", QoS::AtLeastOnce)
Expand All @@ -39,7 +48,9 @@ async fn get_twin_update_via_rpc() {

// connect to the local broker with eh-core client
let mut edgehub = TestClientBuilder::new(local_server_handle.address())
.with_client_id(ClientId::IdWithExistingSession("edgehub".into()))
.with_client_id(ClientId::IdWithExistingSession(
"edge-device-1/edgehub/$bridge".into(),
))
.build();

// edgehub subscribes to any downstream topic command acknowledgement
Expand Down Expand Up @@ -91,6 +102,60 @@ async fn get_twin_update_via_rpc() {
upstream.shutdown().await;
}

#[tokio::test]
async fn handle_rpc_subscription_duplicates() {
let (mut local_server_handle, _, mut upstream_server_handle, _) =
common::setup_brokers(AllowAll, AllowAll);

let dir = tempfile::tempdir().expect("Failed to create temp dir");
let storage_dir_override = dir.path().to_path_buf();

let (controller_handle, controller_task) = common::setup_bridge_controller(
"edge-device-2",
local_server_handle.address(),
upstream_server_handle.tls_address().unwrap(),
Vec::new(),
&storage_dir_override,
)
.await;

// wait for bridge controller subscribed to all required topics
time::delay_for(Duration::from_millis(100)).await;

// connect to the local broker with eh-core client
let mut edgehub = TestClientBuilder::new(local_server_handle.address())
.with_client_id(ClientId::IdWithExistingSession(
"edge-device-2/edgehub/$bridge".into(),
))
.build();

// edgehub subscribes to any downstream topic command acknowledgement
edgehub.subscribe("$downstream/#", QoS::AtLeastOnce).await;
assert!(edgehub.subscriptions().next().await.is_some());

// edgehub subscribes to twin response #1
let payload = command("sub", "$iothub/device-1/twin/res/#", None);
edgehub
.publish_qos1("$upstream/rpc/11", payload, false)
.await;

// edgehub subscribes to twin response #2
let payload = command("sub", "$iothub/device-1/twin/res/#", None);
edgehub
.publish_qos1("$upstream/rpc/12", payload, false)
.await;

assert_matches!(edgehub.publications().next().await, Some(ReceivedPublication {topic_name, ..}) if topic_name == "$downstream/rpc/ack/11");
assert_matches!(edgehub.publications().next().await, Some(ReceivedPublication {topic_name, ..}) if topic_name == "$downstream/rpc/ack/12");

controller_handle.shutdown();
controller_task.await.expect("controller task");

local_server_handle.shutdown().await;
upstream_server_handle.shutdown().await;
edgehub.shutdown().await;
}

fn command(cmd: &str, topic: &str, payload: Option<Vec<u8>>) -> Bytes {
let mut command = doc! {
"version": "v1",
Expand Down
12 changes: 6 additions & 6 deletions mqtt/mqtt-broker/benches/dispatch_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@
use std::{
collections::HashSet,
fmt::{Display, Formatter, Result as FmtResult},
iter::FromIterator,
net::SocketAddr,
sync::{Arc, Mutex},
time::Duration,
};

use bytes::Bytes;
use criterion::{
criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion,
};
Expand Down Expand Up @@ -120,7 +118,7 @@ fn scenarios() -> (Vec<(proto::QoS, Size)>, Vec<usize>) {
let qoses = vec![proto::QoS::AtMostOnce, proto::QoS::AtLeastOnce];
let clients = vec![1, 10, 50, 100];

(Vec::from_iter(iproduct!(qoses, sizes)), clients)
(iproduct!(qoses, sizes).collect(), clients)
}

fn dispatch_messages(
Expand All @@ -137,7 +135,7 @@ fn dispatch_messages(
let (on_publish_tx, mut on_publish_rx) = mpsc::unbounded_channel();
broker.on_publish = Some(on_publish_tx);

let mut broker_handle = broker.handle();
let broker_handle = broker.handle();

let broker_task = runtime.spawn(broker.run());

Expand Down Expand Up @@ -224,7 +222,7 @@ impl Client {
async fn connect(client_id: Id, broker_handle: BrokerHandle) -> Self {
let (tx, rx) = mpsc::unbounded_channel();

let mut client = Self {
let client = Self {
id: client_id,
broker_handle,
rx,
Expand Down Expand Up @@ -369,7 +367,9 @@ impl PublishHandle {
packet_identifier_dup_qos: packet_id,
retain: false,
topic_name,
payload: Bytes::from_iter((0..payload_size.into()).map(|_| rand::random::<u8>())),
payload: (0..payload_size.into())
.map(|_| rand::random::<u8>())
.collect(),
}
}
}
Expand Down
Loading

0 comments on commit 6544515

Please sign in to comment.