Skip to content

Commit fab3d68

Browse files
committed
test and fix broadcast transmission
1 parent 3bec49e commit fab3d68

File tree

8 files changed

+181
-28
lines changed

8 files changed

+181
-28
lines changed

crates/locutus-node/src/node/event_listener.rs

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,19 @@ impl<'a> EventLog<'a> {
6565
},
6666
*msg.id(),
6767
),
68+
Message::Put(PutMsg::Broadcasting {
69+
new_value,
70+
broadcast_to,
71+
key,
72+
..
73+
}) => EventKind::Put(
74+
PutEvent::BroadcastEmitted {
75+
broadcast_to: broadcast_to.clone(),
76+
key: key.clone(),
77+
value: new_value.clone(),
78+
},
79+
*msg.id(),
80+
),
6881
Message::Get(GetMsg::ReturnGet {
6982
key,
7083
value: StoreResponse { value: Some(_), .. },
@@ -134,6 +147,14 @@ enum PutEvent {
134147
/// value that was put
135148
value: ContractValue,
136149
},
150+
BroadcastEmitted {
151+
/// subscribed peers
152+
broadcast_to: Vec<PeerKeyLocation>,
153+
/// key of the contract which value was being updated
154+
key: ContractKey,
155+
/// value that was put
156+
value: ContractValue,
157+
},
137158
}
138159

139160
#[inline]
@@ -150,13 +171,16 @@ fn create_log(logs: &[MessageLog], log: EventLog) -> (MessageLog, ListenerLogId)
150171
.iter()
151172
.filter_map(|l| {
152173
if matches!(l, MessageLog { kind: EventKind::Put(_, id), .. } if incoming_tx == id ) {
153-
Some(&l.kind)
174+
match l.kind {
175+
EventKind::Put(PutEvent::BroadcastEmitted { .. }, _) => None,
176+
_ => Some(&l.kind),
177+
}
154178
} else {
155179
None
156180
}
157181
})
158182
.chain([&kind]);
159-
let kind = fuse_events_msg(find_put_ops).unwrap_or(kind);
183+
let kind = fuse_successful_put_op(find_put_ops).unwrap_or(kind);
160184

161185
let msg_log = MessageLog {
162186
ts: Instant::now(),
@@ -166,7 +190,7 @@ fn create_log(logs: &[MessageLog], log: EventLog) -> (MessageLog, ListenerLogId)
166190
(msg_log, log_id)
167191
}
168192

169-
fn fuse_events_msg<'a>(mut put_ops: impl Iterator<Item = &'a EventKind>) -> Option<EventKind> {
193+
fn fuse_successful_put_op<'a>(mut put_ops: impl Iterator<Item = &'a EventKind>) -> Option<EventKind> {
170194
let prev_msgs = [put_ops.next().cloned(), put_ops.next().cloned()];
171195
match prev_msgs {
172196
[Some(EventKind::Put(PutEvent::Request { performer, key }, id)), Some(EventKind::Put(PutEvent::PutSuccess { requester, value }, _))] => {
@@ -234,6 +258,17 @@ mod test_utils {
234258
})
235259
}
236260

261+
pub fn get_broadcast_count(
262+
&self,
263+
expected_key: &ContractKey,
264+
expected_value: &ContractValue,
265+
) -> usize {
266+
let mut logs = self.logs.read();
267+
logs.iter().filter(|log| {
268+
matches!(log.kind, EventKind::Put(PutEvent::BroadcastEmitted { ref key, ref value, .. }, ..) if key == expected_key && value == expected_value )
269+
}).count()
270+
}
271+
237272
pub fn has_got_contract(&self, peer: &PeerKey, expected_key: &ContractKey) -> bool {
238273
let logs = self.logs.read();
239274
logs.iter().any(|log| {

crates/locutus-node/src/node/in_memory.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1+
use std::collections::HashMap;
12
use std::sync::Arc;
23

34
use tokio::sync::mpsc::{self, Receiver};
45

56
use crate::contract::{
6-
Contract, ContractError, ContractHandlerEvent, ContractValue, SimStoreError,
7+
Contract, ContractError, ContractHandlerEvent, ContractKey, ContractValue, SimStoreError,
78
};
89
use crate::{
910
conn_manager::{in_memory::MemoryConnManager, ConnectionBridge, PeerKey},
@@ -84,6 +85,7 @@ where
8485
pub(crate) async fn append_contracts(
8586
&self,
8687
contracts: Vec<(Contract, ContractValue)>,
88+
contract_subscribers: HashMap<ContractKey, Vec<PeerKeyLocation>>,
8789
) -> Result<(), ContractError<CErr>> {
8890
for (contract, value) in contracts {
8991
let key = contract.key();
@@ -98,7 +100,14 @@ where
98100
key,
99101
self.op_storage.ring.peer_key
100102
);
101-
self.op_storage.ring.cached_contracts.insert(key);
103+
self.op_storage.ring.cached_contracts.insert(key.clone());
104+
105+
if let Some(subscribers) = contract_subscribers.get(&key) {
106+
// add contract subscribers
107+
for subscriber in subscribers {
108+
self.op_storage.ring.add_subscriber(key, *subscriber);
109+
}
110+
}
102111
}
103112
Ok(())
104113
}

crates/locutus-node/src/node/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ where
5656
}
5757

5858
#[cfg(not(test))]
59-
impl<CErr> Node<CErr>
59+
impl<CErr> Node<CErr>
6060
where
6161
CErr: std::error::Error + Send + Sync + 'static,
6262
{

crates/locutus-node/src/node/test_utils.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use rand::Rng;
1010
use tokio::sync::watch::{channel, Receiver, Sender};
1111

1212
use crate::contract::{Contract, ContractKey, ContractValue, SimStoreError};
13+
use crate::ring::PeerKeyLocation;
1314
use crate::user_events::UserEvent;
1415
use crate::{
1516
conn_manager::PeerKey,
@@ -57,10 +58,11 @@ pub(crate) type EventId = usize;
5758

5859
#[derive(Clone)]
5960
pub(crate) struct NodeSpecification {
60-
/// Pair of contract and the initial value
61+
/// Pair of contract, the initial value and contract subscribers
6162
pub owned_contracts: Vec<(Contract, ContractValue)>,
6263
pub non_owned_contracts: Vec<ContractKey>,
6364
pub events_to_generate: HashMap<EventId, UserEvent>,
65+
pub contract_subscribers: HashMap<ContractKey, Vec<PeerKeyLocation>>,
6466
}
6567

6668
#[derive(Clone)]
@@ -231,14 +233,33 @@ impl SimNetwork {
231233
self.labels.insert(label, peer.peer_key);
232234
tokio::spawn(async move {
233235
if let Some(specs) = node_specs {
234-
peer.append_contracts(specs.owned_contracts)
236+
peer.append_contracts(specs.owned_contracts, specs.contract_subscribers)
235237
.await
236238
.map_err(|_| anyhow::anyhow!("failed inserting test owned contracts"))?;
237239
}
238240
peer.listen_on(user_events).await
239241
});
240242
}
241243

244+
pub fn get_locations_by_node(&self) -> HashMap<String, PeerKeyLocation> {
245+
let mut locations_by_node: HashMap<String, PeerKeyLocation> = HashMap::new();
246+
247+
// Get node and gateways location by label
248+
for (node, label) in &self.nodes {
249+
locations_by_node.insert(
250+
label.to_string(),
251+
node.op_storage.ring.own_location().clone(),
252+
);
253+
}
254+
for (node, config) in &self.gateways {
255+
locations_by_node.insert(
256+
config.label.to_string(),
257+
node.op_storage.ring.own_location().clone(),
258+
);
259+
}
260+
locations_by_node
261+
}
262+
242263
pub fn connected(&self, peer: &str) -> bool {
243264
if let Some(key) = self.labels.get(peer) {
244265
self.event_listener.is_connected(key)
@@ -255,6 +276,10 @@ impl SimNetwork {
255276
}
256277
}
257278

279+
pub fn count_broadcasts(&self, key: &ContractKey, value: &ContractValue) -> usize {
280+
self.event_listener.get_broadcast_count(key, value)
281+
}
282+
258283
pub fn has_got_contract(&self, peer: &str, key: &ContractKey) -> bool {
259284
if let Some(pk) = self.labels.get(peer) {
260285
self.event_listener.has_got_contract(pk, key)

crates/locutus-node/src/operations/get.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,12 +724,14 @@ mod test {
724724
owned_contracts: vec![],
725725
non_owned_contracts: vec![key],
726726
events_to_generate: HashMap::from_iter([(1, get_event)]),
727+
contract_subscribers: HashMap::new(),
727728
};
728729

729730
let gw_0 = NodeSpecification {
730731
owned_contracts: vec![(contract, contract_val)],
731732
non_owned_contracts: vec![],
732733
events_to_generate: HashMap::new(),
734+
contract_subscribers: HashMap::new(),
733735
};
734736

735737
let get_specs = HashMap::from_iter([
@@ -769,6 +771,7 @@ mod test {
769771
owned_contracts: vec![],
770772
non_owned_contracts: vec![key],
771773
events_to_generate: HashMap::from_iter([(1, get_event)]),
774+
contract_subscribers: HashMap::new(),
772775
};
773776

774777
let get_specs = HashMap::from_iter([("node-1".to_string(), node_1)]);

0 commit comments

Comments
 (0)