Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
cleanup multi node test
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko authored and garious committed May 23, 2018
1 parent 2a0095e commit f3c4acc
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 60 deletions.
1 change: 0 additions & 1 deletion src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,5 +816,4 @@ mod test {
t.join().unwrap();
}
}

}
111 changes: 52 additions & 59 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,21 +295,21 @@ mod tests {
t.join().unwrap();
}
}
fn replicant(
fn validator(
leader: &ReplicatedData,
exit: Arc<AtomicBool>,
alice: &Mint,
threads: &mut Vec<JoinHandle<()>>,
) {
let replicant = TestNode::new();
let validator = TestNode::new();
let replicant_bank = Bank::new(&alice);
let mut ts = Server::new_validator(
replicant_bank,
replicant.data.clone(),
replicant.sockets.requests,
replicant.sockets.respond,
replicant.sockets.replicate,
replicant.sockets.gossip,
validator.data.clone(),
validator.sockets.requests,
validator.sockets.respond,
validator.sockets.replicate,
validator.sockets.gossip,
leader.clone(),
exit.clone(),
);
Expand Down Expand Up @@ -354,7 +354,7 @@ mod tests {
assert!(converged);
threads.push(t_spy_listen);
threads.push(t_spy_gossip);
let ret: Vec<_> = spy_ref
let v: Vec<ReplicatedData> = spy_ref
.read()
.unwrap()
.table
Expand All @@ -363,10 +363,9 @@ mod tests {
.filter(|x| x.id != me)
.map(|x| x.clone())
.collect();
ret.clone()
v.clone()
}
#[test]
#[ignore]
fn test_multi_node() {
logger::setup();
const N: usize = 5;
Expand All @@ -393,66 +392,60 @@ mod tests {

let mut threads = server.thread_hdls;
for _ in 0..N {
replicant(&leader.data, exit.clone(), &alice, &mut threads);
validator(&leader.data, exit.clone(), &alice, &mut threads);
}
let addrs = converge(&leader.data, exit.clone(), N + 2, &mut threads);
let servers = converge(&leader.data, exit.clone(), N + 2, &mut threads);
//contains the leader addr as well
assert_eq!(addrs.len(), N + 1);
assert_eq!(servers.len(), N + 1);
//verify leader can do transfer
let leader_balance = {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();

let mut client = ThinClient::new(
leader.data.requests_addr,
requests_socket,
leader.data.events_addr,
events_socket,
);
trace!("getting leader last_id");
let last_id = client.get_last_id().wait().unwrap();
info!("executing leader transer");
let _sig = client
.transfer(500, &alice.keypair(), bob_pubkey, &last_id)
.unwrap();
trace!("getting leader balance");
client.get_balance(&bob_pubkey).unwrap()
};
let leader_balance = tx_and_retry_get_balance(&leader.data, &alice, &bob_pubkey).unwrap();
assert_eq!(leader_balance, 500);
//verify replicant has the same balance
//verify validator has the same balance
let mut success = 0usize;
for rd in addrs.iter() {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();

let mut client = ThinClient::new(
rd.requests_addr,
requests_socket,
rd.events_addr,
events_socket,
);
for i in 0..10 {
trace!("getting replicant balance {} {}/10", rd.requests_addr, i);
if let Ok(bal) = client.get_balance(&bob_pubkey) {
trace!("replicant balance {}", bal);
if bal == leader_balance {
success += 1;
break;
}
for server in servers.iter() {
let mut client = mk_client(server);
if let Ok(bal) = poll_get_balance(&mut client, &bob_pubkey) {
trace!("validator balance {}", bal);
if bal == leader_balance {
success += 1;
}
sleep(Duration::new(1, 0));
}
}
assert_eq!(success, addrs.len());
assert_eq!(success, servers.len());
exit.store(true, Ordering::Relaxed);
for t in threads {
t.join().unwrap();
}
}

fn mk_client(leader: &ReplicatedData) -> ThinClient {
let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();
let events_socket = UdpSocket::bind("0.0.0.0:0").unwrap();

ThinClient::new(
leader.requests_addr,
requests_socket,
leader.events_addr,
events_socket,
)
}

fn tx_and_retry_get_balance(
leader: &ReplicatedData,
alice: &Mint,
bob_pubkey: &PublicKey,
) -> io::Result<i64> {
let mut client = mk_client(leader);
trace!("getting leader last_id");
let last_id = client.get_last_id().wait().unwrap();
info!("executing leader transer");
let _sig = client
.transfer(500, &alice.keypair(), *bob_pubkey, &last_id)
.unwrap();
poll_get_balance(&mut client, bob_pubkey)
}

}

0 comments on commit f3c4acc

Please sign in to comment.