Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

grandpa: always create and send justification if there are any subscribers #6935

Merged
5 commits merged into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client/finality-grandpa/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"

[dependencies]
sc-finality-grandpa = { version = "0.8.0-rc6", path = "../" }
sc-rpc = { version = "2.0.0-rc6", path = "../../rpc" }
sp-core = { version = "2.0.0-rc6", path = "../../../primitives/core" }
sp-runtime = { version = "2.0.0-rc6", path = "../../../primitives/runtime" }
sc-finality-grandpa = { version = "0.8.0-rc6", path = "../" }
finality-grandpa = { version = "0.12.3", features = ["derive-codec"] }
jsonrpc-core = "14.2.0"
jsonrpc-core-client = "14.2.0"
Expand Down
4 changes: 2 additions & 2 deletions client/finality-grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ mod tests {

// Notify with a header and justification
let justification = create_justification();
let _ = justification_sender.notify(justification.clone()).unwrap();
justification_sender.notify(|| Ok(justification.clone())).unwrap();

// Inspect what we received
let recv = receiver.take(1).wait().flatten().collect::<Vec<_>>();
Expand All @@ -418,7 +418,7 @@ mod tests {

let recv_sub_id: String =
serde_json::from_value(json_map["subscription"].take()).unwrap();
let recv_justification: Vec<u8> =
let recv_justification: sp_core::Bytes =
serde_json::from_value(json_map["result"].take()).unwrap();
let recv_justification: GrandpaJustification<Block> =
Decode::decode(&mut &recv_justification[..]).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions client/finality-grandpa/rpc/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use sc_finality_grandpa::GrandpaJustification;

/// An encoded justification proving that the given header has been finalized
#[derive(Clone, Serialize, Deserialize)]
pub struct JustificationNotification(Vec<u8>);
pub struct JustificationNotification(sp_core::Bytes);

impl<Block: BlockT> From<GrandpaJustification<Block>> for JustificationNotification {
fn from(notification: GrandpaJustification<Block>) -> Self {
JustificationNotification(notification.encode())
JustificationNotification(notification.encode().into())
}
}
50 changes: 34 additions & 16 deletions client/finality-grandpa/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1154,14 +1154,30 @@ pub(crate) fn finalize_block<BE, Block, Client>(
}
}

// send a justification notification if a sender exists and in case of
// error log it. this is a macro because `justification` below is a
// closure, therefore we can't define this as a closure as well.
macro_rules! notify_justification {
andresilva marked this conversation as resolved.
Show resolved Hide resolved
( $justification:expr ) => {
if let Some(sender) = justification_sender {
if let Err(err) = sender.notify($justification) {
warn!(target: "afg", "Error creating justification for subscriber: {:?}", err);
}
}
}
}

// NOTE: this code assumes that honest voters will never vote past a
// transition block, thus we don't have to worry about the case where
// we have a transition with `effective_block = N`, but we finalize
// `N+1`. this assumption is required to make sure we store
// justifications for transition blocks which will be requested by
// syncing clients.
let justification = match justification_or_commit {
JustificationOrCommit::Justification(justification) => Some(justification),
JustificationOrCommit::Justification(justification) => {
notify_justification!(|| Ok(justification.clone()));
Some(justification.encode())
},
JustificationOrCommit::Commit((round_number, commit)) => {
let mut justification_required =
// justification is always required when block that enacts new authorities
Expand All @@ -1181,29 +1197,31 @@ pub(crate) fn finalize_block<BE, Block, Client>(
}
}

// NOTE: the code below is a bit more complicated because we
// really want to avoid creating a justification if it isn't
// needed (e.g. if there's no subscribers), and also to avoid
// creating it twice. depending on the vote tree for the round,
// creating a justification might require multiple fetches of
// headers from the database.
let justification = || GrandpaJustification::from_commit(
&client,
round_number,
commit,
);

if justification_required {
let justification = GrandpaJustification::from_commit(
&client,
round_number,
commit,
)?;
let justification = justification()?;
notify_justification!(|| Ok(justification.clone()));

Some(justification)
Some(justification.encode())
} else {
notify_justification!(justification);

None
}
},
};

// Notify any registered listeners in case we have a justification
if let Some(sender) = justification_sender {
if let Some(ref justification) = justification {
let _ = sender.notify(justification.clone());
}
}

let justification = justification.map(|j| j.encode());

debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash);

// ideally some handle to a synchronization oracle would be used
Expand Down
23 changes: 18 additions & 5 deletions client/finality-grandpa/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ use std::sync::Arc;
use parking_lot::Mutex;

use sp_runtime::traits::Block as BlockT;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};

use crate::justification::GrandpaJustification;
use crate::Error;

// Stream of justifications returned when subscribing.
type JustificationStream<Block> = TracingUnboundedReceiver<GrandpaJustification<Block>>;
Expand Down Expand Up @@ -54,10 +55,22 @@ impl<Block: BlockT> GrandpaJustificationSender<Block> {

/// Send out a notification to all subscribers that a new justification
/// is available for a block.
pub fn notify(&self, notification: GrandpaJustification<Block>) -> Result<(), ()> {
self.subscribers.lock().retain(|n| {
!n.is_closed() && n.unbounded_send(notification.clone()).is_ok()
});
pub fn notify<F>(&self, justification: F) -> Result<(), Error>
where
F: FnOnce() -> Result<GrandpaJustification<Block>, Error>,
{
let mut subscribers = self.subscribers.lock();

// do an initial prune on closed subscriptions
subscribers.retain(|n| !n.is_closed());

// if there's no subscribers we avoid creating
// the justification which is a costly operation
if !subscribers.is_empty() {
let justification = justification()?;
subscribers.retain(|n| n.unbounded_send(justification.clone()).is_ok());
}

Ok(())
}
}
Expand Down