[pull] main from quickwit-oss:main #18

Merged 2 commits on Jun 11, 2025

2 changes: 1 addition & 1 deletion quickwit/quickwit-actors/src/actor.rs
@@ -173,7 +173,7 @@ pub trait Actor: Send + Sized + 'static {
/// This hook is called only once.
///
/// It is always called regardless of the reason why the actor exited.
/// The exit status is passed as an argument to make it possible to act conditionnally
/// The exit status is passed as an argument to make it possible to act conditionally
/// upon it.
/// For instance, it is often better to do as little work as possible on a killed actor.
/// It can be done by checking the `exit_status` and performing an early-exit if it is
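
The comment above describes the exit hook's contract: called exactly once, always called, and handed the exit status so the implementation can act conditionally on it. As a rough illustration only, this is what an early exit on a killed actor could look like; the hook name `finalize`, `ActorExitStatus::Killed`, and the signature below are assumptions, since the actual method declaration sits outside this hunk:

```rust
// Hypothetical sketch; the real hook's name/signature are not shown in this diff.
async fn finalize(
    &mut self,
    exit_status: &ActorExitStatus,
    _ctx: &ActorContext<Self>,
) -> anyhow::Result<()> {
    // Do as little work as possible when the actor was killed.
    if matches!(exit_status, ActorExitStatus::Killed) {
        return Ok(());
    }
    // Regular teardown (flush buffers, notify downstream, ...) goes here.
    Ok(())
}
```
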
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/logger.rs
@@ -285,7 +285,7 @@ pub(super) mod jemalloc_profiled {
ctx.format_fields(writer.by_ref(), event)?;
writeln!(writer)?;

// Print a backtrace to help idenify the callsite
// Print a backtrace to help identify the callsite
backtrace::trace(|frame| {
backtrace::resolve_frame(frame, |symbol| {
if let Some(symbole_name) = symbol.name() {
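
For reference, the stack walk in this hunk follows the `backtrace` crate's callback API. A small self-contained sketch of the same pattern (the output formatting here is illustrative, not the logger's actual format):

```rust
// Walk the current stack and print resolvable symbol names, as in the hunk above.
fn print_backtrace() {
    backtrace::trace(|frame| {
        backtrace::resolve_frame(frame, |symbol| {
            if let Some(name) = symbol.name() {
                eprintln!("{name}");
            }
        });
        true // return true to keep walking older frames
    });
}
```
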
@@ -536,7 +536,10 @@ fn assert_post_condition_physical_plan_match_solution(
assert_eq!(num_indexers, id_to_ord_map.indexer_ids.len());
let mut reconstructed_solution = SchedulingSolution::with_num_indexers(num_indexers);
convert_physical_plan_to_solution(physical_plan, id_to_ord_map, &mut reconstructed_solution);
assert_eq!(solution, &reconstructed_solution);
assert_eq!(
solution.indexer_assignments,
reconstructed_solution.indexer_assignments
);
}

fn add_shard_to_indexer(
@@ -576,7 +579,7 @@ fn add_shard_to_indexer(
}
}

// If the total node capacities is lower than 110% of the problem load, this
// If the total node capacities is lower than 120% of the problem load, this
// function scales the load of the indexer to reach this limit.
fn inflate_node_capacities_if_necessary(problem: &mut SchedulingProblem) {
// First we scale the problem to the point where any indexer can fit the largest shard.
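
The comment above (together with the `solve` comment further down) says capacities are inflated proportionally until they cover at least 120% of the total problem load. A standalone sketch of that idea, using hypothetical names rather than quickwit's actual types:

```rust
// Hypothetical helper illustrating proportional inflation to 120% of the load.
fn inflate_capacities(capacities_mcpu: &mut [u32], total_load_mcpu: u32) {
    let total_capacity: u64 = capacities_mcpu.iter().map(|&c| c as u64).sum();
    let target = total_load_mcpu as u64 * 120 / 100;
    if total_capacity == 0 || total_capacity >= target {
        return;
    }
    // Same ratio for every node, so relative capacities are preserved.
    for capacity in capacities_mcpu.iter_mut() {
        *capacity = (*capacity as u64 * target / total_capacity) as u32;
    }
}

fn main() {
    let mut capacities = vec![3_000u32, 1_000];
    inflate_capacities(&mut capacities, 6_000);
    // target = 7_200 mCPU -> capacities become [5_400, 1_800]
    println!("{capacities:?}");
}
```
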
@@ -18,7 +18,6 @@ use std::collections::btree_map::Entry;

use itertools::Itertools;
use quickwit_proto::indexing::CpuCapacity;
use tracing::warn;

use super::scheduling_logic_model::*;
use crate::indexing_scheduler::scheduling::inflate_node_capacities_if_necessary;
@@ -41,7 +40,7 @@ pub fn solve(
previous_solution: SchedulingSolution,
) -> SchedulingSolution {
// We first inflate the indexer capacities to make sure they globally
// have at least 110% of the total problem load. This is done proportionally
// have at least 120% of the total problem load. This is done proportionally
// to their original capacity.
inflate_node_capacities_if_necessary(&mut problem);
// As a heuristic, to offer stability, we work iteratively
@@ -294,21 +293,23 @@ fn place_unassigned_shards_ignoring_affinity(
Reverse(load)
});

// Thanks to the call to `inflate_node_capacities_if_necessary`,
// we are certain that even on our first attempt, the total capacity of the indexer exceeds 120%
// of the partial solution.
// Thanks to the call to `inflate_node_capacities_if_necessary`, we are
// certain that even on our first attempt, the total capacity of the indexer
// exceeds 120% of the partial solution. If a large shard needs to be placed
// in an already well balanced solution, it may not fit on any node. In that
// case, we iteratively grow the virtual capacity until it can be placed.
//
// 1.2^30 is about 240.
// If we reach 30 attempts we are certain to have a logical bug.
// 1.2^30 is about 240. If we reach 30 attempts we are certain to have a
// logical bug.
for attempt_number in 0..30 {
match attempt_place_unassigned_shards(&unassigned_shards[..], &problem, partial_solution) {
Ok(solution) => {
if attempt_number != 0 {
warn!(
attempt_number = attempt_number,
"required to scale node capacity"
);
}
Ok(mut solution) => {
// the higher the attempt number, the more unbalanced the solution
tracing::warn!(
attempt_number = attempt_number,
"capacity re-scaled, scheduling solution likely unbalanced"
);
solution.capacity_scaling_iterations = attempt_number;
return solution;
}
Err(NotEnoughCapacity) => {
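
Taken together, the new comment and the loop above say: retry with 20% larger virtual capacities until placement succeeds, and give up after 30 attempts (1.2^30 is roughly 240, so reaching that point would indicate a bug). A simplified, self-contained stand-in for that control flow; `try_place` below is a plain first-fit placeholder, not the actual placement logic:

```rust
// Sketch of the retry-with-rescaling loop; `attempt` plays the role of the
// `capacity_scaling_iterations` counter recorded in the hunk above.
fn place_with_rescaling(
    mut capacities_mcpu: Vec<u32>,
    loads_mcpu: &[u32],
) -> Option<(Vec<usize>, usize)> {
    for attempt in 0..30 {
        if let Some(assignment) = try_place(&capacities_mcpu, loads_mcpu) {
            return Some((assignment, attempt));
        }
        // Grow every node's virtual capacity by 20% before retrying.
        for capacity in capacities_mcpu.iter_mut() {
            *capacity = (*capacity as u64 * 120 / 100) as u32;
        }
    }
    None // 30 failed attempts would indicate a logical bug.
}

// First-fit placeholder: assign each load to the first node it fits on.
fn try_place(capacities_mcpu: &[u32], loads_mcpu: &[u32]) -> Option<Vec<usize>> {
    let mut remaining = capacities_mcpu.to_vec();
    let mut assignment = Vec::with_capacity(loads_mcpu.len());
    for &load in loads_mcpu {
        let node = remaining.iter().position(|&cap| cap >= load)?;
        remaining[node] -= load;
        assignment.push(node);
    }
    Some(assignment)
}
```
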
@@ -783,4 +784,19 @@ mod tests {
solve(problem, solution);
}
}

#[test]
fn test_capacity_scaling_iteration_required() {
// Create a problem where affinity constraints cause suboptimal placement
// requiring iterative scaling despite initial capacity scaling.
let mut problem =
SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(3000), mcpu(3000)]);
problem.add_source(1, NonZeroU32::new(2500).unwrap()); // Source 0
problem.add_source(1, NonZeroU32::new(2500).unwrap()); // Source 1
problem.add_source(1, NonZeroU32::new(1500).unwrap()); // Source 2
let previous_solution = problem.new_solution();
let solution = solve(problem, previous_solution);

assert_eq!(solution.capacity_scaling_iterations, 1);
}
}
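
A plausible reading of why this test expects exactly one rescaling pass, assuming the initial inflation and the 20% per-attempt growth described above: total load is 2500 + 2500 + 1500 = 6500 mCPU against 2 x 3000 = 6000 mCPU of raw capacity, so inflation brings each indexer to 3900 mCPU (1.2 x 6500 = 7800 in total). Placing the two 2500 mCPU sources leaves 1400 mCPU per indexer, which cannot hold the remaining 1500 mCPU source; one 20% rescale (3900 to 4680 mCPU per indexer) makes it fit, hence `capacity_scaling_iterations == 1`.
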
@@ -229,15 +229,18 @@ impl IndexerAssignment {
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug)]
pub struct SchedulingSolution {
pub indexer_assignments: Vec<IndexerAssignment>,
// used for tests
pub capacity_scaling_iterations: usize,
}

impl SchedulingSolution {
pub fn with_num_indexers(num_indexers: usize) -> SchedulingSolution {
SchedulingSolution {
indexer_assignments: (0..num_indexers).map(IndexerAssignment::new).collect(),
capacity_scaling_iterations: 0,
}
}
pub fn num_indexers(&self) -> usize {
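
Since `Eq`/`PartialEq` is no longer derived here, tests compare `indexer_assignments` directly (as in the post-condition assert earlier in this diff) and can read `capacity_scaling_iterations` on its own. A hedged sketch of such a check, shaped like the existing tests and assuming the same helpers (`mcpu`, `SchedulingProblem`, `solve`) are in scope; the test name and values are made up:

```rust
#[test]
fn test_no_capacity_scaling_needed_sketch() {
    // Plenty of headroom: 2 x 4000 mCPU of capacity for a single 1000 mCPU source,
    // so the first placement attempt should succeed without any rescaling.
    let mut problem =
        SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(4000), mcpu(4000)]);
    problem.add_source(1, NonZeroU32::new(1000).unwrap());
    let previous_solution = problem.new_solution();
    let solution = solve(problem, previous_solution);
    assert_eq!(solution.capacity_scaling_iterations, 0);
}
```
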