Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrate geyser slot subscription #283

Merged
merged 2 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 36 additions & 57 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::grpc_subscription::{create_block_processing_task, map_block_update};
use crate::grpc_subscription::{
create_block_processing_task, create_slot_stream_task, map_block_update,
};
use anyhow::Context;
use futures::StreamExt;
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
Expand All @@ -17,9 +19,7 @@ use std::collections::{BTreeSet, HashMap};
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterSlots, SubscribeUpdate,
};
use yellowstone_grpc_proto::geyser::SubscribeUpdate;

struct BlockExtractor(CommitmentConfig);

Expand Down Expand Up @@ -71,7 +71,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
loop {
let (confirmed_block_sender, mut confirmed_block_reciever) =
tokio::sync::mpsc::unbounded_channel::<ProducedBlock>();
let _confirmed_blocks_tasks = {
let confirmed_blocks_tasks = {
let commitment_config = CommitmentConfig::confirmed();

let mut tasks = Vec::new();
Expand Down Expand Up @@ -189,6 +189,8 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
}
// abort all the tasks
confirmed_blocks_tasks.iter().for_each(|task| task.abort());
}
})
};
Expand Down Expand Up @@ -225,65 +227,42 @@ pub fn create_grpc_multiplex_slots_subscription(
info!("- connection to {}", grpc_source);
}

let (multiplexed_messages_sender, multiplexed_messages) = tokio::sync::broadcast::channel(1000);
let (multiplexed_messages_sender, multiplexed_messages_rx) =
tokio::sync::broadcast::channel(1000);

let jh = tokio::spawn(async move {
loop {
let multiplex_stream = {
let mut streams = Vec::new();
for grpc_source in &grpc_sources {
let mut slots = HashMap::new();
slots.insert(
"client".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(true),
},
);

let filter = SubscribeRequest {
slots,
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
commitment: Some(
yellowstone_grpc_proto::geyser::CommitmentLevel::Processed as i32,
),
accounts_data_slice: Default::default(),
ping: None,
};

let stream = create_geyser_reconnecting_stream(grpc_source.clone(), filter);
streams.push(stream);
}

create_multiplexed_stream(streams, SlotExtractor {})
};
let mut streams_tasks = Vec::new();
let mut recievers = Vec::new();
for grpc_source in &grpc_sources {
let (sx, rx) = async_channel::bounded(1);
let task = create_slot_stream_task(
grpc_source.grpc_addr.clone(),
grpc_source.grpc_x_token.clone(),
sx,
yellowstone_grpc_proto::geyser::CommitmentLevel::Processed,
);
streams_tasks.push(task);
recievers.push(rx);
}

let mut multiplex_stream = std::pin::pin!(multiplex_stream);
loop {
tokio::select! {
slot_data = multiplex_stream.next() => {
if let Some(slot_data) = slot_data {
match multiplexed_messages_sender.send(slot_data) {
Ok(receivers) => {
trace!("sent data to {} receivers", receivers);
}
Err(send_error) => log::error!("Get error while sending on slot channel {}", send_error),
};
} else {
debug!("Slot stream send None type");
}
},
_ = tokio::time::sleep(Duration::from_secs(30)) => {
log::error!("Slots timedout restarting subscription");
break;
}
while let Ok(slot_update) = tokio::time::timeout(
Duration::from_secs(30),
futures::stream::select_all(recievers.clone()).next(),
)
.await
{
if let Some(slot_update) = slot_update {
multiplexed_messages_sender.send(SlotNotification {
processed_slot: slot_update.slot,
estimated_processed_slot: slot_update.slot,
})?;
}
}

streams_tasks.iter().for_each(|task| task.abort());
}
});

(multiplexed_messages, jh)
(multiplexed_messages_rx, jh)
}
62 changes: 62 additions & 0 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use solana_sdk::{
use solana_transaction_status::{Reward, RewardType};
use std::{collections::HashMap, sync::Arc};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::{SubscribeRequestFilterSlots, SubscribeUpdateSlot};

use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
Expand Down Expand Up @@ -304,6 +305,67 @@ pub fn create_block_processing_task(
})
}

pub fn create_slot_stream_task(
grpc_addr: String,
grpc_x_token: Option<String>,
slot_sx: async_channel::Sender<SubscribeUpdateSlot>,
commitment_level: CommitmentLevel,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
let mut slots = HashMap::new();
slots.insert(
"client_slot".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(true),
},
);

// connect to grpc
let mut client =
GeyserGrpcClient::connect(grpc_addr.clone(), grpc_x_token.clone(), None)?;
let mut stream = client
.subscribe_once(
slots,
Default::default(),
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
Some(commitment_level),
Default::default(),
None,
)
.await?;

while let Some(message) = stream.next().await {
let message = message?;

let Some(update) = message.update_oneof else {
continue;
};

match update {
UpdateOneof::Slot(slot) => {
slot_sx
.send(slot)
.await
.context("Problem sending on block channel")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
_ => {
log::trace!("unknown GRPC notification");
}
};
}
log::error!("Grpc block subscription broken (resubscribing)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
})
}

pub fn create_grpc_subscription(
rpc_client: Arc<RpcClient>,
grpc_sources: Vec<GrpcSourceConfig>,
Expand Down
2 changes: 1 addition & 1 deletion lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
quic_connection_params: QuicConnectionParameters {
connection_timeout: Duration::from_secs(1),
connection_retry_count: 10,
finalize_timeout: Duration::from_millis(200),
finalize_timeout: Duration::from_millis(1000),
max_number_of_connections: 8,
unistream_timeout: Duration::from_millis(500),
write_timeout: Duration::from_secs(1),
Expand Down
Loading