Skip to content

Commit 6d05876

Browse files
authored
Merge pull request #164 from DeterminateSystems/stop-listening-when-shutting-down
Stop listening for new events once we've started shutting down
2 parents 992aa39 + 5366f49 commit 6d05876

File tree

5 files changed

+102
-99
lines changed

5 files changed

+102
-99
lines changed

.github/workflows/build.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424
runner: namespace-profile-default-arm64
2525
- nix-system: x86_64-darwin
2626
system: X64-macOS
27-
runner: macos-14-large
27+
runner: macos-latest-xlarge
2828
- nix-system: aarch64-darwin
2929
system: ARM64-macOS
3030
runner: macos-latest-xlarge

.github/workflows/check-and-test.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ jobs:
5252
- system: ARM64-Linux
5353
runner: namespace-profile-default-arm64
5454
- system: X64-macOS
55-
runner: macos-14-large
55+
runner: macos-latest-xlarge
5656
- system: ARM64-macOS
5757
runner: macos-latest-xlarge
5858
permissions:

magic-nix-cache/src/api.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,7 @@ async fn workflow_finish(
110110
tracing::info!("FlakeHub cache is not enabled, not uploading anything to it");
111111
}
112112

113-
if let Some(sender) = state.shutdown_sender.lock().await.take() {
114-
sender
115-
.send(())
116-
.map_err(|_| Error::Internal("Sending shutdown server message".to_owned()))?;
117-
}
113+
state.shutdown_token.cancel();
118114

119115
// NOTE(cole-h): see `init_logging`
120116
if let Some(logfile) = &state.logfile {

magic-nix-cache/src/main.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use clap::Parser;
3737
use serde::{Deserialize, Serialize};
3838
use tokio::fs::File;
3939
use tokio::io::AsyncWriteExt;
40-
use tokio::sync::{oneshot, Mutex, RwLock};
40+
use tokio::sync::{Mutex, RwLock};
4141
use tracing_subscriber::filter::EnvFilter;
4242
use tracing_subscriber::layer::SubscriberExt;
4343
use tracing_subscriber::util::SubscriberInitExt;
@@ -204,9 +204,6 @@ struct StateInner {
204204
/// The upstream cache.
205205
upstream: Option<String>,
206206

207-
/// The sender half of the oneshot channel to trigger a shutdown.
208-
shutdown_sender: Mutex<Option<oneshot::Sender<()>>>,
209-
210207
/// Set of store path hashes that are not present in GHAC.
211208
narinfo_negative_cache: Arc<RwLock<HashSet<String>>>,
212209

@@ -224,6 +221,9 @@ struct StateInner {
224221

225222
/// The paths in the Nix store when Magic Nix Cache started, if store diffing is enabled.
226223
original_paths: Option<Mutex<HashSet<PathBuf>>>,
224+
225+
/// A CancellationToken that will be cancelled once magic-nix-cache starts shutting down.
226+
shutdown_token: tokio_util::sync::CancellationToken,
227227
}
228228

229229
#[derive(Debug, Clone)]
@@ -443,19 +443,19 @@ async fn main_cli(args: Args, recorder: detsys_ids_client::Recorder) -> Result<(
443443
None
444444
};
445445

446-
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
446+
let shutdown_token = tokio_util::sync::CancellationToken::new();
447447

448448
let original_paths = args.diff_store.then_some(Mutex::new(HashSet::new()));
449449
let state = Arc::new(StateInner {
450450
gha_cache,
451451
upstream: args.upstream.clone(),
452-
shutdown_sender: Mutex::new(Some(shutdown_sender)),
453452
narinfo_negative_cache,
454453
metrics,
455454
store,
456455
flakehub_state: RwLock::new(flakehub_state),
457456
logfile: guard.logfile,
458457
original_paths,
458+
shutdown_token: shutdown_token.clone(),
459459
});
460460

461461
if dnixd_available == Dnixd::Available {
@@ -551,7 +551,7 @@ async fn main_cli(args: Args, recorder: detsys_ids_client::Recorder) -> Result<(
551551
let listener = tokio::net::TcpListener::bind(&args.listen).await?;
552552
let ret = axum::serve(listener, app.into_make_service())
553553
.with_graceful_shutdown(async move {
554-
shutdown_receiver.await.ok();
554+
shutdown_token.cancelled_owned().await;
555555
tracing::info!("Shutting down");
556556
})
557557
.await;

magic-nix-cache/src/pbh.rs

Lines changed: 92 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::io::Write as _;
22
use std::net::SocketAddr;
33
use std::os::unix::fs::PermissionsExt as _;
44
use std::path::PathBuf;
5+
use std::sync::Arc;
56

67
use anyhow::anyhow;
78
use anyhow::Context as _;
@@ -24,102 +25,108 @@ pub async fn subscribe_uds_post_build_hook(
2425
) -> Result<()> {
2526
tokio::spawn(async move {
2627
let dnixd_uds_socket_path = &dnixd_uds_socket_path;
27-
loop {
28-
let Ok(socket_conn) = UnixStream::connect(dnixd_uds_socket_path).await else {
29-
tracing::error!("built-paths: failed to connect to determinate-nixd's socket");
30-
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
31-
continue;
32-
};
33-
let stream = TokioIo::new(socket_conn);
34-
let executor: TokioExecutor = TokioExecutor::new();
3528

36-
let sender_conn = hyper::client::conn::http2::handshake(executor, stream).await;
29+
tokio::select! {
30+
_ = state.shutdown_token.cancelled() => {
31+
}
32+
_ = handle_events(Arc::clone(&state), dnixd_uds_socket_path) => {
33+
}
34+
}
35+
});
36+
37+
Ok(())
38+
}
3739

38-
let Ok((mut sender, conn)) = sender_conn else {
39-
tracing::error!("built-paths: failed to http2 handshake");
40+
async fn handle_events(state: State, dnixd_uds_socket_path: &PathBuf) -> ! {
41+
loop {
42+
let Ok(socket_conn) = UnixStream::connect(dnixd_uds_socket_path).await else {
43+
tracing::error!("built-paths: failed to connect to determinate-nixd's socket");
44+
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
45+
continue;
46+
};
47+
let stream = TokioIo::new(socket_conn);
48+
let executor: TokioExecutor = TokioExecutor::new();
49+
50+
let sender_conn = hyper::client::conn::http2::handshake(executor, stream).await;
51+
52+
let Ok((mut sender, conn)) = sender_conn else {
53+
tracing::error!("built-paths: failed to http2 handshake");
54+
continue;
55+
};
56+
57+
// NOTE(colemickens): for now we just drop the joinhandle and let it keep running
58+
let _join_handle = tokio::task::spawn(async move {
59+
if let Err(err) = conn.await {
60+
tracing::error!("Connection failed: {:?}", err);
61+
}
62+
});
63+
64+
let request = http::Request::builder()
65+
.method(http::Method::GET)
66+
.uri("http://localhost/events")
67+
.body(axum::body::Body::empty());
68+
let Ok(request) = request else {
69+
tracing::error!("built-paths: failed to create request to subscribe");
70+
continue;
71+
};
72+
73+
let response = sender.send_request(request).await;
74+
let response = match response {
75+
Ok(r) => r,
76+
Err(e) => {
77+
tracing::error!("built-paths: failed to send subscription request: {:?}", e);
4078
continue;
41-
};
79+
}
80+
};
81+
let mut data = response.into_data_stream();
4282

43-
// NOTE(colemickens): for now we just drop the joinhandle and let it keep running
44-
let _join_handle = tokio::task::spawn(async move {
45-
if let Err(err) = conn.await {
46-
tracing::error!("Connection failed: {:?}", err);
83+
while let Some(event_str) = data.next().await {
84+
let event_str = match event_str {
85+
Ok(event) => event,
86+
Err(e) => {
87+
tracing::error!("built-paths: error while receiving: {}", e);
88+
break;
4789
}
48-
});
49-
50-
let request = http::Request::builder()
51-
.method(http::Method::GET)
52-
.uri("http://localhost/events")
53-
.body(axum::body::Body::empty());
54-
let Ok(request) = request else {
55-
tracing::error!("built-paths: failed to create request to subscribe");
90+
};
91+
92+
let Some(event_str) = event_str.strip_prefix("data: ".as_bytes()) else {
93+
tracing::debug!("built-paths subscription: ignoring non-data frame");
94+
continue;
95+
};
96+
let Ok(event): core::result::Result<BuiltPathResponseEventV1, _> =
97+
serde_json::from_slice(event_str)
98+
else {
99+
tracing::error!("failed to decode built-path response as BuiltPathResponseEventV1");
56100
continue;
57101
};
58102

59-
let response = sender.send_request(request).await;
60-
let response = match response {
61-
Ok(r) => r,
62-
Err(e) => {
63-
tracing::error!("buit-paths: failed to send subscription request: {:?}", e);
64-
continue;
65-
}
103+
let maybe_store_paths = event
104+
.outputs
105+
.iter()
106+
.map(|path| {
107+
state
108+
.store
109+
.follow_store_path(path)
110+
.map_err(|_| anyhow!("failed to collect store paths"))
111+
})
112+
.collect::<Result<Vec<_>>>();
113+
114+
let Ok(store_paths) = maybe_store_paths else {
115+
tracing::error!("built-paths: encountered an error aggregating build store paths");
116+
continue;
66117
};
67-
let mut data = response.into_data_stream();
68-
69-
while let Some(event_str) = data.next().await {
70-
let event_str = match event_str {
71-
Ok(event) => event,
72-
Err(e) => {
73-
tracing::error!("built-paths: error while receiving: {}", e);
74-
break;
75-
}
76-
};
77-
78-
let Some(event_str) = event_str.strip_prefix("data: ".as_bytes()) else {
79-
tracing::debug!("built-paths subscription: ignoring non-data frame");
80-
continue;
81-
};
82-
let Ok(event): core::result::Result<BuiltPathResponseEventV1, _> =
83-
serde_json::from_slice(event_str)
84-
else {
85-
tracing::error!(
86-
"failed to decode built-path response as BuiltPathResponseEventV1"
87-
);
88-
continue;
89-
};
90-
91-
let maybe_store_paths = event
92-
.outputs
93-
.iter()
94-
.map(|path| {
95-
state
96-
.store
97-
.follow_store_path(path)
98-
.map_err(|_| anyhow!("failed to collect store paths"))
99-
})
100-
.collect::<Result<Vec<_>>>();
101-
102-
let Ok(store_paths) = maybe_store_paths else {
103-
tracing::error!(
104-
"built-paths: encountered an error aggregating build store paths"
105-
);
106-
continue;
107-
};
108-
109-
tracing::debug!("about to enqueue paths: {:?}", store_paths);
110-
if let Err(e) = crate::api::enqueue_paths(&state, store_paths).await {
111-
tracing::error!(
112-
"built-paths: failed to enqueue paths for drv ({}): {}",
113-
event.drv.display(),
114-
e
115-
);
116-
continue;
117-
}
118+
119+
tracing::debug!("about to enqueue paths: {:?}", store_paths);
120+
if let Err(e) = crate::api::enqueue_paths(&state, store_paths).await {
121+
tracing::error!(
122+
"built-paths: failed to enqueue paths for drv ({}): {}",
123+
event.drv.display(),
124+
e
125+
);
126+
continue;
118127
}
119128
}
120-
});
121-
122-
Ok(())
129+
}
123130
}
124131

125132
pub async fn setup_legacy_post_build_hook(

0 commit comments

Comments
 (0)