Skip to content

Commit

Permalink
[narwhal] subscriber: use blocking send to notifier instead of waitin…
Browse files Browse the repository at this point in the history
…g for permit (MystenLabs#5138)

The join/try_fut_and_permit pattern seem to be broken, see MystenLabs#5137

This is especially critical for executor as it might skip transactions leading to forks.

Since we don't immediately have a fix for try_fut_and_permit, this PR will simply replace with

This is safe for executor as it is not part of any bounded channel loops(simply removing this pattern is not necessarily safe for other places).

This will make performance just very slightly suboptimal(we don't schedule new downloads when tx_notifier is clogged).
  • Loading branch information
andll authored Oct 11, 2022
1 parent 4dcea7f commit 8f493df
Showing 1 changed file with 3 additions and 6 deletions.
9 changes: 3 additions & 6 deletions narwhal/executor/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use config::{Committee, SharedWorkerCache, WorkerId};
use consensus::ConsensusOutput;
use crypto::{NetworkPublicKey, PublicKey};

use futures::future::join;
use futures::stream::FuturesOrdered;
use futures::FutureExt;
use futures::StreamExt;
Expand Down Expand Up @@ -138,11 +137,9 @@ impl<Network: SubscriberNetwork> Subscriber<Network> {
},

// Receive here consensus messages for which we have downloaded all transactions data.
(message, permit) = join(waiting.next(), self.tx_notifier.reserve()), if !waiting.is_empty() => {
if let Ok(permit) = permit {
permit.send(message.expect("We don't poll empty queue"));
} else {
error!("tx_notifier closed");
message = waiting.next(), if !waiting.is_empty() => {
if let Err(e) = self.tx_notifier.send(message.expect("We don't poll empty queue")).await {
error!("tx_notifier closed: {}", e);
return Ok(());
}
},
Expand Down

0 comments on commit 8f493df

Please sign in to comment.