Skip to content

Commit 34196df

Browse files
committed
Refactoring of journal flush
1 parent bc843e1 commit 34196df

File tree

4 files changed

+27
-38
lines changed

4 files changed

+27
-38
lines changed

CHANGELOG.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@
1212
* `hq journal report` that exports a statistics into an HTML report
1313
* Server scheduler more respects priorities
1414
* Server scheduler improved when resource variants are used
15-
* Extended events:
16-
* New journal events: `job-idle`, `task-notify`.
17-
* `worker-new` now contains the worker configuration,
18-
* `job-created` now contains task information
19-
* `task-started` now contains a resource variant id
20-
* We now ensures that after a sucessful modifying client's operation (submit, cancel, open/close job, queues modification),
15+
* Extended events:
16+
* New journal events: `job-idle`, `task-notify`.
17+
* `worker-new` now contains the worker configuration,
18+
* `job-created` now contains task information
19+
* `task-started` now contains a resource variant id
20+
* We now ensure that after a successful modifying client's operation (submit, cancel, open/close job, queues
21+
modification),
2122
the operation is immediately a part of the written journal.
2223

2324
### Changes

crates/hyperqueue/src/server/client/autoalloc.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ pub async fn handle_autoalloc_message(
4343
.add_queue(server_dir.directory(), parameters, None, None);
4444
match result.await {
4545
Ok(queue_id) => {
46-
if let Some(callback) = senders.events.flush_journal() {
47-
let _ = callback.await;
48-
};
46+
senders.events.flush_journal().await;
4947
ToClientMessage::AutoAllocResponse(AutoAllocResponse::QueueCreateResponse(
5048
QueueCreateResponse::Created(queue_id),
5149
))
@@ -66,9 +64,7 @@ pub async fn handle_autoalloc_message(
6664
let result = senders.autoalloc.remove_queue(queue_id, force);
6765
match result.await {
6866
Ok(_) => {
69-
if let Some(callback) = senders.events.flush_journal() {
70-
let _ = callback.await;
71-
};
67+
senders.events.flush_journal().await;
7268
ToClientMessage::AutoAllocResponse(AutoAllocResponse::QueueRemoved(queue_id))
7369
}
7470
Err(error) => ToClientMessage::Error(error.to_string()),
@@ -78,9 +74,7 @@ pub async fn handle_autoalloc_message(
7874
let result = senders.autoalloc.pause_queue(queue_id);
7975
match result.await {
8076
Ok(_) => {
81-
if let Some(callback) = senders.events.flush_journal() {
82-
let _ = callback.await;
83-
};
77+
senders.events.flush_journal().await;
8478
ToClientMessage::AutoAllocResponse(AutoAllocResponse::QueuePaused(queue_id))
8579
}
8680
Err(error) => ToClientMessage::Error(error.to_string()),
@@ -90,9 +84,7 @@ pub async fn handle_autoalloc_message(
9084
let result = senders.autoalloc.resume_queue(queue_id);
9185
match result.await {
9286
Ok(_) => {
93-
if let Some(callback) = senders.events.flush_journal() {
94-
let _ = callback.await;
95-
};
87+
senders.events.flush_journal().await;
9688
ToClientMessage::AutoAllocResponse(AutoAllocResponse::QueueResumed(queue_id))
9789
}
9890
Err(error) => ToClientMessage::Error(error.to_string()),

crates/hyperqueue/src/server/client/mod.rs

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -208,10 +208,8 @@ pub async fn client_rpc_loop<
208208
let response = match message {
209209
FromClientMessage::Submit(msg, stream_opts) => {
210210
let response = submit::handle_submit(&state_ref, senders, msg);
211-
if !response.is_error()
212-
&& let Some(callback) = senders.events.flush_journal()
213-
{
214-
let _ = callback.await;
211+
if !response.is_error() {
212+
senders.events.flush_journal().await;
215213
};
216214
if let Some(mut stream_opts) = stream_opts
217215
&& let ToClientMessage::SubmitResponse(SubmitResponse::Ok {
@@ -274,10 +272,8 @@ pub async fn client_rpc_loop<
274272
}
275273
FromClientMessage::Cancel(msg) => {
276274
let response = handle_job_cancel(&state_ref, senders, &msg.selector).await;
277-
if !response.is_error()
278-
&& let Some(callback) = senders.events.flush_journal()
279-
{
280-
let _ = callback.await;
275+
if !response.is_error() {
276+
senders.events.flush_journal().await;
281277
};
282278
response
283279
}
@@ -292,19 +288,15 @@ pub async fn client_rpc_loop<
292288
}
293289
FromClientMessage::OpenJob(job_description) => {
294290
let response = handle_open_job(&state_ref, senders, job_description);
295-
if !response.is_error()
296-
&& let Some(callback) = senders.events.flush_journal()
297-
{
298-
let _ = callback.await;
291+
if !response.is_error() {
292+
senders.events.flush_journal().await;
299293
};
300294
response
301295
}
302296
FromClientMessage::CloseJob(msg) => {
303297
let response = handle_job_close(&state_ref, senders, &msg.selector).await;
304-
if !response.is_error()
305-
&& let Some(callback) = senders.events.flush_journal()
306-
{
307-
let _ = callback.await;
298+
if !response.is_error() {
299+
senders.events.flush_journal().await;
308300
};
309301
response
310302
}
@@ -319,9 +311,7 @@ pub async fn client_rpc_loop<
319311
handle_prune_journal(&state_ref, senders).await
320312
}
321313
FromClientMessage::FlushJournal => {
322-
if let Some(callback) = senders.events.flush_journal() {
323-
let _ = callback.await;
324-
};
314+
senders.events.flush_journal().await;
325315
ToClientMessage::Finished
326316
}
327317
FromClientMessage::TaskExplain(request) => {

crates/hyperqueue/src/server/event/streamer.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ impl EventStreamer {
449449
inner.client_listeners.remove(p);
450450
}
451451

452-
pub fn flush_journal(&self) -> Option<oneshot::Receiver<()>> {
452+
pub fn start_flush(&self) -> Option<oneshot::Receiver<()>> {
453453
let inner = self.inner.get();
454454
if let Some(ref streamer) = inner.storage_sender {
455455
let (sender, receiver) = oneshot::channel();
@@ -465,6 +465,12 @@ impl EventStreamer {
465465
}
466466
}
467467

468+
pub async fn flush_journal(&self) {
469+
if let Some(handle) = self.start_flush() {
470+
let _ = handle.await;
471+
};
472+
}
473+
468474
pub fn prune_journal(
469475
&self,
470476
live_jobs: Set<JobId>,

0 commit comments

Comments
 (0)