Skip to content

Commit f94c8d3

Browse files
committed
GH-800: wip: fixing 2 tests
1 parent 3b387a7 commit f94c8d3

File tree

1 file changed

+75
-53
lines changed

1 file changed

+75
-53
lines changed

node/src/proxy_client/stream_handler_pool.rs

Lines changed: 75 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,63 +1026,85 @@ mod tests {
10261026

10271027
#[test]
10281028
fn stream_handler_pool_sends_shutdown_signal_when_last_data_is_true() {
1029+
// TODO: GH-800: Identify why am I receiving this error - no Task is currently running
10291030
init_test_logging();
10301031
let test_name = "stream_handler_pool_sends_shutdown_signal_when_last_data_is_true";
1031-
let (shutdown_tx, mut shutdown_rx) = unbounded_channel();
1032-
thread::spawn(move || {
1033-
let stream_key = StreamKey::make_meaningful_stream_key("I should die");
1034-
let client_request_payload = ClientRequestPayload_0v1 {
1035-
stream_key,
1036-
sequenced_packet: SequencedPacket {
1037-
data: b"I'm gonna kill you stream key".to_vec(),
1038-
sequence_number: 0,
1039-
last_data: true,
1040-
},
1041-
target_hostname: Some(String::from("3.4.5.6:80")),
1042-
target_port: HTTP_PORT,
1043-
protocol: ProxyProtocol::HTTP,
1044-
originator_public_key: PublicKey::new(&b"brutal death"[..]),
1045-
};
1046-
let package = ExpiredCoresPackage::new(
1047-
SocketAddr::from_str("1.2.3.4:1234").unwrap(),
1048-
Some(make_wallet("consuming")),
1049-
make_meaningless_route(),
1050-
client_request_payload.into(),
1051-
0,
1052-
);
1053-
let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap();
1054-
let peer_actors = peer_actors_builder().build();
1055-
let mut subject = StreamHandlerPoolReal::new(
1056-
Box::new(ResolverWrapperMock::new()),
1057-
main_cryptde(),
1058-
peer_actors.accountant.report_exit_service_provided.clone(),
1059-
peer_actors.proxy_client_opt.unwrap().clone(),
1060-
100,
1061-
200,
1062-
);
1063-
{
1064-
let mut inner = subject.inner.lock().unwrap();
1065-
inner.logger = Logger::new(test_name);
1066-
inner.stream_writer_channels.insert(
1067-
stream_key,
1068-
StreamSenders {
1069-
writer_data: Box::new(SenderWrapperMock::new(peer_addr)),
1070-
reader_shutdown_tx: shutdown_tx,
1071-
},
1072-
);
1073-
}
1032+
let mut system = System::new(test_name);
1033+
let future = future::lazy(move || {
1034+
let (shutdown_tx, mut shutdown_rx) = unbounded_channel();
1035+
actix::spawn(move || {
1036+
future::lazy({
1037+
let stream_key = StreamKey::make_meaningful_stream_key("I should die");
1038+
let client_request_payload = ClientRequestPayload_0v1 {
1039+
stream_key,
1040+
sequenced_packet: SequencedPacket {
1041+
data: b"I'm gonna kill you stream key".to_vec(),
1042+
sequence_number: 0,
1043+
last_data: true,
1044+
},
1045+
target_hostname: Some(String::from("3.4.5.6:80")),
1046+
target_port: HTTP_PORT,
1047+
protocol: ProxyProtocol::HTTP,
1048+
originator_public_key: PublicKey::new(&b"brutal death"[..]),
1049+
};
1050+
let package = ExpiredCoresPackage::new(
1051+
SocketAddr::from_str("1.2.3.4:1234").unwrap(),
1052+
Some(make_wallet("consuming")),
1053+
make_meaningless_route(),
1054+
client_request_payload.into(),
1055+
0,
1056+
);
1057+
let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap();
1058+
let peer_actors = peer_actors_builder().build();
1059+
let mut subject = StreamHandlerPoolReal::new(
1060+
Box::new(ResolverWrapperMock::new()),
1061+
main_cryptde(),
1062+
peer_actors.accountant.report_exit_service_provided.clone(),
1063+
peer_actors.proxy_client_opt.unwrap().clone(),
1064+
100,
1065+
200,
1066+
);
1067+
{
1068+
let mut inner = subject.inner.lock().unwrap();
1069+
inner.logger = Logger::new(test_name);
1070+
inner.stream_writer_channels.insert(
1071+
stream_key,
1072+
StreamSenders {
1073+
writer_data: Box::new(SenderWrapperMock::new(peer_addr)),
1074+
reader_shutdown_tx: shutdown_tx,
1075+
},
1076+
);
1077+
}
10741078

1075-
run_process_package_in_actix(subject, package);
1076-
});
1077-
let received = shutdown_rx.poll().unwrap();
1078-
assert_eq!(received, Async::Ready(Some(())));
1079-
TestLogHandler::new().await_log_containing(
1080-
&format!(
1081-
"DEBUG: {test_name}: Removing StreamWriter and Shutting down StreamReader \
1079+
let paying_wallet = package.paying_wallet.clone();
1080+
let payload = match package.payload {
1081+
MessageType::ClientRequest(vd) => vd
1082+
.extract(
1083+
&crate::sub_lib::migrations::client_request_payload::MIGRATIONS,
1084+
)
1085+
.unwrap(),
1086+
_ => panic!("Expected MessageType::ClientRequest, got something else"),
1087+
};
1088+
// actix::run(move || {
1089+
subject.process_package(payload, paying_wallet);
1090+
// ok(())
1091+
// })
1092+
1093+
// run_process_package_in_actix(subject, package);
1094+
})
1095+
});
1096+
let received = shutdown_rx.poll().unwrap();
1097+
assert_eq!(received, Async::Ready(Some(())));
1098+
TestLogHandler::new().await_log_containing(
1099+
&format!(
1100+
"DEBUG: {test_name}: Removing StreamWriter and Shutting down StreamReader \
10821101
for oUHoHuDKHjeWq+BJzBIqHpPFBQw to 3.4.5.6:80"
1083-
),
1084-
500,
1085-
);
1102+
),
1103+
500,
1104+
);
1105+
ok(())
1106+
});
1107+
system.block_on(future).unwrap();
10861108
}
10871109

10881110
#[test]

0 commit comments

Comments
 (0)