Skip to content

Commit 161af71

Browse files
committed
lighthouse: add commit_failures support
1 parent 9e4bc3c commit 161af71

File tree

6 files changed

+172
-19
lines changed

6 files changed

+172
-19
lines changed

proto/torchft.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ message QuorumMember {
4242
int64 step = 4;
4343
uint64 world_size = 5;
4444
bool shrink_only = 6;
45+
int64 commit_failures = 8;
4546
// User passing in data stored as JSON string.
4647
string data = 7;
4748
}
@@ -77,6 +78,7 @@ message ManagerQuorumRequest {
7778
string checkpoint_metadata = 3;
7879
bool shrink_only = 4;
7980
bool init_sync = 5;
81+
int64 commit_failures = 6;
8082
}
8183

8284
message ManagerQuorumResponse {
@@ -93,6 +95,7 @@ message ManagerQuorumResponse {
9395
int64 replica_rank = 9;
9496
int64 replica_world_size = 10;
9597
bool heal = 11;
98+
int64 commit_failures = 12;
9699
}
97100

98101
message CheckpointMetadataRequest {

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ impl ManagerClient {
177177
checkpoint_metadata: String,
178178
shrink_only: bool,
179179
init_sync: bool,
180+
commit_failures: i64,
180181
timeout: Duration,
181182
) -> Result<QuorumResult, StatusError> {
182183
py.allow_threads(move || {
@@ -186,6 +187,7 @@ impl ManagerClient {
186187
checkpoint_metadata: checkpoint_metadata,
187188
shrink_only: shrink_only,
188189
init_sync: init_sync,
190+
commit_failures: commit_failures,
189191
});
190192

191193
// This timeout is processed on the server side so we also enable
@@ -547,6 +549,7 @@ impl LighthouseClient {
547549
world_size: world_size,
548550
shrink_only: shrink_only,
549551
data: data_string,
552+
commit_failures: 0,
550553
}),
551554
});
552555

src/lighthouse.rs

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,12 @@ impl Lighthouse {
301301
"Detected quorum change, bumping quorum_id to {}",
302302
state.quorum_id
303303
);
304+
} else if participants.iter().map(|p| p.commit_failures).sum::<i64>() > 0 {
305+
state.quorum_id += 1;
306+
info!(
307+
"Detected commit failures, bumping quorum_id to {}",
308+
state.quorum_id
309+
);
304310
}
305311

306312
let quorum = Quorum {
@@ -639,6 +645,7 @@ mod tests {
639645
world_size: 1,
640646
shrink_only: false,
641647
data: String::new(),
648+
commit_failures: 0,
642649
},
643650
},
644651
);
@@ -656,6 +663,7 @@ mod tests {
656663
world_size: 1,
657664
shrink_only: false,
658665
data: String::new(),
666+
commit_failures: 0,
659667
},
660668
},
661669
);
@@ -712,6 +720,7 @@ mod tests {
712720
world_size: 1,
713721
shrink_only: false,
714722
data: String::new(),
723+
commit_failures: 0,
715724
},
716725
},
717726
);
@@ -751,6 +760,7 @@ mod tests {
751760
world_size: 1,
752761
shrink_only: false,
753762
data: String::new(),
763+
commit_failures: 0,
754764
},
755765
},
756766
);
@@ -798,6 +808,7 @@ mod tests {
798808
world_size: 1,
799809
shrink_only: false,
800810
data: String::new(),
811+
commit_failures: 0,
801812
},
802813
},
803814
);
@@ -819,6 +830,7 @@ mod tests {
819830
world_size: 1,
820831
shrink_only: false,
821832
data: String::new(),
833+
commit_failures: 0,
822834
}],
823835
created: Some(SystemTime::now().into()),
824836
});
@@ -838,6 +850,7 @@ mod tests {
838850
world_size: 1,
839851
shrink_only: false,
840852
data: String::new(),
853+
commit_failures: 0,
841854
},
842855
},
843856
);
@@ -882,6 +895,7 @@ mod tests {
882895
world_size: 1,
883896
shrink_only: false,
884897
data: String::new(),
898+
commit_failures: 0,
885899
},
886900
QuorumMember {
887901
replica_id: "b".to_string(),
@@ -891,6 +905,7 @@ mod tests {
891905
world_size: 1,
892906
shrink_only: false,
893907
data: String::new(),
908+
commit_failures: 0,
894909
},
895910
],
896911
created: Some(SystemTime::now().into()),
@@ -908,6 +923,7 @@ mod tests {
908923
world_size: 1,
909924
shrink_only: true,
910925
data: String::new(),
926+
commit_failures: 0,
911927
},
912928
},
913929
);
@@ -926,6 +942,7 @@ mod tests {
926942
world_size: 1,
927943
shrink_only: true,
928944
data: String::new(),
945+
commit_failures: 0,
929946
},
930947
},
931948
);
@@ -975,6 +992,7 @@ mod tests {
975992
world_size: 1,
976993
shrink_only: false,
977994
data: String::new(),
995+
commit_failures: 0,
978996
}),
979997
});
980998

@@ -1021,6 +1039,7 @@ mod tests {
10211039
world_size: 1,
10221040
shrink_only: false,
10231041
data: String::new(),
1042+
commit_failures: 0,
10241043
},
10251044
},
10261045
);
@@ -1047,6 +1066,7 @@ mod tests {
10471066
world_size: 1,
10481067
shrink_only: false,
10491068
data: String::new(),
1069+
commit_failures: 0,
10501070
}];
10511071
let b = vec![QuorumMember {
10521072
replica_id: "1".to_string(),
@@ -1056,6 +1076,7 @@ mod tests {
10561076
world_size: 1,
10571077
shrink_only: false,
10581078
data: String::new(),
1079+
commit_failures: 0,
10591080
}];
10601081

10611082
// replica_id is the same
@@ -1069,12 +1090,13 @@ mod tests {
10691090
world_size: 1,
10701091
shrink_only: false,
10711092
data: String::new(),
1093+
commit_failures: 0,
10721094
}];
10731095
// replica_id changed
10741096
assert!(quorum_changed(&a, &c));
10751097
}
1076-
#[tokio::test]
10771098

1099+
#[tokio::test]
10781100
async fn test_lighthouse_join_during_shrink() -> Result<()> {
10791101
fn create_member(id: &str, addr_num: &str, step: i64, shrink_only: bool) -> QuorumMember {
10801102
QuorumMember {
@@ -1085,6 +1107,7 @@ mod tests {
10851107
world_size: 1,
10861108
shrink_only,
10871109
data: String::new(),
1110+
commit_failures: 0,
10881111
}
10891112
}
10901113

@@ -1179,4 +1202,76 @@ mod tests {
11791202
lighthouse_task.abort();
11801203
Ok(())
11811204
}
1205+
1206+
#[tokio::test]
1207+
async fn test_lighthouse_commit_failures() -> Result<()> {
1208+
fn create_member(id: &str, commit_failures: i64) -> QuorumMember {
1209+
QuorumMember {
1210+
replica_id: id.to_string(),
1211+
address: format!("addr{}", id),
1212+
store_address: format!("store{}", id),
1213+
step: 10,
1214+
world_size: 1,
1215+
shrink_only: false,
1216+
data: String::new(),
1217+
commit_failures,
1218+
}
1219+
}
1220+
1221+
fn create_request(member: &QuorumMember) -> tonic::Request<LighthouseQuorumRequest> {
1222+
tonic::Request::new(LighthouseQuorumRequest {
1223+
requester: Some(member.clone()),
1224+
})
1225+
}
1226+
1227+
let opt = LighthouseOpt {
1228+
min_replicas: 2,
1229+
bind: "[::]:0".to_string(),
1230+
join_timeout_ms: 1000,
1231+
quorum_tick_ms: 10,
1232+
heartbeat_timeout_ms: 5000,
1233+
};
1234+
1235+
// Start the lighthouse service
1236+
let lighthouse = Lighthouse::new(opt).await?;
1237+
let lighthouse_task = tokio::spawn(lighthouse.clone().run());
1238+
1239+
// Create client to interact with lighthouse
1240+
let mut client = lighthouse_client_new(lighthouse.address()).await?;
1241+
1242+
// First two quorums should be stable
1243+
for _i in 0..2 {
1244+
let first_request = create_request(&create_member("replica0", 0));
1245+
let second_request = create_request(&create_member("replica1", 0));
1246+
1247+
tokio::spawn({
1248+
let mut client = client.clone();
1249+
async move { client.quorum(first_request).await }
1250+
});
1251+
let first_response = client.quorum(second_request).await?;
1252+
let first_quorum = first_response.into_inner().quorum.unwrap();
1253+
assert_eq!(first_quorum.quorum_id, 1);
1254+
assert_eq!(first_quorum.participants.len(), 2);
1255+
assert_eq!(first_quorum.participants[0].commit_failures, 0);
1256+
assert_eq!(first_quorum.participants[1].commit_failures, 0);
1257+
}
1258+
1259+
// commit_failures should increment quorum_id
1260+
let first_request = create_request(&create_member("replica0", 0));
1261+
let second_request = create_request(&create_member("replica1", 2));
1262+
1263+
tokio::spawn({
1264+
let mut client = client.clone();
1265+
async move { client.quorum(first_request).await }
1266+
});
1267+
let first_response = client.quorum(second_request).await?;
1268+
let first_quorum = first_response.into_inner().quorum.unwrap();
1269+
assert_eq!(first_quorum.quorum_id, 2);
1270+
assert_eq!(first_quorum.participants.len(), 2);
1271+
assert_eq!(first_quorum.participants[0].commit_failures, 0);
1272+
assert_eq!(first_quorum.participants[1].commit_failures, 2);
1273+
1274+
lighthouse_task.abort();
1275+
Ok(())
1276+
}
11821277
}

0 commit comments

Comments
 (0)