Skip to content

Commit 4f79ce5

Browse files
committed
test(subscriber): add initial integration test
The `console-subscriber` crate is hard to test. This first test covers the self wake defect that was fixed in #430.
1 parent 9107562 commit 4f79ce5

File tree

1 file changed

+125
-0
lines changed

1 file changed

+125
-0
lines changed

console-subscriber/tests/wake.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use std::{thread, time::Duration};
2+
3+
use console_api::instrument::{instrument_client::InstrumentClient, InstrumentRequest};
4+
use futures::stream::StreamExt;
5+
use tokio::{sync::oneshot, task, time::sleep};
6+
use tracing_subscriber::prelude::*;
7+
8+
#[test]
9+
fn self_wake() {
10+
let (console_layer, server) = console_subscriber::ConsoleLayer::builder().build();
11+
12+
let registry = tracing_subscriber::registry().with(console_layer);
13+
14+
let (finish_tx, finish_rx) = oneshot::channel::<()>();
15+
16+
let join_handle = thread::Builder::new()
17+
.name("console_subscriber".into())
18+
.spawn(move || {
19+
let _subscriber_guard =
20+
tracing::subscriber::set_default(tracing_core::subscriber::NoSubscriber::default());
21+
let runtime = tokio::runtime::Builder::new_current_thread()
22+
.enable_io()
23+
.enable_time()
24+
.build()
25+
.expect("console subscriber runtime initialization failed");
26+
27+
runtime
28+
.block_on(async move {
29+
task::Builder::new()
30+
.name("console_server")
31+
.spawn(async move {
32+
server
33+
.serve()
34+
.await
35+
.expect("console subscriber server failed")
36+
})
37+
.unwrap();
38+
39+
let expect = task::Builder::new().name("expect").spawn(async {
40+
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
41+
42+
let target = "http://127.0.0.1:6669".to_owned();
43+
let mut client = InstrumentClient::connect(target).await.unwrap();
44+
45+
46+
let mut fail_count = 0;
47+
let mut stream = loop {
48+
let request = tonic::Request::new(InstrumentRequest {});
49+
match client.watch_updates(request).await {
50+
Ok(stream) => break stream.into_inner(),
51+
Err(err) => {
52+
if fail_count < 5 {
53+
fail_count += 1;
54+
println!(
55+
"Could not connect ({fail_count}), will try again: {err}"
56+
);
57+
sleep(Duration::from_millis(fail_count * 100)).await;
58+
} else {
59+
panic!("Client cannot connect to watch updates: {err}");
60+
}
61+
}
62+
}
63+
};
64+
65+
let mut i: usize = 0;
66+
while let Some(update) = stream.next().await {
67+
match update {
68+
Ok(update) => {
69+
println!("UPDATE {}: {:#?}\n", i, update.task_update);
70+
if let Some(task_update) = update.task_update {
71+
println!(
72+
"UPDATE: new task count: {}, update count: {}, dropped count: {}",
73+
task_update.new_tasks.len(),
74+
task_update.stats_update.len(),
75+
task_update.dropped_events
76+
);
77+
}
78+
i += 1;
79+
}
80+
Err(e) => {
81+
panic!("update stream error: {}", e);
82+
}
83+
}
84+
if i > 2 {
85+
break;
86+
}
87+
}
88+
match finish_tx.send(()) {
89+
Ok(_) => println!("Send finish message!"),
90+
Err(err) => println!("Could not send finish message: {err:?}"),
91+
}
92+
}).unwrap();
93+
94+
match expect.await {
95+
Ok(_) => println!("Successfully awaited expect task!"),
96+
Err(err) => println!("Error awaiting expect task: {err:?}"),
97+
}
98+
});
99+
})
100+
.expect("console subscriber could not spawn thread");
101+
102+
tracing::subscriber::with_default(registry, || {
103+
let runtime = tokio::runtime::Builder::new_current_thread()
104+
.enable_all()
105+
.build()
106+
.unwrap();
107+
108+
tokio::task::Builder::new()
109+
.name("mog")
110+
.spawn_on(async { task::yield_now().await }, runtime.handle())
111+
.unwrap();
112+
runtime.block_on(async {
113+
println!("Before await finish...");
114+
match finish_rx.await {
115+
Ok(_) => println!("finish message received."),
116+
Err(err) => println!("finish message could not be received: {err}"),
117+
}
118+
});
119+
});
120+
121+
match join_handle.join() {
122+
Ok(_) => println!("Successfully joined console subscriber thread"),
123+
Err(err) => println!("Error joining console subscriber thread: {err:?}"),
124+
}
125+
}

0 commit comments

Comments
 (0)