Skip to content

Commit 883d620

Browse files
Fix action_info_hash_key_to_awaited_action sync issues with validation and recovery
1 parent cd3f993 commit 883d620

File tree

2 files changed

+522
-16
lines changed

2 files changed

+522
-16
lines changed

nativelink-scheduler/src/memory_awaited_action_db.rs

Lines changed: 277 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use nativelink_util::instant_wrapper::InstantWrapper;
3131
use nativelink_util::spawn;
3232
use nativelink_util::task::JoinHandleDropGuard;
3333
use tokio::sync::{Notify, mpsc, watch};
34-
use tracing::{debug, error};
34+
use tracing::{debug, error, info, instrument, warn};
3535

3636
use crate::awaited_action_db::{
3737
AwaitedAction, AwaitedActionDb, AwaitedActionSubscriber, CLIENT_KEEPALIVE_DURATION,
@@ -549,24 +549,29 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
549549
ActionUniqueQualifier::Cacheable(action_key) => {
550550
let maybe_awaited_action =
551551
action_info_hash_key_to_awaited_action.remove(action_key);
552-
match maybe_awaited_action {
553-
Some(removed_operation_id) => {
554-
if &removed_operation_id != new_awaited_action.operation_id() {
555-
error!(
556-
?removed_operation_id,
557-
?new_awaited_action,
558-
?action_key,
559-
"action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync",
560-
);
561-
}
562-
}
563-
None => {
552+
if let Some(removed_operation_id) = maybe_awaited_action {
553+
if &removed_operation_id != new_awaited_action.operation_id() {
564554
error!(
555+
?removed_operation_id,
565556
?new_awaited_action,
566557
?action_key,
567-
"action_info_hash_key_to_awaited_action out of sync, it should have had the unique_key",
558+
"action_info_hash_key_to_awaited_action and operation_id_to_awaited_action are out of sync",
568559
);
569560
}
561+
} else {
562+
warn!(
563+
?new_awaited_action,
564+
?action_key,
565+
"action_info_hash_key_to_awaited_action missing key for finished action - attempting recovery"
566+
);
567+
// This is a recoverable inconsistency - the action finished but
568+
// its key mapping was already removed or never existed.
569+
// Since the action is finished, we don't need to restore the mapping,
570+
// but we should check for data corruption indicators.
571+
Self::log_consistency_diagnostic(
572+
action_info_hash_key_to_awaited_action,
573+
new_awaited_action,
574+
);
570575
}
571576
}
572577
ActionUniqueQualifier::Uncacheable(_action_key) => {
@@ -577,6 +582,52 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
577582
}
578583
}
579584

585+
/// Logs diagnostic information when data structure inconsistency is detected.
586+
/// This helps with debugging and provides actionable information for recovery.
587+
#[instrument(
588+
name = "memory_awaited_action_db.log_consistency_diagnostic",
589+
skip_all,
590+
fields(
591+
operation_id = %awaited_action.operation_id(),
592+
action_stage = ?awaited_action.state().stage,
593+
hash_key_count = action_info_hash_key_to_awaited_action.len(),
594+
corruption_detected = tracing::field::Empty,
595+
)
596+
)]
597+
fn log_consistency_diagnostic(
598+
action_info_hash_key_to_awaited_action: &HashMap<ActionUniqueKey, OperationId>,
599+
awaited_action: &AwaitedAction,
600+
) {
601+
let operation_id = awaited_action.operation_id();
602+
let stage = &awaited_action.state().stage;
603+
604+
// Count how many hash key mappings exist for diagnostic purposes
605+
let hash_key_count = action_info_hash_key_to_awaited_action.len();
606+
607+
// Check if this operation_id appears elsewhere in the hash map (shouldn't happen)
608+
let operation_id_appears_elsewhere = action_info_hash_key_to_awaited_action
609+
.values()
610+
.any(|id| id == operation_id);
611+
612+
if operation_id_appears_elsewhere {
613+
error!(
614+
?operation_id,
615+
?stage,
616+
hash_key_count,
617+
"CRITICAL: Operation ID found in hash key map with different key - potential data corruption"
618+
);
619+
tracing::Span::current().record("corruption_detected", true);
620+
} else {
621+
debug!(
622+
?operation_id,
623+
?stage,
624+
hash_key_count,
625+
"Hash key mapping missing for finished action - likely harmless race condition"
626+
);
627+
tracing::Span::current().record("corruption_detected", false);
628+
}
629+
}
630+
580631
fn update_awaited_action(
581632
&mut self,
582633
mut new_awaited_action: AwaitedAction,
@@ -629,8 +680,27 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
629680
.is_same_stage(&new_awaited_action.state().stage);
630681

631682
if !is_same_stage {
632-
self.sorted_action_info_hash_keys
633-
.process_state_changes(&old_awaited_action, &new_awaited_action)?;
683+
// Try to process state changes and validate consistency on error
684+
if let Err(e) = self
685+
.sorted_action_info_hash_keys
686+
.process_state_changes(&old_awaited_action, &new_awaited_action)
687+
{
688+
warn!(
689+
error = ?e,
690+
?old_awaited_action,
691+
?new_awaited_action,
692+
"State change processing failed, validating consistency"
693+
);
694+
// Don't fail on validation errors during error recovery
695+
if let Err(validation_err) = self.validate_consistency() {
696+
error!(
697+
validation_error = ?validation_err,
698+
"Data structure consistency validation failed after state change error"
699+
);
700+
}
701+
return Err(e);
702+
}
703+
634704
Self::process_state_changes_for_hash_key_map(
635705
&mut self.action_info_hash_key_to_awaited_action,
636706
&new_awaited_action,
@@ -818,6 +888,140 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync> AwaitedActionDbI
818888
self.now_fn.clone(),
819889
)))
820890
}
891+
892+
/// Validates consistency between the three main data structures.
893+
/// This is primarily used for debugging and can be called periodically
894+
/// or after errors to detect data corruption.
895+
#[instrument(
896+
name = "memory_awaited_action_db.validate_consistency",
897+
skip_all,
898+
fields(
899+
hash_key_count = self.action_info_hash_key_to_awaited_action.len(),
900+
operation_count = self.operation_id_to_awaited_action.len(),
901+
connected_clients_count = self.connected_clients_for_operation_id.len(),
902+
)
903+
)]
904+
fn validate_consistency(&self) -> Result<(), Error> {
905+
// Check that all entries in action_info_hash_key_to_awaited_action
906+
// have corresponding entries in operation_id_to_awaited_action
907+
for (action_key, operation_id) in &self.action_info_hash_key_to_awaited_action {
908+
if !self
909+
.operation_id_to_awaited_action
910+
.contains_key(operation_id)
911+
{
912+
return Err(make_err!(
913+
Code::Internal,
914+
"Hash key map contains operation_id {operation_id} for key {action_key:?} but operation_id_to_awaited_action does not"
915+
));
916+
}
917+
}
918+
919+
// Check that all cacheable non-finished actions in operation_id_to_awaited_action
920+
// have corresponding entries in action_info_hash_key_to_awaited_action
921+
for (operation_id, tx) in &self.operation_id_to_awaited_action {
922+
let awaited_action = tx.borrow();
923+
if let ActionUniqueQualifier::Cacheable(action_key) =
924+
&awaited_action.action_info().unique_qualifier
925+
{
926+
if !awaited_action.state().stage.is_finished() {
927+
match self.action_info_hash_key_to_awaited_action.get(action_key) {
928+
Some(mapped_operation_id) => {
929+
if mapped_operation_id != operation_id {
930+
return Err(make_err!(
931+
Code::Internal,
932+
"Hash key map has incorrect operation_id mapping: key {action_key:?} maps to {mapped_operation_id} but should map to {operation_id}"
933+
));
934+
}
935+
}
936+
None => {
937+
return Err(make_err!(
938+
Code::Internal,
939+
"Non-finished cacheable action {operation_id} with key {action_key:?} missing from hash key map"
940+
));
941+
}
942+
}
943+
}
944+
}
945+
}
946+
947+
// Check that connected_clients_for_operation_id is consistent
948+
for operation_id in self.connected_clients_for_operation_id.keys() {
949+
if !self
950+
.operation_id_to_awaited_action
951+
.contains_key(operation_id)
952+
{
953+
return Err(make_err!(
954+
Code::Internal,
955+
"connected_clients_for_operation_id contains {operation_id} but operation_id_to_awaited_action does not"
956+
));
957+
}
958+
}
959+
960+
Ok(())
961+
}
962+
963+
/// Attempts to recover from data structure inconsistencies by rebuilding
964+
/// the hash key mapping from the `operation_id_to_awaited_action` map.
965+
/// This is a self-healing mechanism for when the maps get out of sync.
966+
#[instrument(
967+
name = "memory_awaited_action_db.attempt_recovery",
968+
skip_all,
969+
fields(
970+
original_hash_key_count = self.action_info_hash_key_to_awaited_action.len(),
971+
operation_count = self.operation_id_to_awaited_action.len(),
972+
)
973+
)]
974+
fn attempt_recovery(&mut self) {
975+
let mut recovered_mappings = 0;
976+
977+
// First, rebuild action_info_hash_key_to_awaited_action from scratch
978+
let mut new_hash_key_map = HashMap::new();
979+
980+
for (operation_id, tx) in &self.operation_id_to_awaited_action {
981+
let awaited_action = tx.borrow();
982+
if let ActionUniqueQualifier::Cacheable(action_key) =
983+
&awaited_action.action_info().unique_qualifier
984+
{
985+
// Only add non-finished actions to the hash key map
986+
if !awaited_action.state().stage.is_finished() {
987+
if let Some(existing_operation_id) =
988+
new_hash_key_map.insert(action_key.clone(), operation_id.clone())
989+
{
990+
warn!(
991+
?action_key,
992+
?operation_id,
993+
?existing_operation_id,
994+
"Duplicate cacheable action detected during recovery - keeping newer entry"
995+
);
996+
} else {
997+
recovered_mappings += 1;
998+
}
999+
}
1000+
}
1001+
}
1002+
1003+
// Count how many stale mappings we're removing
1004+
let removed_stale_mappings = self
1005+
.action_info_hash_key_to_awaited_action
1006+
.len()
1007+
.saturating_sub(new_hash_key_map.len());
1008+
1009+
// Replace the hash key map with the rebuilt version
1010+
self.action_info_hash_key_to_awaited_action = new_hash_key_map;
1011+
1012+
warn!(
1013+
recovered_mappings,
1014+
removed_stale_mappings, "Data structure recovery completed"
1015+
);
1016+
1017+
// Record recovery metrics for OpenTelemetry
1018+
tracing::Span::current().record("recovered_mappings", recovered_mappings);
1019+
tracing::Span::current().record("removed_stale_mappings", removed_stale_mappings);
1020+
tracing::Span::current().record(
1021+
"final_hash_key_count",
1022+
self.action_info_hash_key_to_awaited_action.len(),
1023+
);
1024+
}
8211025
}
8221026

8231027
#[derive(Debug, MetricsComponent)]
@@ -868,6 +1072,63 @@ impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static>
8681072
}),
8691073
}
8701074
}
1075+
1076+
/// Validates data structure consistency and attempts recovery if needed.
1077+
/// This is a public method that can be called by external monitoring
1078+
/// or debugging tools to check and repair the database state.
1079+
#[instrument(
1080+
name = "memory_awaited_action_db.validate_and_recover",
1081+
skip_all,
1082+
fields(
1083+
validation_result = tracing::field::Empty,
1084+
recovery_attempted = false,
1085+
recovery_result = tracing::field::Empty,
1086+
)
1087+
)]
1088+
pub async fn validate_and_recover(&self) -> Result<(), Error> {
1089+
let mut inner = self.inner.lock().await;
1090+
1091+
// First attempt validation
1092+
match inner.validate_consistency() {
1093+
Ok(()) => {
1094+
debug!("Memory awaited action database consistency validation passed");
1095+
tracing::Span::current().record("validation_result", "passed");
1096+
Ok(())
1097+
}
1098+
Err(validation_error) => {
1099+
warn!(
1100+
error = ?validation_error,
1101+
"Memory awaited action database consistency validation failed, attempting recovery"
1102+
);
1103+
tracing::Span::current().record("validation_result", "failed");
1104+
tracing::Span::current().record("recovery_attempted", true);
1105+
1106+
// Attempt recovery
1107+
inner.attempt_recovery();
1108+
1109+
// Validate again after recovery
1110+
match inner.validate_consistency() {
1111+
Ok(()) => {
1112+
info!("Memory awaited action database successfully recovered");
1113+
tracing::Span::current().record("recovery_result", "success");
1114+
Ok(())
1115+
}
1116+
Err(post_recovery_error) => {
1117+
error!(
1118+
original_error = ?validation_error,
1119+
post_recovery_error = ?post_recovery_error,
1120+
"Failed to recover memory awaited action database"
1121+
);
1122+
tracing::Span::current().record("recovery_result", "failed");
1123+
Err(make_err!(
1124+
Code::Internal,
1125+
"Database recovery failed: original error: {validation_error}, post-recovery error: {post_recovery_error}"
1126+
))
1127+
}
1128+
}
1129+
}
1130+
}
1131+
}
8711132
}
8721133

8731134
impl<I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Sync + 'static> AwaitedActionDb

0 commit comments

Comments
 (0)