Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/pixi_build_frontend/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ impl IsolatedTool {
activation_scripts: activation,
}
}

/// Get the prefix of the isolated tool.
pub fn prefix(&self) -> &PathBuf {
&self.prefix
}
}

impl Tool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,15 @@ impl CommandDispatcher {

// Add debug information about what the backend supports.
tracing::info!(
"Instantiated backend {}{}, negotiated API version {}",
"Instantiated backend {}{}, negotiated API version {}{}",
tool.executable(),
tool.version().map_or_else(String::new, |v| format!("@{v}")),
api_version,
if let Some(isolated_tool) = tool.as_isolated() {
format!(", from prefix {}", isolated_tool.prefix().display())
} else {
"".to_string()
},
);

// Make sure that the project model is compatible with the API version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,20 @@ impl CommandDispatcherProcessor {
.insert(pending_id.into(), parent_context);
}

// Create a child cancellation token linked to parent's token (if any).
let cancellation_token =
self.get_child_cancellation_token(task.parent, task.cancellation_token);

// Add to the list of pending tasks
self.pending_backend_source_builds.push_back((
pending_id,
*task.spec,
task.cancellation_token,
));
self.pending_backend_source_builds
.push_back((pending_id, *task.spec, cancellation_token));

self.start_next_backend_source_build();
}

fn start_next_backend_source_build(&mut self) {
use crate::command_dispatcher::CommandDispatcherContext;

let limit = self
.inner
.limits
Expand Down Expand Up @@ -72,6 +75,10 @@ impl CommandDispatcherProcessor {
reporter.on_started(id, Box::new(rx));
}

// Store the cancellation token for this context so child tasks can link to it.
let context = CommandDispatcherContext::BackendSourceBuild(backend_source_build_id);
self.store_cancellation_token(context, cancellation_token.clone());

// Add the task to the list of pending futures.
self.pending_futures.push(
cancellation_token
Expand All @@ -97,7 +104,12 @@ impl CommandDispatcherProcessor {
id: BackendSourceBuildId,
result: Result<BackendBuiltSource, CommandDispatcherError<BackendSourceBuildError>>,
) {
self.parent_contexts.remove(&id.into());
use crate::command_dispatcher::CommandDispatcherContext;

let context = CommandDispatcherContext::BackendSourceBuild(id);
self.parent_contexts.remove(&context);
self.remove_cancellation_token(context);

let env = self
.backend_source_builds
.remove(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl CommandDispatcherProcessor {
task.spec,
task.cancellation_token,
log_sink,
task.parent,
);
}
}
Expand All @@ -97,10 +98,17 @@ impl CommandDispatcherProcessor {
spec: BuildBackendMetadataSpec,
cancellation_token: CancellationToken,
log_sink: UnboundedSender<String>,
parent: Option<CommandDispatcherContext>,
) {
let dispatcher = self.create_task_command_dispatcher(
CommandDispatcherContext::BuildBackendMetadata(build_backend_metadata_id),
);
let dispatcher_context =
CommandDispatcherContext::BuildBackendMetadata(build_backend_metadata_id);
let dispatcher = self.create_task_command_dispatcher(dispatcher_context);

// Create a child cancellation token linked to parent's token (if any).
let cancellation_token = self.get_child_cancellation_token(parent, cancellation_token);

// Store the cancellation token for this context so child tasks can link to it.
self.store_cancellation_token(dispatcher_context, cancellation_token.clone());

// Open a channel to receive build output.
self.pending_futures.push(
Expand Down Expand Up @@ -133,7 +141,10 @@ impl CommandDispatcherProcessor {
CommandDispatcherError<BuildBackendMetadataError>,
>,
) {
self.parent_contexts.remove(&id.into());
let context = CommandDispatcherContext::BuildBackendMetadata(id);
self.parent_contexts.remove(&context);
self.remove_cancellation_token(context);

if let Some((reporter, reporter_id)) = self
.reporter
.as_deref_mut()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl CommandDispatcherProcessor {
dev_source_metadata_id,
task.spec,
task.cancellation_token,
task.parent,
);
}
}
Expand All @@ -72,10 +73,18 @@ impl CommandDispatcherProcessor {
dev_source_metadata_id: DevSourceMetadataId,
spec: DevSourceMetadataSpec,
cancellation_token: CancellationToken,
parent: Option<CommandDispatcherContext>,
) {
let dispatcher = self.create_task_command_dispatcher(
CommandDispatcherContext::DevSourceMetadata(dev_source_metadata_id),
);
let dispatcher_context =
CommandDispatcherContext::DevSourceMetadata(dev_source_metadata_id);
let dispatcher = self.create_task_command_dispatcher(dispatcher_context);

// Create a child cancellation token linked to parent's token (if any).
let cancellation_token = self.get_child_cancellation_token(parent, cancellation_token);

// Store the cancellation token for this context so child tasks can link to it.
self.store_cancellation_token(dispatcher_context, cancellation_token.clone());

self.pending_futures.push(
cancellation_token
.run_until_cancelled_owned(spec.request(dispatcher))
Expand All @@ -98,7 +107,10 @@ impl CommandDispatcherProcessor {
id: DevSourceMetadataId,
result: Result<DevSourceMetadata, CommandDispatcherError<DevSourceMetadataError>>,
) {
self.parent_contexts.remove(&id.into());
let context = CommandDispatcherContext::DevSourceMetadata(id);
self.parent_contexts.remove(&context);
self.remove_cancellation_token(context);

self.dev_source_metadata
.get_mut(&id)
.expect("cannot find pending task")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,17 @@ impl CommandDispatcherProcessor {
.as_mut()
.and_then(|reporter| reporter.create_install_reporter(reporter_context));

// Create a child cancellation token linked to parent's token (if any).
let cancellation_token =
self.get_child_cancellation_token(task.parent, task.cancellation_token);

// Store the cancellation token for this context so child tasks can link to it.
self.store_cancellation_token(dispatcher_context, cancellation_token.clone());

// Add the task to the list of pending futures.
let dispatcher = self.create_task_command_dispatcher(dispatcher_context);
self.pending_futures.push(
task.cancellation_token
cancellation_token
.run_until_cancelled_owned(task.spec.install(dispatcher, install_reporter))
.map(move |result| {
TaskResult::InstallPixiEnvironment(
Expand All @@ -81,7 +88,9 @@ impl CommandDispatcherProcessor {
CommandDispatcherError<InstallPixiEnvironmentError>,
>,
) {
self.parent_contexts.remove(&id.into());
let context = CommandDispatcherContext::InstallPixiEnvironment(id);
self.parent_contexts.remove(&context);
self.remove_cancellation_token(context);
let env = self
.install_pixi_environment
.remove(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,18 @@ impl CommandDispatcherProcessor {
reporter.on_started(reporter_id)
}

let command_queue = self.create_task_command_dispatcher(
CommandDispatcherContext::InstantiateToolEnv(id),
);
let dispatcher_context = CommandDispatcherContext::InstantiateToolEnv(id);

// Create a child cancellation token linked to parent's token (if any).
let cancellation_token =
self.get_child_cancellation_token(task.parent, task.cancellation_token);

// Store the cancellation token for this context so child tasks can link to it.
self.store_cancellation_token(dispatcher_context, cancellation_token.clone());

let command_queue = self.create_task_command_dispatcher(dispatcher_context);
self.pending_futures.push(
task.cancellation_token
cancellation_token
.run_until_cancelled_owned(task.spec.instantiate(command_queue))
.map(move |result| {
TaskResult::InstantiateToolEnv(
Expand All @@ -103,7 +110,10 @@ impl CommandDispatcherProcessor {
CommandDispatcherError<InstantiateToolEnvironmentError>,
>,
) {
self.parent_contexts.remove(&id.into());
let context = CommandDispatcherContext::InstantiateToolEnv(id);
self.parent_contexts.remove(&context);
self.remove_cancellation_token(context);

if let Some((reporter, reporter_id)) = self
.reporter
.as_deref_mut()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ pub(crate) struct CommandDispatcherProcessor {
/// Keeps track of the parent context for each task that is being processed.
parent_contexts: HashMap<CommandDispatcherContext, CommandDispatcherContext>,

/// Keeps track of cancellation tokens for each task context.
/// Used to create child tokens that are automatically cancelled when the parent is cancelled.
cancellation_tokens: HashMap<CommandDispatcherContext, CancellationToken>,

/// A weak reference to the sender. This is used to allow constructing new
/// [`Dispatchers`] without keeping the channel alive if there are no
/// dispatchers alive. This is important because the command_dispatcher
Expand Down Expand Up @@ -344,6 +348,7 @@ impl CommandDispatcherProcessor {
let task = Self {
receiver: rx,
parent_contexts: HashMap::new(),
cancellation_tokens: HashMap::new(),
sender: weak_tx,
conda_solves: slotmap::SlotMap::default(),
pending_conda_solves: VecDeque::new(),
Expand Down Expand Up @@ -627,4 +632,34 @@ impl CommandDispatcherProcessor {
.filter_map(|context| T::try_from(context).ok())
.contains(&id)
}

/// Creates a child cancellation token linked to the parent's token.
///
/// If the parent context has a stored cancellation token, creates a child token
/// that will be automatically cancelled when the parent is cancelled.
/// Otherwise, returns the provided token unchanged.
fn get_child_cancellation_token(
&self,
parent: Option<CommandDispatcherContext>,
token: CancellationToken,
) -> CancellationToken {
parent
.and_then(|ctx| self.cancellation_tokens.get(&ctx))
.map(|parent_token| parent_token.child_token())
.unwrap_or(token)
}

/// Stores a cancellation token for the given context.
fn store_cancellation_token(
&mut self,
context: CommandDispatcherContext,
token: CancellationToken,
) {
self.cancellation_tokens.insert(context, token);
}

/// Removes the cancellation token for the given context.
fn remove_cancellation_token(&mut self, context: CommandDispatcherContext) {
self.cancellation_tokens.remove(&context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,22 @@ impl CommandDispatcherProcessor {
self.parent_contexts.insert(environment_id.into(), parent);
}

// Create a child cancellation token linked to parent's token (if any).
let cancellation_token =
self.get_child_cancellation_token(task.parent, task.cancellation_token);

// Add the environment to the list of pending environments.
self.pending_conda_solves
.push_back((environment_id, task.spec, task.cancellation_token));
.push_back((environment_id, task.spec, cancellation_token));

// Queue up as many solves as possible.
self.start_next_conda_environment_solves();
}

/// Queue as many solves as possible within the allowed limits.
fn start_next_conda_environment_solves(&mut self) {
use crate::command_dispatcher::CommandDispatcherContext;

let limit = self
.inner
.limits
Expand All @@ -67,6 +73,10 @@ impl CommandDispatcherProcessor {
reporter.on_start(id)
}

// Store the cancellation token for this context so child tasks can link to it.
let context = CommandDispatcherContext::SolveCondaEnvironment(environment_id);
self.store_cancellation_token(context, cancellation_token.clone());

// Add the task to the list of pending futures.
self.pending_futures.push(
cancellation_token
Expand All @@ -92,7 +102,11 @@ impl CommandDispatcherProcessor {
id: SolveCondaEnvironmentId,
result: Result<Vec<PixiRecord>, CommandDispatcherError<SolveCondaEnvironmentError>>,
) {
self.parent_contexts.remove(&id.into());
use crate::command_dispatcher::CommandDispatcherContext;

let context = CommandDispatcherContext::SolveCondaEnvironment(id);
self.parent_contexts.remove(&context);
self.remove_cancellation_token(context);
let env = self
.conda_solves
.remove(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,18 @@ impl CommandDispatcherProcessor {
.as_deref_mut()
.and_then(|reporter| reporter.create_gateway_reporter(reporter_context));

// Create a child cancellation token linked to parent's token (if any).
// This ensures that when the parent task is cancelled, this task is also cancelled.
let cancellation_token =
self.get_child_cancellation_token(task.parent, task.cancellation_token);

// Store the cancellation token for this context so child tasks can link to it.
self.store_cancellation_token(dispatcher_context, cancellation_token.clone());

// Add the task to the list of pending futures.
let dispatcher = self.create_task_command_dispatcher(dispatcher_context);
self.pending_futures.push(
task.cancellation_token
cancellation_token
.run_until_cancelled_owned(task.spec.solve(dispatcher, gateway_reporter))
.map(move |result| {
TaskResult::SolvePixiEnvironment(
Expand All @@ -76,7 +84,9 @@ impl CommandDispatcherProcessor {
id: SolvePixiEnvironmentId,
result: Result<Vec<PixiRecord>, CommandDispatcherError<SolvePixiEnvironmentError>>,
) {
self.parent_contexts.remove(&id.into());
let context = CommandDispatcherContext::SolvePixiEnvironment(id);
self.parent_contexts.remove(&context);
self.remove_cancellation_token(context);
let env = self
.solve_pixi_environments
.remove(id)
Expand Down
Loading
Loading