Skip to content

Commit

Permalink
convert std::sync::mpsc to crossbeam_channel (solana-labs#22264)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbiseda authored Jan 11, 2022
1 parent 3c44d40 commit 8b66625
Show file tree
Hide file tree
Showing 81 changed files with 313 additions and 346 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(clippy::integer_arithmetic)]
use {
clap::{crate_description, crate_name, value_t, App, Arg},
crossbeam_channel::unbounded,
crossbeam_channel::{unbounded, Receiver},
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
Expand All @@ -28,7 +28,7 @@ use {
},
solana_streamer::socket::SocketAddrSpace,
std::{
sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex, RwLock},
sync::{atomic::Ordering, Arc, Mutex, RwLock},
thread::sleep,
time::{Duration, Instant},
},
Expand Down
1 change: 1 addition & 0 deletions banks-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ edition = "2021"

[dependencies]
bincode = "1.3.3"
crossbeam-channel = "0.5"
futures = "0.3"
solana-banks-interface = { path = "../banks-interface", version = "=1.10.0" }
solana-runtime = { path = "../runtime", version = "=1.10.0" }
Expand Down
10 changes: 4 additions & 6 deletions banks-server/src/banks_server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
bincode::{deserialize, serialize},
crossbeam_channel::{unbounded, Receiver, Sender},
futures::{future, prelude::stream::StreamExt},
solana_banks_interface::{
Banks, BanksRequest, BanksResponse, BanksTransactionResultWithSimulation,
Expand Down Expand Up @@ -30,10 +31,7 @@ use {
convert::TryFrom,
io,
net::{Ipv4Addr, SocketAddr},
sync::{
mpsc::{channel, Receiver, Sender},
Arc, RwLock,
},
sync::{Arc, RwLock},
thread::Builder,
time::Duration,
},
Expand Down Expand Up @@ -96,7 +94,7 @@ impl BanksServer {
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
poll_signature_status_sleep_duration: Duration,
) -> Self {
let (transaction_sender, transaction_receiver) = channel();
let (transaction_sender, transaction_receiver) = unbounded();
let bank = bank_forks.read().unwrap().working_bank();
let slot = bank.slot();
{
Expand Down Expand Up @@ -392,7 +390,7 @@ pub async fn start_tcp_server(
// serve is generated by the service attribute. It takes as input any type implementing
// the generated Banks trait.
.map(move |chan| {
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();

SendTransactionService::new::<NullTpuInfo>(
tpu_addr,
Expand Down
1 change: 1 addition & 0 deletions bench-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ homepage = "https://solana.com/"
publish = false

[dependencies]
crossbeam-channel = "0.5"
clap = "2.33.1"
solana-streamer = { path = "../streamer", version = "=1.10.0" }
solana-net-utils = { path = "../net-utils", version = "=1.10.0" }
Expand Down
4 changes: 2 additions & 2 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![allow(clippy::integer_arithmetic)]
use {
clap::{crate_description, crate_name, App, Arg},
crossbeam_channel::unbounded,
solana_streamer::{
packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
streamer::{receiver, PacketBatchReceiver},
Expand All @@ -10,7 +11,6 @@ use {
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::channel,
Arc,
},
thread::{sleep, spawn, JoinHandle, Result},
Expand Down Expand Up @@ -89,7 +89,7 @@ fn main() -> Result<()> {
addr = read.local_addr().unwrap();
port = addr.port();

let (s_reader, r_reader) = channel();
let (s_reader, r_reader) = unbounded();
read_channels.push(r_reader);
read_threads.push(receiver(
Arc::new(read),
Expand Down
1 change: 1 addition & 0 deletions bench-tps/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ publish = false

[dependencies]
clap = "2.33.1"
crossbeam-channel = "0.5"
log = "0.4.14"
rayon = "1.5.1"
serde_json = "1.0.74"
Expand Down
8 changes: 3 additions & 5 deletions bench-tps/tests/bench_tps.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(clippy::integer_arithmetic)]
use {
crossbeam_channel::unbounded,
serial_test::serial,
solana_bench_tps::{
bench::{do_bench_tps, generate_and_fund_keypairs},
Expand All @@ -15,10 +16,7 @@ use {
},
solana_sdk::signature::{Keypair, Signer},
solana_streamer::socket::SocketAddrSpace,
std::{
sync::{mpsc::channel, Arc},
time::Duration,
},
std::{sync::Arc, time::Duration},
};

fn test_bench_tps_local_cluster(config: Config) {
Expand Down Expand Up @@ -52,7 +50,7 @@ fn test_bench_tps_local_cluster(config: Config) {
VALIDATOR_PORT_RANGE,
));

let (addr_sender, addr_receiver) = channel();
let (addr_sender, addr_receiver) = unbounded();
run_local_faucet_with_port(faucet_keypair, addr_sender, None, 0);
let faucet_addr = addr_receiver
.recv_timeout(Duration::from_secs(2))
Expand Down
1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ criterion-stats = "0.3.0"
ctrlc = { version = "3.2.1", features = ["termination"] }
console = "0.15.0"
const_format = "0.2.22"
crossbeam-channel = "0.5"
log = "0.4.14"
humantime = "2.0.1"
num-traits = "0.2"
Expand Down
3 changes: 2 additions & 1 deletion cli/src/cluster_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
},
clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand},
console::{style, Emoji},
crossbeam_channel::unbounded,
serde::{Deserialize, Serialize},
solana_clap_utils::{
input_parsers::*,
Expand Down Expand Up @@ -1368,7 +1369,7 @@ pub fn process_ping(
println_name_value("Source Account:", &config.signers[0].pubkey().to_string());
println!();

let (signal_sender, signal_receiver) = std::sync::mpsc::channel();
let (signal_sender, signal_receiver) = unbounded();
ctrlc::set_handler(move || {
let _ = signal_sender.send(());
})
Expand Down
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ base64 = "0.13.0"
bincode = "1.3.3"
bs58 = "0.4.0"
clap = "2.33.0"
crossbeam-channel = "0.5"
indicatif = "0.16.2"
jsonrpc-core = "18.0.0"
log = "0.4.14"
Expand Down
18 changes: 9 additions & 9 deletions client/src/pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use {
RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
},
},
crossbeam_channel::{unbounded, Receiver, Sender},
log::*,
serde::de::DeserializeOwned,
serde_json::{
Expand All @@ -24,7 +25,6 @@ use {
net::TcpStream,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{channel, Receiver, Sender},
Arc, RwLock,
},
thread::{sleep, JoinHandle},
Expand Down Expand Up @@ -242,7 +242,7 @@ impl PubsubClient {
) -> Result<AccountSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
Expand Down Expand Up @@ -283,7 +283,7 @@ impl PubsubClient {
) -> Result<BlockSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
Expand Down Expand Up @@ -322,7 +322,7 @@ impl PubsubClient {
) -> Result<LogsSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
Expand Down Expand Up @@ -361,7 +361,7 @@ impl PubsubClient {
) -> Result<ProgramSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
Expand Down Expand Up @@ -398,7 +398,7 @@ impl PubsubClient {
pub fn vote_subscribe(url: &str) -> Result<VoteSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
Expand Down Expand Up @@ -431,7 +431,7 @@ impl PubsubClient {
pub fn root_subscribe(url: &str) -> Result<RootSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
Expand Down Expand Up @@ -468,7 +468,7 @@ impl PubsubClient {
) -> Result<SignatureSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel();
let (sender, receiver) = unbounded();

let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
Expand Down Expand Up @@ -506,7 +506,7 @@ impl PubsubClient {
pub fn slot_subscribe(url: &str) -> Result<SlotsSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
let (sender, receiver) = channel::<SlotInfo>();
let (sender, receiver) = unbounded::<SlotInfo>();

let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
Expand Down
Loading

0 comments on commit 8b66625

Please sign in to comment.