Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/partition-store/src/vqueue_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl WriteVQueueTable for PartitionStoreTransaction<'_> {
self.raw_delete_cf(KeyKind::VQueueInbox, key_buffer)
}

fn mark_queue_as_active(&mut self, qid: &restate_types::vqueue::VQueueId) {
fn mark_vqueue_as_active(&mut self, qid: &restate_types::vqueue::VQueueId) {
let mut key_buffer = [0u8; ActiveKey::serialized_length_fixed()];
ActiveKey {
partition_key: qid.partition_key,
Expand All @@ -306,7 +306,7 @@ impl WriteVQueueTable for PartitionStoreTransaction<'_> {
self.raw_put_cf(KeyKind::VQueueActive, key_buffer, []);
}

fn mark_queue_as_empty(&mut self, qid: &restate_types::vqueue::VQueueId) {
fn mark_vqueue_as_dormant(&mut self, qid: &restate_types::vqueue::VQueueId) {
let mut key_buffer = [0u8; ActiveKey::serialized_length_fixed()];
ActiveKey {
partition_key: qid.partition_key,
Expand Down
28 changes: 10 additions & 18 deletions crates/storage-api/src/vqueue_table/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@ use restate_types::vqueue::EffectivePriority;

use super::VisibleAt;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VQueueStatus {
/// Enabled (not-paused) and has items to process
Active,
/// Regardless whether it's paused or not, it's empty (nothing to process)
Empty,
/// Paused indicates it's non-empty but paused (should not process its items)
Paused,
}

#[derive(Debug, Default, Clone, bilrost::Message)]
pub struct VQueueStatistics {
/// The time spent in the queue before the first attempt to run. Measured by EMA of time
Expand Down Expand Up @@ -99,14 +89,16 @@ impl VQueueMeta {
self.num_waiting.iter().sum()
}

pub fn status(&self) -> VQueueStatus {
if self.is_empty() {
VQueueStatus::Empty
} else if self.is_paused() {
VQueueStatus::Paused
} else {
VQueueStatus::Active
}
/// Reports whether this vqueue is currently of interest to the scheduler.
///
/// A vqueue is active when either:
/// - it has at least one running entry (this holds even while the vqueue is paused), or
/// - it has at least one waiting entry and it is not paused.
///
/// A paused vqueue therefore stays active only for as long as it still has running
/// entries; once those are moved back to waiting or completed, it is considered dormant
/// until it is unpaused.
pub fn is_active(&self) -> bool {
    // Running entries keep the vqueue active regardless of its paused state.
    if self.stats.num_running > 0 {
        return true;
    }
    // Otherwise only waiting entries of a non-paused vqueue count.
    self.total_waiting() > 0 && !self.is_paused()
}

pub fn num_waiting(&self, priority: EffectivePriority) -> u32 {
Expand Down
21 changes: 19 additions & 2 deletions crates/storage-api/src/vqueue_table/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,27 @@ pub trait WriteVQueueTable {
fn delete_inbox_entry(&mut self, qid: &VQueueId, stage: Stage, card: &EntryCard);

/// Adds a vqueue to the list of active vqueues
fn mark_queue_as_active(&mut self, qid: &VQueueId);
///
/// A vqueue is considered active when it's of interest to the scheduler.
///
/// The scheduler cares about vqueues that have entries that are already running or that are waiting
/// to run. With some special rules to consider when the queue is paused. When the vqueue is
/// paused, the scheduler will only be interested in its "running" entries and not in its
/// waiting entries. Therefore, it will remain to be "active" as long as it has running
/// entries. Once running entries are moved to waiting or completed, the vqueue is
/// considered dormant until it's unpaused.
///
/// A vqueue that is "not" active is not necessarily "empty". It could be paused, or it
/// could contain only entries in parked or completed states. In either case it's not
/// considered by the scheduler and is treated as "dormant".
fn mark_vqueue_as_active(&mut self, qid: &VQueueId);

/// Removes the vqueue from the list of active vqueues
fn mark_queue_as_empty(&mut self, qid: &VQueueId);
///
/// A dormant vqueue is not necessarily `empty`. It's a vqueue that _might_ have items (or not)
/// in parked or completed states, or it might have waiting items in its inbox but the
/// vqueue is paused and would not be visible to the scheduler.
fn mark_vqueue_as_dormant(&mut self, qid: &VQueueId);

/// Updates a vqueue's entry's state
fn put_vqueue_entry_state<E>(
Expand Down
1 change: 1 addition & 0 deletions crates/vqueues/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ bilrost = { workspace = true, features = ["smallvec"] }
derive_more = { workspace = true }
futures = { workspace = true }
hashbrown = { version = "0.16" }
metrics = { workspace = true }
pin-project = { workspace = true }
rocksdb = { workspace = true }
smallvec = { workspace = true }
Expand Down
21 changes: 10 additions & 11 deletions crates/vqueues/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use hashbrown::{HashMap, hash_map};
use tracing::debug;

use restate_storage_api::StorageError;
use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaUpdates, VQueueStatus};
use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaUpdates};
use restate_storage_api::vqueue_table::{ReadVQueueTable, ScanVQueueTable};
use restate_types::vqueue::VQueueId;

Expand Down Expand Up @@ -41,10 +41,6 @@ impl<'a> VQueuesMeta<'a> {
Self { inner: cache }
}

pub fn num_active_vqueues(&self) -> usize {
self.inner.queues.len()
}

pub(crate) fn config_pool(&'a self) -> &'a ConfigPool {
&self.inner.config
}
Expand All @@ -53,8 +49,11 @@ impl<'a> VQueuesMeta<'a> {
self.inner.queues.get(qid)
}

pub fn iter_vqueues(&'a self) -> impl ExactSizeIterator<Item = (&'a VQueueId, &'a VQueueMeta)> {
self.inner.queues.iter()
/// Iterates over the cached vqueues for which [`VQueueMeta::is_active`] returns `true`,
/// yielding each one's id together with its metadata.
pub fn iter_active_vqueues(&'a self) -> impl Iterator<Item = (&'a VQueueId, &'a VQueueMeta)> {
    let all_queues = self.inner.queues.iter();
    all_queues.filter(|entry| entry.1.is_active())
}

pub fn report(&self) {
Expand Down Expand Up @@ -135,18 +134,18 @@ impl VQueuesMetaMut {
}
}

/// Returns the status of the vqueue before and after all the updates
/// Returns is_active of the vqueue before and after all the updates
/// in the form of a tuple (before, after).
pub(crate) async fn apply_updates<S: ReadVQueueTable>(
&mut self,
storage: &mut S,
qid: &VQueueId,
updates: &VQueueMetaUpdates,
) -> Result<(VQueueStatus, VQueueStatus)> {
) -> Result<(bool, bool)> {
let vqueue = self.load(storage, qid).await?;
let before = vqueue.status();
let before = vqueue.is_active();
vqueue.apply_updates(updates)?;
let after = vqueue.status();
let after = vqueue.is_active();

// todo(asoli): Add compaction logic to remove empty vqueues that have been empty for a while
// only if the cache hits a certain threshold.
Expand Down
59 changes: 42 additions & 17 deletions crates/vqueues/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
// by the Apache License, Version 2.0.

mod cache;
mod metric_definitions;
pub mod scheduler;
mod vqueue_config;

pub use cache::{VQueuesMeta, VQueuesMetaMut};
pub use metric_definitions::describe_metrics;
pub use scheduler::SchedulerService;

use restate_storage_api::StorageError;
use restate_storage_api::vqueue_table::metadata::{VQueueMetaUpdates, VQueueStatus};
use restate_storage_api::vqueue_table::metadata::VQueueMetaUpdates;
use restate_storage_api::vqueue_table::{
AsEntryStateHeader, EntryCard, EntryId, EntryKind, ReadVQueueTable, Stage, VisibleAt,
WriteVQueueTable, metadata,
Expand Down Expand Up @@ -135,7 +137,7 @@ where
);

// Update cache
let (status_before, status_after) = self
let (was_active_before, is_active_now) = self
.cache
.apply_updates(self.storage, &self.qid, &updates)
.await?;
Expand All @@ -156,15 +158,14 @@ where
.put_item(&self.qid, card.created_at, card.kind, &card.id, item);
}

// do not keep putting the same queue if it's already known to be active
if status_before == VQueueStatus::Empty
&& matches!(status_after, VQueueStatus::Active | VQueueStatus::Paused)
{
// Components we need. PKEY, QID. We have all.
self.storage.mark_queue_as_active(&self.qid);
if was_active_before != is_active_now {
assert!(is_active_now);
self.storage.mark_vqueue_as_active(&self.qid);
}

if let Some(collector) = self.action_collector.as_deref_mut() {
if let Some(collector) = self.action_collector.as_deref_mut()
&& is_active_now
{
// Let the scheduler know about the new entry to keep its head-of-line cache of the vqueue
// as fresh as possible.
let inbox_event = VQueueEvent::new(self.qid, EventDetails::Enqueued(card));
Expand Down Expand Up @@ -194,11 +195,17 @@ where
},
);

self.cache
let (was_active_before, is_active_now) = self
.cache
.apply_updates(self.storage, &self.qid, &updates)
.await?;
self.storage.update_vqueue(&self.qid, &updates);

if was_active_before != is_active_now {
assert!(is_active_now);
self.storage.mark_vqueue_as_active(&self.qid);
}
Comment on lines +204 to +207
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the situation in which we can call attempt_to_run and the vqueue being dormant? Would it be a situation in which a user paused a vqueue and the scheduler was scheduling an invocation concurrently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can happen if the vqueue has no waiting items and we decided to unpark an item by skipping the inbox stage for any reason. It's primarily for being defensive but it shouldn't happen with the current defined flows.


let mut modified_card = card.clone();
modified_card.priority = EffectivePriority::TokenHeld;

Expand Down Expand Up @@ -230,13 +237,19 @@ where
);

// Update cache
self.cache
let (was_active_before, is_active_now) = self
.cache
.apply_updates(self.storage, &self.qid, &updates)
.await?;

// Update vqueue meta in storage
self.storage.update_vqueue(&self.qid, &updates);

if was_active_before != is_active_now {
assert!(is_active_now);
self.storage.mark_vqueue_as_active(&self.qid);
}

let mut modified_card = card.clone();
if card.priority.has_started() {
// we need to do this to ensure that inbox entries of started entries follow the
Expand Down Expand Up @@ -284,13 +297,19 @@ where
);

// Update cache
self.cache
let (was_active_before, is_active_now) = self
.cache
.apply_updates(self.storage, &self.qid, &updates)
.await?;

// Update vqueue meta in storage
self.storage.update_vqueue(&self.qid, &updates);

if was_active_before != is_active_now {
assert!(!is_active_now);
self.storage.mark_vqueue_as_dormant(&self.qid);
}

let mut modified_card = card.clone();
if should_release_concurrency_token && card.priority.token_held() {
// adjust the priority to reflect releasing the token
Expand Down Expand Up @@ -337,11 +356,17 @@ where
);

// Update cache
self.cache
let (was_active_before, is_active_now) = self
.cache
.apply_updates(self.storage, &self.qid, &updates)
.await?;
self.storage.update_vqueue(&self.qid, &updates);

if was_active_before != is_active_now {
assert!(!is_active_now);
self.storage.mark_vqueue_as_dormant(&self.qid);
}

// We add the entry back into the waiting inbox
self.storage.put_inbox_entry(&self.qid, Stage::Inbox, card);

Expand Down Expand Up @@ -384,15 +409,15 @@ where
);

// Update cache
let (status_before, status_after) = self
let (was_active_before, is_active_now) = self
.cache
.apply_updates(self.storage, &self.qid, &updates)
.await?;
self.storage.update_vqueue(&self.qid, &updates);

if status_before != VQueueStatus::Empty && status_after == VQueueStatus::Empty {
// Components we need. PKEY, QID. We have all.
self.storage.mark_queue_as_empty(&self.qid);
if was_active_before != is_active_now {
assert!(!is_active_now);
self.storage.mark_vqueue_as_dormant(&self.qid);
}

if let Some(collector) = self.action_collector.as_deref_mut() {
Expand Down
52 changes: 52 additions & 0 deletions crates/vqueues/src/metric_definitions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use metrics::{Unit, counter, describe_counter};

pub const VQUEUE_ENQUEUE: &str = "restate.vqueue.scheduler.enqueue.total";
pub const VQUEUE_SCHEDULER_DECISION: &str = "restate.vqueue.scheduler.decision.total";
pub const VQUEUE_RUN_CONFIRMED: &str = "restate.vqueue.scheduler.run_confirmed.total";
pub const VQUEUE_RUN_REJECTED: &str = "restate.vqueue.scheduler.run_rejected.total";

pub const ACTION_YIELD: &str = "yield";
pub const ACTION_RESUME: &str = "resume";
pub const ACTION_RUN: &str = "run";

pub fn describe_metrics() {
describe_counter!(
VQUEUE_ENQUEUE,
Unit::Count,
"Number of entries/invocations in vqueues added to the waiting inbox"
);

describe_counter!(
VQUEUE_SCHEDULER_DECISION,
Unit::Count,
"Number of entries in vqueues scheduler, broken down by decision"
);

describe_counter!(
VQUEUE_RUN_CONFIRMED,
Unit::Count,
"Number of entries/invocations in vqueues where the run request was confirmed"
);

describe_counter!(
VQUEUE_RUN_REJECTED,
Unit::Count,
"Number of entries/invocations in vqueues where the run request was rejected"
);
}

pub fn publish_scheduler_decision_metrics(num_run: usize, num_yield: usize, num_resume: usize) {
counter!(VQUEUE_SCHEDULER_DECISION, "action" => ACTION_RUN).increment(num_run as u64);
counter!(VQUEUE_SCHEDULER_DECISION, "action" => ACTION_RESUME).increment(num_resume as u64);
counter!(VQUEUE_SCHEDULER_DECISION, "action" => ACTION_YIELD).increment(num_yield as u64);
}
20 changes: 20 additions & 0 deletions crates/vqueues/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use restate_storage_api::vqueue_table::{ScanVQueueTable, VQueueStore};
use restate_types::vqueue::VQueueId;

use crate::VQueueEvent;
use crate::metric_definitions::publish_scheduler_decision_metrics;
use crate::{VQueuesMeta, VQueuesMetaMut};

use self::drr::DRRScheduler;
Expand Down Expand Up @@ -128,6 +129,25 @@ impl<Item> Decision<Item> {
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

/// Tallies the number of items in this decision per action (run, resume, yield) across
/// all segments of all assignments, and publishes the totals as scheduler decision metrics.
pub fn report_metrics(&self) {
    let (mut runs, mut resumes, mut yields) = (0, 0, 0);

    for assignments in self.0.values() {
        for segment in &assignments.segments {
            // Pick the tally that matches this segment's action, then add its item count.
            let tally = match segment.action {
                Action::MoveToRunning => &mut runs,
                Action::ResumeAlreadyRunning => &mut resumes,
                Action::Yield => &mut yields,
            };
            *tally += segment.items.len();
        }
    }

    // Argument order expected by the publisher is (run, yield, resume).
    publish_scheduler_decision_metrics(runs, yields, resumes);
}
}

enum State<S: VQueueStore, Token> {
Expand Down
Loading
Loading