Skip to content

Test and fix put broadcast #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 141 additions & 3 deletions crates/locutus-node/src/node/event_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,32 @@ impl<'a> EventLog<'a> {
},
*msg.id(),
),
Message::Put(PutMsg::Broadcasting {
new_value,
broadcast_to,
key,
..
}) => EventKind::Put(
PutEvent::BroadcastEmitted {
broadcast_to: broadcast_to.clone(),
key: key.clone(),
value: new_value.clone(),
},
*msg.id(),
),
Message::Put(PutMsg::BroadcastTo {
sender,
new_value,
key,
..
}) => EventKind::Put(
PutEvent::BroadcastReceived {
requester: sender.peer.clone(),
key: key.clone(),
value: new_value.clone(),
},
*msg.id(),
),
Message::Get(GetMsg::ReturnGet {
key,
value: StoreResponse { value: Some(_), .. },
Expand Down Expand Up @@ -134,6 +160,32 @@ enum PutEvent {
/// value that was put
value: ContractValue,
},
BroadcastEmitted {
/// subscribed peers
broadcast_to: Vec<PeerKeyLocation>,
/// key of the contract which value was being updated
key: ContractKey,
/// value that was put
value: ContractValue,
},
BroadcastReceived {
/// peer who started the broadcast op
requester: PeerKey,
/// key of the contract which value was being updated
key: ContractKey,
/// value that was put
value: ContractValue,
},
BroadcastComplete {
/// peer who performed the event
performer: PeerKey,
/// peer who started the broadcast op
requester: PeerKey,
/// key of the contract which value was being updated
key: ContractKey,
/// value that was put
value: ContractValue,
},
}

#[inline]
Expand All @@ -150,13 +202,16 @@ fn create_log(logs: &[MessageLog], log: EventLog) -> (MessageLog, ListenerLogId)
.iter()
.filter_map(|l| {
if matches!(l, MessageLog { kind: EventKind::Put(_, id), .. } if incoming_tx == id ) {
Some(&l.kind)
match l.kind {
EventKind::Put(PutEvent::BroadcastEmitted { .. }, _) => None,
_ => Some(&l.kind),
}
} else {
None
}
})
.chain([&kind]);
let kind = fuse_events_msg(find_put_ops).unwrap_or(kind);
let kind = fuse_successful_put_op(find_put_ops).unwrap_or(kind);

let msg_log = MessageLog {
ts: Instant::now(),
Expand All @@ -166,7 +221,9 @@ fn create_log(logs: &[MessageLog], log: EventLog) -> (MessageLog, ListenerLogId)
(msg_log, log_id)
}

fn fuse_events_msg<'a>(mut put_ops: impl Iterator<Item = &'a EventKind>) -> Option<EventKind> {
fn fuse_successful_put_op<'a>(
mut put_ops: impl Iterator<Item = &'a EventKind>,
) -> Option<EventKind> {
let prev_msgs = [put_ops.next().cloned(), put_ops.next().cloned()];
match prev_msgs {
[Some(EventKind::Put(PutEvent::Request { performer, key }, id)), Some(EventKind::Put(PutEvent::PutSuccess { requester, value }, _))] => {
Expand All @@ -189,6 +246,7 @@ mod test_utils {
use std::{collections::HashMap, sync::Arc};

use dashmap::DashMap;
use itertools::Itertools;
use parking_lot::RwLock;

use crate::{contract::ContractKey, message::TxType, ring::Distance};
Expand Down Expand Up @@ -234,6 +292,86 @@ mod test_utils {
})
}

pub fn get_broadcast_count(
&self,
expected_key: &ContractKey,
expected_value: &ContractValue,
) -> usize {
let mut logs = self.logs.read();
logs.iter().filter(|log| {
matches!(log.kind, EventKind::Put(PutEvent::BroadcastEmitted { ref key, ref value, .. }, ..) if key == expected_key && value == expected_value )
}).count()
}

pub fn has_broadcast_contract(
&self,
mut broadcast_pairs: Vec<(PeerKey, PeerKey)>,
expected_key: &ContractKey,
expected_value: &ContractValue,
) -> bool {
let logs = self.logs.read();
let mut broadcast_ops = logs.iter().filter_map(|l| {
if matches!(
l,
MessageLog {
kind: EventKind::Put(_, id),
..
}
) {
match l.kind {
EventKind::Put(PutEvent::BroadcastEmitted { .. }, _)
| EventKind::Put(PutEvent::BroadcastReceived { .. }, _) => Some(&l.kind),
_ => None,
}
} else {
None
}
});

let prev_msgs = [broadcast_ops.next().cloned(), broadcast_ops.next().cloned()];
let broadcast = match prev_msgs {
[Some(EventKind::Put(PutEvent::BroadcastEmitted { broadcast_to, .. }, id1)), Some(EventKind::Put(
PutEvent::BroadcastReceived {
requester,
key,
value,
},
id2,
))] => {
if id1 == id2 {
Some(EventKind::Put(
PutEvent::BroadcastComplete {
performer: broadcast_to.get(0).unwrap().peer,
requester,
key,
value,
},
id1,
))
} else {
None
}
}
_ => None,
};

match broadcast {
Some(EventKind::Put(
PutEvent::BroadcastComplete {
ref performer,
ref requester,
..
},
_,
)) => {
let expected_pair = (performer, requester);
broadcast_pairs.retain(|pair| matches!(pair, expected_pair));
!broadcast_pairs.is_empty()
}
_ => false,
}
}

pub fn has_got_contract(&self, peer: &PeerKey, expected_key: &ContractKey) -> bool {
let logs = self.logs.read();
logs.iter().any(|log| {
Expand Down
13 changes: 11 additions & 2 deletions crates/locutus-node/src/node/in_memory.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::mpsc::{self, Receiver};

use crate::contract::{
Contract, ContractError, ContractHandlerEvent, ContractValue, SimStoreError,
Contract, ContractError, ContractHandlerEvent, ContractKey, ContractValue, SimStoreError,
};
use crate::{
conn_manager::{in_memory::MemoryConnManager, ConnectionBridge, PeerKey},
Expand Down Expand Up @@ -84,6 +85,7 @@ where
pub(crate) async fn append_contracts(
&self,
contracts: Vec<(Contract, ContractValue)>,
contract_subscribers: HashMap<ContractKey, Vec<PeerKeyLocation>>,
) -> Result<(), ContractError<CErr>> {
for (contract, value) in contracts {
let key = contract.key();
Expand All @@ -98,7 +100,14 @@ where
key,
self.op_storage.ring.peer_key
);
self.op_storage.ring.cached_contracts.insert(key);
self.op_storage.ring.cached_contracts.insert(key.clone());

if let Some(subscribers) = contract_subscribers.get(&key) {
// add contract subscribers
for subscriber in subscribers {
self.op_storage.ring.add_subscriber(key, *subscriber);
}
}
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/locutus-node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
}

#[cfg(not(test))]
impl<CErr> Node<CErr>
impl<CErr> Node<CErr>
where
CErr: std::error::Error + Send + Sync + 'static,
{
Expand Down
47 changes: 46 additions & 1 deletion crates/locutus-node/src/node/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use rand::Rng;
use tokio::sync::watch::{channel, Receiver, Sender};

use crate::contract::{Contract, ContractKey, ContractValue, SimStoreError};
use crate::ring::PeerKeyLocation;
use crate::user_events::UserEvent;
use crate::{
conn_manager::PeerKey,
Expand Down Expand Up @@ -61,6 +62,7 @@ pub(crate) struct NodeSpecification {
pub owned_contracts: Vec<(Contract, ContractValue)>,
pub non_owned_contracts: Vec<ContractKey>,
pub events_to_generate: HashMap<EventId, UserEvent>,
pub contract_subscribers: HashMap<ContractKey, Vec<PeerKeyLocation>>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -231,14 +233,33 @@ impl SimNetwork {
self.labels.insert(label, peer.peer_key);
tokio::spawn(async move {
if let Some(specs) = node_specs {
peer.append_contracts(specs.owned_contracts)
peer.append_contracts(specs.owned_contracts, specs.contract_subscribers)
.await
.map_err(|_| anyhow::anyhow!("failed inserting test owned contracts"))?;
}
peer.listen_on(user_events).await
});
}

pub fn get_locations_by_node(&self) -> HashMap<String, PeerKeyLocation> {
let mut locations_by_node: HashMap<String, PeerKeyLocation> = HashMap::new();

// Get node and gateways location by label
for (node, label) in &self.nodes {
locations_by_node.insert(
label.to_string(),
node.op_storage.ring.own_location().clone(),
);
}
for (node, config) in &self.gateways {
locations_by_node.insert(
config.label.to_string(),
node.op_storage.ring.own_location().clone(),
);
}
locations_by_node
}

pub fn connected(&self, peer: &str) -> bool {
if let Some(key) = self.labels.get(peer) {
self.event_listener.is_connected(key)
Expand All @@ -255,6 +276,30 @@ impl SimNetwork {
}
}

pub fn has_broadcast_contract(
&self,
broadcast_pairs: Vec<(&str, &str)>,
key: &ContractKey,
value: &ContractValue,
) -> bool {
let peers = broadcast_pairs
.into_iter()
.step_by(2)
.map(
|(peer1, peer2)| match (self.labels.get(peer1), self.labels.get(peer2)) {
(Some(pk1), Some(pk2)) => (*pk1, *pk2),
_ => panic!("peer not found"),
},
)
.collect();
self.event_listener
.has_broadcast_contract(peers, key, value)
}

pub fn count_broadcasts(&self, key: &ContractKey, value: &ContractValue) -> usize {
self.event_listener.get_broadcast_count(key, value)
}

pub fn has_got_contract(&self, peer: &str, key: &ContractKey) -> bool {
if let Some(pk) = self.labels.get(peer) {
self.event_listener.has_got_contract(pk, key)
Expand Down
3 changes: 3 additions & 0 deletions crates/locutus-node/src/operations/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,12 +724,14 @@ mod test {
owned_contracts: vec![],
non_owned_contracts: vec![key],
events_to_generate: HashMap::from_iter([(1, get_event)]),
contract_subscribers: HashMap::new(),
};

let gw_0 = NodeSpecification {
owned_contracts: vec![(contract, contract_val)],
non_owned_contracts: vec![],
events_to_generate: HashMap::new(),
contract_subscribers: HashMap::new(),
};

let get_specs = HashMap::from_iter([
Expand Down Expand Up @@ -769,6 +771,7 @@ mod test {
owned_contracts: vec![],
non_owned_contracts: vec![key],
events_to_generate: HashMap::from_iter([(1, get_event)]),
contract_subscribers: HashMap::new(),
};

let get_specs = HashMap::from_iter([("node-1".to_string(), node_1)]);
Expand Down
Loading