Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions crates/core/src/startup.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use core_affinity::CoreId;
use crossbeam_queue::ArrayQueue;
use itertools::Itertools;
use spacetimedb_paths::server::{ConfigToml, LogsDir};
Expand All @@ -16,6 +15,8 @@ use tracing_subscriber::{reload, EnvFilter};
use crate::config::{ConfigFile, LogConfig};
use crate::util::jobs::JobCores;

pub use core_affinity::CoreId;

pub struct TracingOptions {
pub config: LogConfig,
/// Whether or not to periodically reload the log config in the background.
Expand Down Expand Up @@ -196,6 +197,13 @@ pub struct CoreReservations {
///
/// Default: 1/8
pub rayon: f64,
/// Cores to reserve for IRQ handling.
///
/// This will be the first `n` [`CoreId`]s in the list.
/// Only make use of this if you're configuring the machine for IRQ pinning!
///
/// Default: 2
pub irq: usize,
/// Extra reserved cores.
///
/// If greater than zero, this many cores will be reserved _before_
Expand All @@ -211,6 +219,7 @@ impl Default for CoreReservations {
databases: 1.0 / 8.0,
tokio_workers: 4.0 / 8.0,
rayon: 1.0 / 8.0,
irq: 2,
reserved: 0,
}
}
Expand All @@ -221,13 +230,15 @@ impl CoreReservations {
///
/// Returns the allocated cores in the order:
///
/// - irq
/// - reserved
/// - databases
/// - tokio_workers
/// - rayon
///
/// Left public for testing and debugging purposes.
pub fn apply(&self, cores: &mut Vec<CoreId>) -> [Vec<CoreId>; 4] {
pub fn apply(&self, cores: &mut Vec<CoreId>) -> [Vec<CoreId>; 5] {
let irq = cores.drain(..self.irq).collect_vec();
let reserved = cores.drain(..self.reserved).collect_vec();

let total = cores.len() as f64;
Expand All @@ -240,7 +251,7 @@ impl CoreReservations {
let tokio_workers = claim(cores, frac(self.tokio_workers)).collect_vec();
let rayon = claim(cores, frac(self.rayon)).collect_vec();

[reserved, databases, tokio_workers, rayon]
[irq, reserved, databases, tokio_workers, rayon]
}
}

Expand Down Expand Up @@ -273,7 +284,7 @@ impl Cores {
fn get(reservations: CoreReservations) -> Option<Self> {
let mut cores = Self::get_core_ids()?;

let [reserved, databases, tokio_workers, rayon] = reservations.apply(&mut cores);
let [_irq, reserved, databases, tokio_workers, rayon] = reservations.apply(&mut cores);

let reserved = (!reserved.is_empty()).then(|| reserved.into());
let databases = databases.into_iter().collect::<JobCores>();
Expand Down Expand Up @@ -306,20 +317,12 @@ impl Cores {

/// Get the cores of the local host, as reported by the operating system.
///
/// Cores 0 and 1 are not included in the returned vec, as we reserve them
/// for the operating system.
///
/// Returns `None` if `num_cpus - 2` is less than 8.
/// Returns `None` if `num_cpus` is less than 8.
/// If `Some` is returned, the `Vec` is non-empty.
pub fn get_core_ids() -> Option<Vec<CoreId>> {
let cores = core_affinity::get_core_ids()
.filter(|cores| cores.len() >= 10)?
.into_iter()
// We reserve the first two cores for the OS.
// This allows us to pin interrupt handlers (IRQs) to these cores,
// particularly those for incoming network traffic,
// preventing them from preempting the main reducer threads.
.filter(|core_id| core_id.id > 1)
.collect_vec();

(!cores.is_empty()).then_some(cores)
Expand All @@ -328,14 +331,14 @@ impl Cores {

#[derive(Default)]
pub struct TokioCores {
workers: Option<Vec<CoreId>>,
pub workers: Option<Vec<CoreId>>,
// For blocking threads, we don't want to limit them to a specific number
// and pin them to their own cores - they're supposed to run concurrently
// with each other. However, `core_affinity` doesn't support affinity masks,
// so we just use the Linux-specific API, since this is only a slight boost
// and we don't care enough about performance on other platforms.
#[cfg(target_os = "linux")]
blocking: Option<nix::sched::CpuSet>,
pub blocking: Option<nix::sched::CpuSet>,
}

impl TokioCores {
Expand Down
Loading