Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Authority active net to use ArcSwap #2391

Merged
merged 1 commit into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

*/

use arc_swap::ArcSwap;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
Expand Down Expand Up @@ -96,12 +97,11 @@ impl AuthorityHealth {
}
}

#[derive(Clone)]
pub struct ActiveAuthority<A> {
// The local authority state
pub state: Arc<AuthorityState>,
// The network interfaces to other authorities
pub net: Arc<AuthorityAggregator<A>>,
pub net: ArcSwap<AuthorityAggregator<A>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we should do that: it encourages each access to do a load() potentially, leading to different parts of the code using different nets / state / health. Instead: we can have a number of arcwaps to the inner Arcs, and upon restart we make an ActiveAuthority with a consistent set of Arcs, and use that for the lifetime of the service?

Copy link
Contributor Author

@lxfind lxfind Jun 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we could do is instead of passing around AuthorityActive in all the active functions, we pass around net. Then we don't have to restart.
Another general question I have is this: why are the active processes unsafe with potential committee changes? Don't they need to deal with potential byzantine validator anyway?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passive processes should be able to handle a swap but active once are going to need some sort of notification in order to do internal book keeping.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So really I think we're going to need to have a reconfig notification go out on a broadcast channel as well to these arc swaps

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not to mention that we cannot rely on ArcSwap in the future if we end up having multiple processes

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we simply have to store the LifecycleSignalSender struct, and signal a Restart or Exit, and the services will restart (and this is when they should re-load the config etc) or exit.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another general question I have is this: why are the active processes unsafe with potential committee changes? Don't they need to deal with potential byzantine validator anyway?

yeah, but they make the assumption that they know the correct committee and that 2/3 of it is correct. So if the committee changes half way through some task, and half the state is according to the old one, and the other half the new one, unknown things may happen.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a reasonable assumption by checkpoint process, but I assume the gossip process doesn't have that assumption?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder for myself: we probably should also add some safety guards around all validator requests on the epoch number: that is, regardless how we do this, we should be able to handle the case where you are in epoch X, talking to a committee member who thinks it's epoch X + 1.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the gossip process doesn't have that assumption?

It kind of does: for example, it computes how long to wait for responses before trying to reconnect on the basis of the size and stake distribution of the committee to not spam everyone with connection requests before the network is up.

// Network health
pub health: Arc<Mutex<HashMap<AuthorityName, AuthorityHealth>>>,
}
Expand All @@ -122,7 +122,10 @@ impl<A> ActiveAuthority<A> {
.collect(),
)),
state: authority,
net: Arc::new(AuthorityAggregator::new(committee, authority_clients)),
net: ArcSwap::from(Arc::new(AuthorityAggregator::new(
committee,
authority_clients,
))),
})
}

Expand All @@ -133,10 +136,10 @@ impl<A> ActiveAuthority<A> {
/// even if we have a few connections.
pub async fn minimum_wait_for_majority_honest_available(&self) -> Instant {
let lock = self.health.lock().await;
let (_, instant) = self.net.committee.robust_value(
let (_, instant) = self.net.load().committee.robust_value(
lock.iter().map(|(name, h)| (*name, h.no_contact_before)),
// At least one honest node is at or above it.
self.net.committee.quorum_threshold(),
self.net.load().committee.quorum_threshold(),
);
instant
}
Expand Down Expand Up @@ -182,16 +185,17 @@ where

/// Spawn all active tasks.
pub async fn spawn_active_processes(self, gossip: bool, checkpoint: bool) {
let active = Arc::new(self);
// Spawn a task to take care of gossip
let gossip_locals = self.clone();
let gossip_locals = active.clone();
let _gossip_join = tokio::task::spawn(async move {
if gossip {
gossip_process(&gossip_locals, 4).await;
}
});

// Spawn task to take care of checkpointing
let checkpoint_locals = self; // .clone();
let checkpoint_locals = active; // .clone();
let _checkpoint_join = tokio::task::spawn(async move {
if checkpoint {
checkpoint_process(&checkpoint_locals, &CheckpointProcessControl::default()).await;
Expand Down
36 changes: 21 additions & 15 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Deref;
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
Expand Down Expand Up @@ -93,10 +94,16 @@ pub async fn checkpoint_process<A>(
tokio::time::sleep(timing.long_pause_between_checkpoints).await;

loop {
let committee = &active_authority.net.committee;
let net = active_authority.net.load().deref().clone();
let committee = &net.committee;
if committee != active_authority.state.committee.load().deref().deref() {
warn!("Inconsistent committee between authority state and authority active");
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
// (1) Get the latest summaries and proposals
let state_of_world = get_latest_proposal_and_checkpoint_from_all(
active_authority.net.clone(),
net.clone(),
timing.extra_time_after_quorum,
timing.timeout_until_quorum,
)
Expand All @@ -123,7 +130,7 @@ pub async fn checkpoint_process<A>(
// TODO log error
if let Err(err) = sync_to_checkpoint(
active_authority.state.name,
active_authority.net.clone(),
net.clone(),
state_checkpoints.clone(),
checkpoint.clone(),
)
Expand All @@ -144,7 +151,7 @@ pub async fn checkpoint_process<A>(
state_checkpoints.lock().handle_checkpoint_certificate(
&checkpoint,
&None,
&active_authority.state.committee.load(),
committee,
)
}; // unlock

Expand All @@ -156,18 +163,15 @@ pub async fn checkpoint_process<A>(
// the full contents of the checkpoint. So we try to download it.
// TODO: clean up the errors to get here only when the error is
// "No checkpoint set at this sequence."
if let Ok(contents) = get_checkpoint_contents(
active_authority.state.name,
active_authority.net.clone(),
&checkpoint,
)
.await
if let Ok(contents) =
get_checkpoint_contents(active_authority.state.name, net.clone(), &checkpoint)
.await
{
// Retry with contents
let _ = state_checkpoints.lock().handle_checkpoint_certificate(
&checkpoint,
&Some(contents),
&active_authority.state.committee.load(),
committee,
);
}
}
Expand Down Expand Up @@ -603,10 +607,10 @@ pub async fn diff_proposals<A>(
break;
}

let random_authority = active_authority.net.committee.sample();
if available_authorities.remove(random_authority) {
let random_authority = *active_authority.net.load().committee.sample();
if available_authorities.remove(&random_authority) {
// Get a client
let client = active_authority.net.authority_clients[random_authority].clone();
let client = active_authority.net.load().authority_clients[&random_authority].clone();

if let Ok(response) = client
.handle_checkpoint(CheckpointRequest::latest(true))
Expand Down Expand Up @@ -704,6 +708,7 @@ where
// download them from the remote node.
let client = active_authority
.net
.load()
.clone_client(&fragment.other.0.authority);
for tx_digest in &fragment.diff.first.items {
let response = client
Expand Down Expand Up @@ -760,6 +765,7 @@ where
{
active_authority
.net
.load()
.sync_certificate_to_authority_with_timeout(
ConfirmationTransaction::new(cert.clone()),
active_authority.state.name,
Expand All @@ -784,7 +790,7 @@ where
debug!("Try sync for digest: {digest:?}");
if let Err(err) = sync_digest(
active_authority.state.name,
active_authority.net.clone(),
active_authority.net.load().clone(),
digest.transaction,
per_other_authority_delay,
)
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub async fn gossip_process_with_start_seq<A>(
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
// A copy of the committee
let committee = &active_authority.net.committee;
let committee = &active_authority.net.load().committee;

// Number of tasks at most "degree" and no more than committee - 1
let target_num_tasks: usize = usize::min(committee.voting_rights.len() - 1, degree);
Expand Down Expand Up @@ -200,10 +200,10 @@ where
) -> PeerGossip<A> {
PeerGossip {
peer_name,
client: active_authority.net.authority_clients[&peer_name].clone(),
client: active_authority.net.load().authority_clients[&peer_name].clone(),
state: active_authority.state.clone(),
max_seq: start_seq,
aggregator: active_authority.net.clone(),
aggregator: active_authority.net.load().clone(),
}
}

Expand Down