Skip to content

lighthouse: add commit_failures support #183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions proto/torchft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ message QuorumMember {
int64 step = 4;
uint64 world_size = 5;
bool shrink_only = 6;
int64 commit_failures = 8;
// User-supplied data, stored as a JSON string.
string data = 7;
}
Expand Down Expand Up @@ -77,6 +78,7 @@ message ManagerQuorumRequest {
string checkpoint_metadata = 3;
bool shrink_only = 4;
bool init_sync = 5;
int64 commit_failures = 6;
}

message ManagerQuorumResponse {
Expand All @@ -93,6 +95,7 @@ message ManagerQuorumResponse {
int64 replica_rank = 9;
int64 replica_world_size = 10;
bool heal = 11;
int64 commit_failures = 12;
}

message CheckpointMetadataRequest {
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl ManagerClient {
checkpoint_metadata: String,
shrink_only: bool,
init_sync: bool,
commit_failures: i64,
timeout: Duration,
) -> Result<QuorumResult, StatusError> {
py.allow_threads(move || {
Expand All @@ -186,6 +187,7 @@ impl ManagerClient {
checkpoint_metadata: checkpoint_metadata,
shrink_only: shrink_only,
init_sync: init_sync,
commit_failures: commit_failures,
});

// This timeout is processed on the server side so we also enable
Expand Down Expand Up @@ -547,6 +549,7 @@ impl LighthouseClient {
world_size: world_size,
shrink_only: shrink_only,
data: data_string,
commit_failures: 0,
}),
});

Expand Down
104 changes: 103 additions & 1 deletion src/lighthouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ impl Lighthouse {
if quorum_met.is_some() {
let participants = quorum_met.unwrap();

let commit_failure_replica_ids: Vec<String> = participants
.iter()
.filter(|p| p.commit_failures > 0)
.map(|p| p.replica_id.clone())
.collect();

// only increment quorum ID if something about the quorum
// changed (members/addresses/etc) or if any participant
// reported commit failures
if state.prev_quorum.is_none()
Expand All @@ -301,6 +307,13 @@ impl Lighthouse {
"Detected quorum change, bumping quorum_id to {}",
state.quorum_id
);
} else if commit_failure_replica_ids.len() > 0 {
state.quorum_id += 1;
info!(
"Detected commit failures in [{}], bumping quorum_id to {}",
commit_failure_replica_ids.join(", "),
state.quorum_id
);
}

let quorum = Quorum {
Expand Down Expand Up @@ -639,6 +652,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
},
},
);
Expand All @@ -656,6 +670,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
},
},
);
Expand Down Expand Up @@ -712,6 +727,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
},
},
);
Expand Down Expand Up @@ -751,6 +767,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
},
},
);
Expand Down Expand Up @@ -798,6 +815,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
},
},
);
Expand All @@ -819,6 +837,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
}],
created: Some(SystemTime::now().into()),
});
Expand All @@ -838,6 +857,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
},
},
);
Expand Down Expand Up @@ -882,6 +902,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
},
QuorumMember {
replica_id: "b".to_string(),
Expand All @@ -891,6 +912,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
},
],
created: Some(SystemTime::now().into()),
Expand All @@ -908,6 +930,7 @@ mod tests {
world_size: 1,
shrink_only: true,
data: String::new(),
commit_failures: 0,
},
},
);
Expand All @@ -926,6 +949,7 @@ mod tests {
world_size: 1,
shrink_only: true,
data: String::new(),
commit_failures: 0,
},
},
);
Expand Down Expand Up @@ -975,6 +999,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
}),
});

Expand Down Expand Up @@ -1021,6 +1046,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
},
},
);
Expand All @@ -1047,6 +1073,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
}];
let b = vec![QuorumMember {
replica_id: "1".to_string(),
Expand All @@ -1056,6 +1083,7 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
}];

// replica_id is the same
Expand All @@ -1069,12 +1097,13 @@ mod tests {
world_size: 1,
shrink_only: false,
data: String::new(),
commit_failures: 0,
}];
// replica_id changed
assert!(quorum_changed(&a, &c));
}
#[tokio::test]

#[tokio::test]
async fn test_lighthouse_join_during_shrink() -> Result<()> {
fn create_member(id: &str, addr_num: &str, step: i64, shrink_only: bool) -> QuorumMember {
QuorumMember {
Expand All @@ -1085,6 +1114,7 @@ mod tests {
world_size: 1,
shrink_only,
data: String::new(),
commit_failures: 0,
}
}

Expand Down Expand Up @@ -1179,4 +1209,76 @@ mod tests {
lighthouse_task.abort();
Ok(())
}

#[tokio::test]
async fn test_lighthouse_commit_failures() -> Result<()> {
    /// Builds a quorum request whose requester is replica `id` at step 10,
    /// reporting the given number of `commit_failures`.
    fn quorum_request(id: &str, commit_failures: i64) -> tonic::Request<LighthouseQuorumRequest> {
        let requester = QuorumMember {
            replica_id: id.to_string(),
            address: format!("addr{}", id),
            store_address: format!("store{}", id),
            step: 10,
            world_size: 1,
            shrink_only: false,
            data: String::new(),
            commit_failures,
        };
        tonic::Request::new(LighthouseQuorumRequest {
            requester: Some(requester),
        })
    }

    // Launch a lighthouse configured with min_replicas = 2 and keep a handle
    // to its serving task so it can be aborted at the end of the test.
    let lighthouse = Lighthouse::new(LighthouseOpt {
        min_replicas: 2,
        bind: "[::]:0".to_string(),
        join_timeout_ms: 1000,
        quorum_tick_ms: 10,
        heartbeat_timeout_ms: 5000,
    })
    .await?;
    let lighthouse_task = tokio::spawn(lighthouse.clone().run());

    // Client used for the foreground requests; clones of it issue the
    // background requests.
    let mut client = lighthouse_client_new(lighthouse.address()).await?;

    // Two consecutive rounds without commit failures: the quorum_id is
    // expected to stay at 1 both times.
    for _ in 0..2 {
        let background_request = quorum_request("replica0", 0);
        let foreground_request = quorum_request("replica1", 0);

        // replica0 joins from a detached task while replica1 joins inline.
        let mut background_client = client.clone();
        tokio::spawn(async move { background_client.quorum(background_request).await });

        let quorum = client
            .quorum(foreground_request)
            .await?
            .into_inner()
            .quorum
            .unwrap();
        assert_eq!(quorum.quorum_id, 1);
        assert_eq!(quorum.participants.len(), 2);
        assert_eq!(quorum.participants[0].commit_failures, 0);
        assert_eq!(quorum.participants[1].commit_failures, 0);
    }

    // A round in which replica1 reports commit failures: the quorum_id is
    // expected to advance to 2 even though the membership is the same.
    let background_request = quorum_request("replica0", 0);
    let foreground_request = quorum_request("replica1", 2);

    let mut background_client = client.clone();
    tokio::spawn(async move { background_client.quorum(background_request).await });

    let quorum = client
        .quorum(foreground_request)
        .await?
        .into_inner()
        .quorum
        .unwrap();
    assert_eq!(quorum.quorum_id, 2);
    assert_eq!(quorum.participants.len(), 2);
    assert_eq!(quorum.participants[0].commit_failures, 0);
    assert_eq!(quorum.participants[1].commit_failures, 2);

    lighthouse_task.abort();
    Ok(())
}
}
Loading