Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
grandpa: always create and send justification if there are any subscr…
Browse files Browse the repository at this point in the history
…ibers (#6935)

* grandpa: use bytes type for justification rpc notification

* grandpa: always create justification if there are rpc subscribers

* grandpa: wording

* grandpa: replace notify_justification macro with function

* grandpa: prefer Option<&T> over &Option<T>
  • Loading branch information
andresilva authored and bkchr committed Sep 18, 2020
1 parent 508dfcd commit 43cd1d0
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 42 deletions.
3 changes: 2 additions & 1 deletion client/finality-grandpa/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"

[dependencies]
sc-finality-grandpa = { version = "0.8.0-rc6", path = "../" }
sc-rpc = { version = "2.0.0-rc6", path = "../../rpc" }
sp-core = { version = "2.0.0-rc6", path = "../../../primitives/core" }
sp-runtime = { version = "2.0.0-rc6", path = "../../../primitives/runtime" }
sc-finality-grandpa = { version = "0.8.0-rc6", path = "../" }
finality-grandpa = { version = "0.12.3", features = ["derive-codec"] }
jsonrpc-core = "14.2.0"
jsonrpc-core-client = "14.2.0"
Expand Down
4 changes: 2 additions & 2 deletions client/finality-grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ mod tests {

// Notify with a header and justification
let justification = create_justification();
let _ = justification_sender.notify(justification.clone()).unwrap();
justification_sender.notify(|| Ok(justification.clone())).unwrap();

// Inspect what we received
let recv = receiver.take(1).wait().flatten().collect::<Vec<_>>();
Expand All @@ -418,7 +418,7 @@ mod tests {

let recv_sub_id: String =
serde_json::from_value(json_map["subscription"].take()).unwrap();
let recv_justification: Vec<u8> =
let recv_justification: sp_core::Bytes =
serde_json::from_value(json_map["result"].take()).unwrap();
let recv_justification: GrandpaJustification<Block> =
Decode::decode(&mut &recv_justification[..]).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions client/finality-grandpa/rpc/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use sc_finality_grandpa::GrandpaJustification;

/// An encoded justification proving that the given header has been finalized
#[derive(Clone, Serialize, Deserialize)]
pub struct JustificationNotification(Vec<u8>);
pub struct JustificationNotification(sp_core::Bytes);

impl<Block: BlockT> From<GrandpaJustification<Block>> for JustificationNotification {
fn from(notification: GrandpaJustification<Block>) -> Self {
JustificationNotification(notification.encode())
JustificationNotification(notification.encode().into())
}
}
68 changes: 43 additions & 25 deletions client/finality-grandpa/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,8 @@ pub(crate) fn ancestry<Block: BlockT, Client>(
client: &Arc<Client>,
base: Block::Hash,
block: Block::Hash,
) -> Result<Vec<Block::Hash>, GrandpaError> where
) -> Result<Vec<Block::Hash>, GrandpaError>
where
Client: HeaderMetadata<Block, Error = sp_blockchain::Error>,
{
if base == block { return Err(GrandpaError::NotDescendent) }
Expand All @@ -671,15 +672,14 @@ pub(crate) fn ancestry<Block: BlockT, Client>(
Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect())
}

impl<B, Block: BlockT, C, N, SC, VR>
voter::Environment<Block::Hash, NumberFor<Block>>
for Environment<B, Block, C, N, SC, VR>
impl<B, Block: BlockT, C, N, SC, VR> voter::Environment<Block::Hash, NumberFor<Block>>
for Environment<B, Block, C, N, SC, VR>
where
Block: 'static,
B: Backend<Block>,
C: crate::ClientForGrandpa<Block, B> + 'static,
C::Api: GrandpaApi<Block, Error = sp_blockchain::Error>,
N: NetworkT<Block> + 'static + Send + Sync,
N: NetworkT<Block> + 'static + Send + Sync,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, C>,
NumberFor<Block>: BlockNumberOps,
Expand Down Expand Up @@ -1023,7 +1023,7 @@ where
number,
(round, commit).into(),
false,
&self.justification_sender,
self.justification_sender.as_ref(),
)
}

Expand Down Expand Up @@ -1088,9 +1088,10 @@ pub(crate) fn finalize_block<BE, Block, Client>(
number: NumberFor<Block>,
justification_or_commit: JustificationOrCommit<Block>,
initial_sync: bool,
justification_sender: &Option<GrandpaJustificationSender<Block>>,
) -> Result<(), CommandOrError<Block::Hash, NumberFor<Block>>> where
Block: BlockT,
justification_sender: Option<&GrandpaJustificationSender<Block>>,
) -> Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>
where
Block: BlockT,
BE: Backend<Block>,
Client: crate::ClientForGrandpa<Block, BE>,
{
Expand Down Expand Up @@ -1154,14 +1155,29 @@ pub(crate) fn finalize_block<BE, Block, Client>(
}
}

// send a justification notification if a sender exists and in case of error log it.
fn notify_justification<Block: BlockT>(
justification_sender: Option<&GrandpaJustificationSender<Block>>,
justification: impl FnOnce() -> Result<GrandpaJustification<Block>, Error>,
) {
if let Some(sender) = justification_sender {
if let Err(err) = sender.notify(justification) {
warn!(target: "afg", "Error creating justification for subscriber: {:?}", err);
}
}
}

// NOTE: this code assumes that honest voters will never vote past a
// transition block, thus we don't have to worry about the case where
// we have a transition with `effective_block = N`, but we finalize
// `N+1`. this assumption is required to make sure we store
// justifications for transition blocks which will be requested by
// syncing clients.
let justification = match justification_or_commit {
JustificationOrCommit::Justification(justification) => Some(justification),
JustificationOrCommit::Justification(justification) => {
notify_justification(justification_sender, || Ok(justification.clone()));
Some(justification.encode())
},
JustificationOrCommit::Commit((round_number, commit)) => {
let mut justification_required =
// justification is always required when block that enacts new authorities
Expand All @@ -1181,29 +1197,31 @@ pub(crate) fn finalize_block<BE, Block, Client>(
}
}

// NOTE: the code below is a bit more verbose because we
// really want to avoid creating a justification if it isn't
// needed (e.g. if there's no subscribers), and also to avoid
// creating it twice. depending on the vote tree for the round,
// creating a justification might require multiple fetches of
// headers from the database.
let justification = || GrandpaJustification::from_commit(
&client,
round_number,
commit,
);

if justification_required {
let justification = GrandpaJustification::from_commit(
&client,
round_number,
commit,
)?;
let justification = justification()?;
notify_justification(justification_sender, || Ok(justification.clone()));

Some(justification)
Some(justification.encode())
} else {
notify_justification(justification_sender, justification);

None
}
},
};

// Notify any registered listeners in case we have a justification
if let Some(sender) = justification_sender {
if let Some(ref justification) = justification {
let _ = sender.notify(justification.clone());
}
}

let justification = justification.map(|j| j.encode());

debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash);

// ideally some handle to a synchronization oracle would be used
Expand Down
3 changes: 1 addition & 2 deletions client/finality-grandpa/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,6 @@ where
Client: crate::ClientForGrandpa<Block, BE>,
NumberFor<Block>: finality_grandpa::BlockNumberOps,
{

/// Import a block justification and finalize the block.
///
/// If `enacts_change` is set to true, then finalizing this block *must*
Expand Down Expand Up @@ -653,7 +652,7 @@ where
number,
justification.into(),
initial_sync,
&Some(self.justification_sender.clone()),
Some(&self.justification_sender),
);

match result {
Expand Down
23 changes: 18 additions & 5 deletions client/finality-grandpa/src/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ use std::sync::Arc;
use parking_lot::Mutex;

use sp_runtime::traits::Block as BlockT;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};

use crate::justification::GrandpaJustification;
use crate::Error;

// Stream of justifications returned when subscribing.
type JustificationStream<Block> = TracingUnboundedReceiver<GrandpaJustification<Block>>;
Expand Down Expand Up @@ -54,10 +55,22 @@ impl<Block: BlockT> GrandpaJustificationSender<Block> {

/// Send out a notification to all subscribers that a new justification
/// is available for a block.
pub fn notify(&self, notification: GrandpaJustification<Block>) -> Result<(), ()> {
self.subscribers.lock().retain(|n| {
!n.is_closed() && n.unbounded_send(notification.clone()).is_ok()
});
pub fn notify(
&self,
justification: impl FnOnce() -> Result<GrandpaJustification<Block>, Error>,
) -> Result<(), Error> {
let mut subscribers = self.subscribers.lock();

// do an initial prune on closed subscriptions
subscribers.retain(|n| !n.is_closed());

// if there's no subscribers we avoid creating
// the justification which is a costly operation
if !subscribers.is_empty() {
let justification = justification()?;
subscribers.retain(|n| n.unbounded_send(justification.clone()).is_ok());
}

Ok(())
}
}
Expand Down
9 changes: 4 additions & 5 deletions client/finality-grandpa/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,10 @@ fn grandpa_observer<BE, Block: BlockT, Client, S, F>(
last_finalized_number: NumberFor<Block>,
commits: S,
note_round: F,
) -> impl Future<Output=Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>> where
) -> impl Future<Output = Result<(), CommandOrError<Block::Hash, NumberFor<Block>>>>
where
NumberFor<Block>: BlockNumberOps,
S: Stream<
Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>,
>,
S: Stream<Item = Result<CommunicationIn<Block>, CommandOrError<Block::Hash, NumberFor<Block>>>>,
F: Fn(u64),
BE: Backend<Block>,
Client: crate::ClientForGrandpa<Block, BE>,
Expand Down Expand Up @@ -130,7 +129,7 @@ fn grandpa_observer<BE, Block: BlockT, Client, S, F>(
finalized_number,
(round, commit).into(),
false,
&justification_sender,
justification_sender.as_ref(),
) {
Ok(_) => {},
Err(e) => return future::err(e),
Expand Down

0 comments on commit 43cd1d0

Please sign in to comment.