Skip to content

Commit 7beb55d

Browse files
committed
more review fixes for @sunshowers
1 parent aaad0c4 commit 7beb55d

File tree

1 file changed

+79
-52
lines changed

1 file changed

+79
-52
lines changed

trust-quorum/src/task.rs

Lines changed: 79 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ use trust_quorum_protocol::{
2525
ReconfigurationError, ReconfigureMsg, ReconstructedRackSecret,
2626
};
2727

28+
/// Whether or not a configuration has committed or is still underway.
29+
pub enum CommitStatus {
30+
Committed,
31+
Pending,
32+
}
33+
2834
/// We only expect a handful of messages at a time.
2935
const API_CHANNEL_BOUND: usize = 32;
3036

@@ -96,45 +102,45 @@ pub enum NodeApiRequest {
96102
ClearSecrets,
97103

98104
/// Retrieve connectivity status via the `ConnMgr`
99-
ConnMgrStatus { responder: oneshot::Sender<ConnMgrStatus> },
105+
ConnMgrStatus { tx: oneshot::Sender<ConnMgrStatus> },
100106

101107
/// Return the status of this node if it is a coordinator
102-
CoordinatorStatus { responder: oneshot::Sender<Option<CoordinatorStatus>> },
108+
CoordinatorStatus { tx: oneshot::Sender<Option<CoordinatorStatus>> },
103109

104110
/// Load a rack secret for the given epoch
105111
LoadRackSecret {
106112
epoch: Epoch,
107-
responder: oneshot::Sender<
113+
tx: oneshot::Sender<
108114
Result<Option<ReconstructedRackSecret>, LoadRackSecretError>,
109115
>,
110116
},
111117

112118
/// Coordinate an upgrade from LRTQ at this node
113119
LrtqUpgrade {
114120
msg: LrtqUpgradeMsg,
115-
responder: oneshot::Sender<Result<(), LrtqUpgradeError>>,
121+
tx: oneshot::Sender<Result<(), LrtqUpgradeError>>,
116122
},
117123

118124
/// Get the overall status of the node
119-
NodeStatus { responder: oneshot::Sender<NodeStatus> },
125+
NodeStatus { tx: oneshot::Sender<NodeStatus> },
120126

121127
/// `PrepareAndCommit` a configuration at this node
122128
PrepareAndCommit {
123129
config: Configuration,
124-
responder: oneshot::Sender<Result<bool, PrepareAndCommitError>>,
130+
tx: oneshot::Sender<Result<CommitStatus, PrepareAndCommitError>>,
125131
},
126132

127133
/// `Commit` a configuration at this node
128134
Commit {
129135
rack_id: RackUuid,
130136
epoch: Epoch,
131-
responder: oneshot::Sender<Result<bool, CommitError>>,
137+
tx: oneshot::Sender<Result<CommitStatus, CommitError>>,
132138
},
133139

134140
/// Coordinate a reconfiguration at this node
135141
Reconfigure {
136142
msg: ReconfigureMsg,
137-
responder: oneshot::Sender<Result<(), ReconfigurationError>>,
143+
tx: oneshot::Sender<Result<(), ReconfigurationError>>,
138144
},
139145

140146
/// Shutdown the node's tokio tasks
@@ -197,9 +203,7 @@ impl NodeTaskHandle {
197203
msg: ReconfigureMsg,
198204
) -> Result<(), NodeApiError> {
199205
let (tx, rx) = oneshot::channel();
200-
self.tx
201-
.send(NodeApiRequest::Reconfigure { msg, responder: tx })
202-
.await?;
206+
self.tx.send(NodeApiRequest::Reconfigure { msg, tx: tx }).await?;
203207
rx.await??;
204208
Ok(())
205209
}
@@ -210,9 +214,7 @@ impl NodeTaskHandle {
210214
msg: LrtqUpgradeMsg,
211215
) -> Result<(), NodeApiError> {
212216
let (tx, rx) = oneshot::channel();
213-
self.tx
214-
.send(NodeApiRequest::LrtqUpgrade { msg, responder: tx })
215-
.await?;
217+
self.tx.send(NodeApiRequest::LrtqUpgrade { msg, tx: tx }).await?;
216218
rx.await??;
217219
Ok(())
218220
}
@@ -224,9 +226,7 @@ impl NodeTaskHandle {
224226
&self,
225227
) -> Result<Option<CoordinatorStatus>, NodeApiError> {
226228
let (tx, rx) = oneshot::channel();
227-
self.tx
228-
.send(NodeApiRequest::CoordinatorStatus { responder: tx })
229-
.await?;
229+
self.tx.send(NodeApiRequest::CoordinatorStatus { tx: tx }).await?;
230230
let res = rx.await?;
231231
Ok(res)
232232
}
@@ -240,9 +240,7 @@ impl NodeTaskHandle {
240240
epoch: Epoch,
241241
) -> Result<Option<ReconstructedRackSecret>, NodeApiError> {
242242
let (tx, rx) = oneshot::channel();
243-
self.tx
244-
.send(NodeApiRequest::LoadRackSecret { epoch, responder: tx })
245-
.await?;
243+
self.tx.send(NodeApiRequest::LoadRackSecret { epoch, tx: tx }).await?;
246244
let rs = rx.await??;
247245
Ok(rs)
248246
}
@@ -256,10 +254,10 @@ impl NodeTaskHandle {
256254
pub async fn prepare_and_commit(
257255
&self,
258256
config: Configuration,
259-
) -> Result<bool, NodeApiError> {
257+
) -> Result<CommitStatus, NodeApiError> {
260258
let (tx, rx) = oneshot::channel();
261259
self.tx
262-
.send(NodeApiRequest::PrepareAndCommit { config, responder: tx })
260+
.send(NodeApiRequest::PrepareAndCommit { config, tx: tx })
263261
.await?;
264262
let res = rx.await??;
265263
Ok(res)
@@ -275,11 +273,9 @@ impl NodeTaskHandle {
275273
&self,
276274
rack_id: RackUuid,
277275
epoch: Epoch,
278-
) -> Result<bool, NodeApiError> {
276+
) -> Result<CommitStatus, NodeApiError> {
279277
let (tx, rx) = oneshot::channel();
280-
self.tx
281-
.send(NodeApiRequest::Commit { rack_id, epoch, responder: tx })
282-
.await?;
278+
self.tx.send(NodeApiRequest::Commit { rack_id, epoch, tx: tx }).await?;
283279
let res = rx.await??;
284280
Ok(res)
285281
}
@@ -303,20 +299,23 @@ impl NodeTaskHandle {
303299
Ok(())
304300
}
305301

302+
/// Return information about connectivity to other peers
306303
pub async fn conn_mgr_status(&self) -> Result<ConnMgrStatus, NodeApiError> {
307304
let (tx, rx) = oneshot::channel();
308-
self.tx.send(NodeApiRequest::ConnMgrStatus { responder: tx }).await?;
305+
self.tx.send(NodeApiRequest::ConnMgrStatus { tx: tx }).await?;
309306
let res = rx.await?;
310307
Ok(res)
311308
}
312309

310+
/// Return internal information for the [`Node`]
313311
pub async fn status(&self) -> Result<NodeStatus, NodeApiError> {
314312
let (tx, rx) = oneshot::channel();
315-
self.tx.send(NodeApiRequest::NodeStatus { responder: tx }).await?;
313+
self.tx.send(NodeApiRequest::NodeStatus { tx: tx }).await?;
316314
let res = rx.await?;
317315
Ok(res)
318316
}
319317

318+
/// Shutdown this [`NodeTask`] and all its child tasks
320319
pub async fn shutdown(&self) -> Result<(), NodeApiError> {
321320
self.tx.send(NodeApiRequest::Shutdown).await?;
322321
Ok(())
@@ -458,58 +457,68 @@ impl NodeTask {
458457
NodeApiRequest::ClearSecrets => {
459458
self.node.clear_secrets();
460459
}
461-
NodeApiRequest::Commit { rack_id, epoch, responder } => {
460+
NodeApiRequest::Commit { rack_id, epoch, tx } => {
462461
let res = self
463462
.node
464463
.commit_configuration(&mut self.ctx, rack_id, epoch)
465464
.map(|_| {
466-
self.ctx.persistent_state().commits.contains(&epoch)
465+
if self.ctx.persistent_state().commits.contains(&epoch)
466+
{
467+
CommitStatus::Committed
468+
} else {
469+
CommitStatus::Pending
470+
}
467471
});
468-
let _ = responder.send(res);
472+
let _ = tx.send(res);
469473
}
470-
NodeApiRequest::ConnMgrStatus { responder } => {
474+
NodeApiRequest::ConnMgrStatus { tx } => {
471475
debug!(self.log, "Received Request for ConnMgrStatus");
472-
let _ = responder.send(self.conn_mgr.status());
476+
let _ = tx.send(self.conn_mgr.status());
473477
}
474-
NodeApiRequest::CoordinatorStatus { responder } => {
478+
NodeApiRequest::CoordinatorStatus { tx } => {
475479
let status = self.node.get_coordinator_state().map(|cs| {
476480
CoordinatorStatus {
477481
config: cs.config().clone(),
478482
acked_prepares: cs.op().acked_prepares(),
479483
}
480484
});
481-
let _ = responder.send(status);
485+
let _ = tx.send(status);
482486
}
483-
NodeApiRequest::LoadRackSecret { epoch, responder } => {
487+
NodeApiRequest::LoadRackSecret { epoch, tx } => {
484488
let res = self.node.load_rack_secret(&mut self.ctx, epoch);
485-
let _ = responder.send(res);
489+
let _ = tx.send(res);
486490
}
487-
NodeApiRequest::LrtqUpgrade { msg, responder } => {
491+
NodeApiRequest::LrtqUpgrade { msg, tx } => {
488492
let res =
489493
self.node.coordinate_upgrade_from_lrtq(&mut self.ctx, msg);
490-
let _ = responder.send(res);
494+
let _ = tx.send(res);
491495
}
492-
NodeApiRequest::NodeStatus { responder } => {
493-
let _ = responder.send(NodeStatus {
496+
NodeApiRequest::NodeStatus { tx } => {
497+
let _ = tx.send(NodeStatus {
494498
connected_peers: self.ctx.connected().clone(),
495499
alarms: self.ctx.alarms().clone(),
496500
persistent_state: self.ctx.persistent_state().into(),
497501
});
498502
}
499-
NodeApiRequest::PrepareAndCommit { config, responder } => {
503+
NodeApiRequest::PrepareAndCommit { config, tx } => {
500504
let epoch = config.epoch;
501505
let res = self
502506
.node
503507
.prepare_and_commit(&mut self.ctx, config)
504508
.map(|_| {
505-
self.ctx.persistent_state().commits.contains(&epoch)
509+
if self.ctx.persistent_state().commits.contains(&epoch)
510+
{
511+
CommitStatus::Committed
512+
} else {
513+
CommitStatus::Pending
514+
}
506515
});
507-
let _ = responder.send(res);
516+
let _ = tx.send(res);
508517
}
509-
NodeApiRequest::Reconfigure { msg, responder } => {
518+
NodeApiRequest::Reconfigure { msg, tx } => {
510519
let res =
511520
self.node.coordinate_reconfiguration(&mut self.ctx, msg);
512-
let _ = responder.send(res);
521+
let _ = tx.send(res);
513522
}
514523
NodeApiRequest::Shutdown => {
515524
info!(self.log, "Shutting down Node tokio tasks");
@@ -963,7 +972,10 @@ mod tests {
963972
async || {
964973
let mut acked = 0;
965974
for h in &setup.node_handles {
966-
if h.commit(rack_id, Epoch(1)).await.unwrap() {
975+
if matches!(
976+
h.commit(rack_id, Epoch(1)).await.unwrap(),
977+
CommitStatus::Committed
978+
) {
967979
acked += 1;
968980
}
969981
}
@@ -1068,7 +1080,10 @@ mod tests {
10681080
async || {
10691081
let mut acked = 0;
10701082
for h in &setup.node_handles[0..num_nodes - 1] {
1071-
if h.commit(rack_id, Epoch(1)).await.unwrap() {
1083+
if matches!(
1084+
h.commit(rack_id, Epoch(1)).await.unwrap(),
1085+
CommitStatus::Committed,
1086+
) {
10721087
acked += 1;
10731088
}
10741089
}
@@ -1104,7 +1119,10 @@ mod tests {
11041119
wait_for_condition(
11051120
async || {
11061121
let h = &setup.node_handles.last().unwrap();
1107-
if h.prepare_and_commit(config.clone()).await.unwrap() {
1122+
if matches!(
1123+
h.prepare_and_commit(config.clone()).await.unwrap(),
1124+
CommitStatus::Committed
1125+
) {
11081126
Ok(())
11091127
} else {
11101128
Err(CondCheckError::<()>::NotYet)
@@ -1199,7 +1217,10 @@ mod tests {
11991217
async || {
12001218
let mut acked = 0;
12011219
for h in &setup.node_handles {
1202-
if h.commit(rack_id, Epoch(1)).await.unwrap() {
1220+
if matches!(
1221+
h.commit(rack_id, Epoch(1)).await.unwrap(),
1222+
CommitStatus::Committed
1223+
) {
12031224
acked += 1;
12041225
}
12051226
}
@@ -1305,7 +1326,10 @@ mod tests {
13051326
async || {
13061327
let mut acked = 0;
13071328
for h in &setup.node_handles[0..num_nodes - 1] {
1308-
if h.commit(rack_id, Epoch(2)).await.unwrap() {
1329+
if matches!(
1330+
h.commit(rack_id, Epoch(2)).await.unwrap(),
1331+
CommitStatus::Committed
1332+
) {
13091333
acked += 1;
13101334
}
13111335
}
@@ -1420,7 +1444,10 @@ mod tests {
14201444
async || {
14211445
let mut acked = 0;
14221446
for h in &setup.node_handles {
1423-
if h.commit(rack_id, Epoch(2)).await.unwrap() {
1447+
if matches!(
1448+
h.commit(rack_id, Epoch(2)).await.unwrap(),
1449+
CommitStatus::Committed
1450+
) {
14241451
acked += 1;
14251452
}
14261453
}

0 commit comments

Comments
 (0)