Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 277 additions & 16 deletions nativelink-scheduler/src/memory_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use nativelink_util::instant_wrapper::InstantWrapper;
use nativelink_util::spawn;
use nativelink_util::task::JoinHandleDropGuard;
use tokio::sync::{Notify, mpsc, watch};
use tracing::{debug, error};
use tracing::{debug, error, info, instrument, warn};

use crate::awaited_action_db::{
AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CLIENT_KEEPALIVE_DURATION,
Expand Down Expand Up @@ -549,24 +549,29 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
ActionUniqueQualifier::Cacheable(action_key) => {
let maybe_awaited_action =
action_info_hash_key_to_awaited_action.remove(action_key);
match maybe_awaited_action {
Some(removed_operation_id) => {
if &removed_operation_id != new_awaited_action.operation_id() {
error!(
?removed_operation_id,
?new_awaited_action,
?action_key,
"action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync",
);
}
}
None => {
if let Some(removed_operation_id) = maybe_awaited_action {
if &removed_operation_id != new_awaited_action.operation_id() {
error!(
?removed_operation_id,
?new_awaited_action,
?action_key,
"action_info_hash_key_to_awaited_action out of sync, it should have had the unique_key",
"action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync",
);
}
} else {
warn!(
?new_awaited_action,
?action_key,
"action_info_hash_key_to_awaited_action missing key for finished action - attempting recovery"
);
// This is a recoverable inconsistency - the action finished but
// its key mapping was already removed or never existed.
// Since the action is finished, we don't need to restore the mapping,
// but we should check for data corruption indicators.
Self::log_consistency_diagnostic(
action_info_hash_key_to_awaited_action,
new_awaited_action,
);
}
}
ActionUniqueQualifier::Uncacheable(_action_key) => {
Expand All @@ -577,6 +582,52 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
}
}

/// Logs diagnostic information when data structure inconsistency is detected.
/// This helps with debugging and provides actionable information for recovery.
#[instrument(
name = "memory_awaited_action_db.log_consistency_diagnostic",
skip_all,
fields(
operation_id = %awaited_action.operation_id(),
action_stage = ?awaited_action.state().stage,
hash_key_count = action_info_hash_key_to_awaited_action.len(),
corruption_detected = tracing::field::Empty,
)
)]
fn log_consistency_diagnostic(
action_info_hash_key_to_awaited_action: &HashMap<ActionUniqueKey, OperationId>,
awaited_action: &AwaitedAction,
) {
let operation_id = awaited_action.operation_id();
let stage = &awaited_action.state().stage;

// Count how many hash key mappings exist for diagnostic purposes
let hash_key_count = action_info_hash_key_to_awaited_action.len();

// Check if this operation_id appears elsewhere in the hash map (shouldn't happen)
let operation_id_appears_elsewhere = action_info_hash_key_to_awaited_action
.values()
.any(|id| id == operation_id);

if operation_id_appears_elsewhere {
error!(
?operation_id,
?stage,
hash_key_count,
"CRITICAL: Operation ID found in hash key map with different key - potential data corruption"
);
tracing::Span::current().record("corruption_detected", true);
} else {
debug!(
?operation_id,
?stage,
hash_key_count,
"Hash key mapping missing for finished action - likely harmless race condition"
);
tracing::Span::current().record("corruption_detected", false);
}
}

fn update_awaited_action(
&mut self,
mut new_awaited_action: AwaitedAction,
Expand Down Expand Up @@ -629,8 +680,27 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
.is_same_stage(&new_awaited_action.state().stage);

if !is_same_stage {
self.sorted_action_info_hash_keys
.process_state_changes(&old_awaited_action, &new_awaited_action)?;
// Try to process state changes and validate consistency on error
if let Err(e) = self
.sorted_action_info_hash_keys
.process_state_changes(&old_awaited_action, &new_awaited_action)
{
warn!(
error = ?e,
?old_awaited_action,
?new_awaited_action,
"State change processing failed, validating consistency"
);
// Don't fail on validation errors during error recovery
if let Err(validation_err) = self.validate_consistency() {
error!(
validation_error = ?validation_err,
"Data structure consistency validation failed after state change error"
);
}
return Err(e);
}

Self::process_state_changes_for_hash_key_map(
&mut self.action_info_hash_key_to_awaited_action,
&new_awaited_action,
Expand Down Expand Up @@ -818,6 +888,140 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
self.now_fn.clone(),
)))
}

/// Validates consistency between the three main data structures.
/// This is primarily used for debugging and can be called periodically
/// or after errors to detect data corruption.
#[instrument(
name = "memory_awaited_action_db.validate_consistency",
skip_all,
fields(
hash_key_count = self.action_info_hash_key_to_awaited_action.len(),
operation_count = self.operation_id_to_awaited_action.len(),
connected_clients_count = self.connected_clients_for_operation_id.len(),
)
)]
fn validate_consistency(&self) -> Result<(), Error> {
// Check that all entries in action_info_hash_key_to_awaited_action
// have corresponding entries in operation_id_to_awaited_action
for (action_key, operation_id) in &self.action_info_hash_key_to_awaited_action {
if !self
.operation_id_to_awaited_action
.contains_key(operation_id)
{
return Err(make_err!(
Code::Internal,
"Hash key map contains operation_id {operation_id} for key {action_key:?} but operation_id_to_awaited_action does not"
));
}
}

// Check that all cacheable non-finished actions in operation_id_to_awaited_action
// have corresponding entries in action_info_hash_key_to_awaited_action
for (operation_id, tx) in &self.operation_id_to_awaited_action {
let awaited_action = tx.borrow();
if let ActionUniqueQualifier::Cacheable(action_key) =
&awaited_action.action_info().unique_qualifier
{
if !awaited_action.state().stage.is_finished() {
match self.action_info_hash_key_to_awaited_action.get(action_key) {
Some(mapped_operation_id) => {
if mapped_operation_id != operation_id {
return Err(make_err!(
Code::Internal,
"Hash key map has incorrect operation_id mapping: key {action_key:?} maps to {mapped_operation_id} but should map to {operation_id}"
));
}
}
None => {
return Err(make_err!(
Code::Internal,
"Non-finished cacheable action {operation_id} with key {action_key:?} missing from hash key map"
));
}
}
}
}
}

// Check that connected_clients_for_operation_id is consistent
for operation_id in self.connected_clients_for_operation_id.keys() {
if !self
.operation_id_to_awaited_action
.contains_key(operation_id)
{
return Err(make_err!(
Code::Internal,
"connected_clients_for_operation_id contains {operation_id} but operation_id_to_awaited_action does not"
));
}
}

Ok(())
}

/// Attempts to recover from data structure inconsistencies by rebuilding
/// the hash key mapping from the `operation_id_to_awaited_action` map.
/// This is a self-healing mechanism for when the maps get out of sync.
#[instrument(
name = "memory_awaited_action_db.attempt_recovery",
skip_all,
fields(
original_hash_key_count = self.action_info_hash_key_to_awaited_action.len(),
operation_count = self.operation_id_to_awaited_action.len(),
)
)]
fn attempt_recovery(&mut self) {
let mut recovered_mappings = 0;

// First, rebuild action_info_hash_key_to_awaited_action from scratch
let mut new_hash_key_map = HashMap::new();

for (operation_id, tx) in &self.operation_id_to_awaited_action {
let awaited_action = tx.borrow();
if let ActionUniqueQualifier::Cacheable(action_key) =
&awaited_action.action_info().unique_qualifier
{
// Only add non-finished actions to the hash key map
if !awaited_action.state().stage.is_finished() {
if let Some(existing_operation_id) =
new_hash_key_map.insert(action_key.clone(), operation_id.clone())
{
warn!(
?action_key,
?operation_id,
?existing_operation_id,
"Duplicate cacheable action detected during recovery - keeping newer entry"
);
} else {
recovered_mappings += 1;
}
}
}
}

// Count how many stale mappings we're removing
let removed_stale_mappings = self
.action_info_hash_key_to_awaited_action
.len()
.saturating_sub(new_hash_key_map.len());

// Replace the hash key map with the rebuilt version
self.action_info_hash_key_to_awaited_action = new_hash_key_map;

warn!(
recovered_mappings,
removed_stale_mappings, "Data structure recovery completed"
);

// Record recovery metrics for OpenTelemetry
tracing::Span::current().record("recovered_mappings", recovered_mappings);
tracing::Span::current().record("removed_stale_mappings", removed_stale_mappings);
tracing::Span::current().record(
"final_hash_key_count",
self.action_info_hash_key_to_awaited_action.len(),
);
}
}

#[derive(Debug, MetricsComponent)]
Expand Down Expand Up @@ -868,6 +1072,63 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static>
}),
}
}

/// Validates data structure consistency and attempts recovery if needed.
/// This is a public method that can be called by external monitoring
/// or debugging tools to check and repair the database state.
#[instrument(
name = "memory_awaited_action_db.validate_and_recover",
skip_all,
fields(
validation_result = tracing::field::Empty,
recovery_attempted = false,
recovery_result = tracing::field::Empty,
)
)]
pub async fn validate_and_recover(&self) -> Result<(), Error> {
let mut inner = self.inner.lock().await;

// First attempt validation
match inner.validate_consistency() {
Ok(()) => {
debug!("Memory awaited action database consistency validation passed");
tracing::Span::current().record("validation_result", "passed");
Ok(())
}
Err(validation_error) => {
warn!(
error = ?validation_error,
"Memory awaited action database consistency validation failed, attempting recovery"
);
tracing::Span::current().record("validation_result", "failed");
tracing::Span::current().record("recovery_attempted", true);

// Attempt recovery
inner.attempt_recovery();

// Validate again after recovery
match inner.validate_consistency() {
Ok(()) => {
info!("Memory awaited action database successfully recovered");
tracing::Span::current().record("recovery_result", "success");
Ok(())
}
Err(post_recovery_error) => {
error!(
original_error = ?validation_error,
post_recovery_error = ?post_recovery_error,
"Failed to recover memory awaited action database"
);
tracing::Span::current().record("recovery_result", "failed");
Err(make_err!(
Code::Internal,
"Database recovery failed: original error: {validation_error}, post-recovery error: {post_recovery_error}"
))
}
}
}
}
}
}

impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> AwaitedActionDb
Expand Down
Loading
Loading