Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Production into main #303

Merged
merged 24 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5d46efc
Merge pull request #258 from blockworks-foundation/main
grooviegermanikus Dec 22, 2023
f2a0522
fix: panic on geyser close, multiplex bug
grooviegermanikus Jan 7, 2024
efde643
update Cargo.lock
grooviegermanikus Jan 7, 2024
5757591
Merge pull request #264 from blockworks-foundation/hotfix/panic-geyse…
grooviegermanikus Jan 7, 2024
e436625
Merging main into prod 12/01/2024
godmodegalactus Jan 12, 2024
ee66d06
reverting cargo.lock
godmodegalactus Jan 12, 2024
10be5de
Fix issues with grpc and postgres
godmodegalactus Jan 12, 2024
9369c0f
Merge remote-tracking branch 'ssh/main' into production
godmodegalactus Jan 15, 2024
ab9511c
Solving merge issues
godmodegalactus Jan 15, 2024
c193f2f
Merge remote-tracking branch 'origin/main' into production
godmodegalactus Jan 16, 2024
18e48bb
Fixing cargo fmt
godmodegalactus Jan 16, 2024
f076d11
Merge remote-tracking branch 'origin/main' into production
godmodegalactus Jan 16, 2024
60c4fdc
Increase finish quic timeout (#280) (#281)
godmodegalactus Jan 17, 2024
f77a89e
integrate geyser slot subscription (#283)
godmodegalactus Jan 17, 2024
c923f3a
Making slot channel unbounded (bug)
godmodegalactus Jan 17, 2024
9d9f8f8
remove block_debug_listen
grooviegermanikus Jan 17, 2024
f5d78ca
Update cargolock file
godmodegalactus Jan 18, 2024
f102eac
Merge branch 'main' into production
godmodegalactus Jan 18, 2024
c828a35
Fixing clippy removing grpc_inspect
godmodegalactus Jan 18, 2024
392ffe3
merging main with production (#290)
godmodegalactus Jan 18, 2024
6cbccd0
Merging MTU changes and setting up transportation config (#293)
godmodegalactus Jan 19, 2024
3139970
Making block subscription processed and moving confirmed block subscr…
godmodegalactus Jan 21, 2024
0fa22d6
Minor bug, subscribing to processed blocks instead of confirmed (#295)
godmodegalactus Jan 22, 2024
cb3647c
Merge branch 'main' into production
godmodegalactus Jan 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Merge branch 'main' into production
  • Loading branch information
godmodegalactus committed Jan 26, 2024
commit cb3647cec55687d4cfb6bbff18b4de2fcfc8e9ff
48 changes: 26 additions & 22 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ impl FromYellowstoneExtractor for BlockMetaHashExtractor {
}
}

fn create_grpc_multiplex_block_stream(
fn create_grpc_multiplex_processed_block_stream(
grpc_sources: &Vec<GrpcSourceConfig>,
confirmed_block_sender: UnboundedSender<ProducedBlock>,
processed_block_sender: UnboundedSender<ProducedBlock>,
) -> Vec<AnyhowJoinHandle> {
let commitment_config = CommitmentConfig::processed();

Expand Down Expand Up @@ -82,8 +82,8 @@ fn create_grpc_multiplex_block_stream(
&& (slots_processed.len() < MAX_SIZE / 2
|| slot > slots_processed.first().cloned().unwrap_or_default())
{
confirmed_block_sender
.send(map_block_update(block, commitment_config))
processed_block_sender
.send(from_grpc_block_update(block, commitment_config))
.context("Issue to send confirmed block")?;
slots_processed.insert(slot);
if slots_processed.len() > MAX_SIZE {
Expand Down Expand Up @@ -112,7 +112,8 @@ fn create_grpc_multiplex_block_meta_stream(
create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config))
}

/// connect to multiple grpc sources to consume confirmed blocks and block status update
/// connect to multiple grpc sources to consume processed (full) blocks and block meta for commitment level confirmed and finalized
/// will emit blocks for commitment level processed, confirmed and finalized OR only processed block never gets confirmed
pub fn create_grpc_multiplex_blocks_subscription(
grpc_sources: Vec<GrpcSourceConfig>,
) -> (Receiver<ProducedBlock>, AnyhowJoinHandle) {
Expand All @@ -134,8 +135,10 @@ pub fn create_grpc_multiplex_blocks_subscription(
let (processed_block_sender, mut processed_block_reciever) =
tokio::sync::mpsc::unbounded_channel::<ProducedBlock>();

let confirmed_blocks_tasks =
create_grpc_multiplex_block_stream(&grpc_sources, processed_block_sender);
let processed_blocks_tasks = create_grpc_multiplex_processed_block_stream(
&grpc_sources,
processed_block_sender,
);

let confirmed_blockmeta_stream = create_grpc_multiplex_block_meta_stream(
&grpc_sources,
Expand All @@ -152,14 +155,16 @@ pub fn create_grpc_multiplex_blocks_subscription(
let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream);

let mut last_finalized_slot: Slot = 0;
let mut cleanup_without_recv_blocks: u8 = 0;
let mut cleanup_without_confirmed_recv_blocks_meta: u8 = 0;
let mut cleanup_without_finalized_recv_blocks_meta: u8 = 0;
let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
const MAX_ALLOWED_CLEANUP_WITHOUT_RECV: u32 = 12; // 12*5 = 60s without recving data
const CLEANUP_SLOTS_BEHIND_FINALIZED: u64 = 100;
let mut cleanup_without_recv_blocks: u32 = 0;
let mut cleanup_without_confirmed_recv_blocks_meta: u32 = 0;
let mut cleanup_without_finalized_recv_blocks_meta: u32 = 0;
let mut confirmed_block_not_yet_processed = HashSet::<String>::new();

// start logging errors when we recieve first finalized block
let mut finalized_block_recieved = false;
const MAX_ALLOWED_CLEANUP_WITHOUT_RECV: u8 = 12; // 12*5 = 60s without recving data
let mut startup_completed = false;
loop {
tokio::select! {
processed_block = processed_block_reciever.recv() => {
Expand All @@ -170,12 +175,10 @@ pub fn create_grpc_multiplex_blocks_subscription(
processed_block.slot, processed_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(processed_block.clone()) {
warn!("produced block channel has no receivers {e:?}");
continue
}
if confirmed_block_not_yet_processed.remove(&processed_block.blockhash) {
if let Err(e) = producedblock_sender.send(processed_block.to_confirmed_block()) {
warn!("produced block channel has no receivers {e:?}");
continue
}
}
recent_processed_blocks.insert(processed_block.blockhash.clone(), processed_block);
Expand All @@ -188,8 +191,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
debug!("got confirmed blockmeta {} with blockhash {}",
confirmed_block.slot, confirmed_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(confirmed_block) {
warn!("Finalized block channel has no receivers {e:?}");
continue;
warn!("Confirmed block channel has no receivers {e:?}");
}
} else {
confirmed_block_not_yet_processed.insert(blockhash.clone());
Expand All @@ -202,21 +204,23 @@ pub fn create_grpc_multiplex_blocks_subscription(
if let Some(cached_processed_block) = recent_processed_blocks.remove(&blockhash) {
let finalized_block = cached_processed_block.to_finalized_block();
last_finalized_slot = finalized_block.slot;
finalized_block_recieved = true;
startup_completed = true;
debug!("got finalized blockmeta {} with blockhash {}",
finalized_block.slot, finalized_block.blockhash.clone());
if let Err(e) = producedblock_sender.send(finalized_block) {
warn!("Finalized block channel has no receivers {e:?}");
}
} else if finalized_block_recieved {
} else if startup_completed {
// this warning is ok for first few blocks when we start lrpc
log::error!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
}
},
_ = cleanup_tick.tick() => {
if cleanup_without_finalized_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
cleanup_without_recv_blocks > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
cleanup_without_confirmed_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV {
// timebased restart
if
cleanup_without_finalized_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV
&& cleanup_without_recv_blocks > MAX_ALLOWED_CLEANUP_WITHOUT_RECV
&& cleanup_without_confirmed_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV {
log::error!("block or block meta stream stopped restaring blocks");
break;
}
Expand All @@ -225,7 +229,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
cleanup_without_confirmed_recv_blocks_meta += 1;
let size_before = recent_processed_blocks.len();
recent_processed_blocks.retain(|_blockhash, block| {
last_finalized_slot == 0 || block.slot > last_finalized_slot - 100
last_finalized_slot == 0 || block.slot > last_finalized_slot - CLEANUP_SLOTS_BEHIND_FINALIZED
});
let cnt_cleaned = size_before - recent_processed_blocks.len();
if cnt_cleaned > 0 {
Expand Down
9 changes: 0 additions & 9 deletions core/src/structures/produced_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@ impl ProducedBlock {
}
}

/// moving commitment level to finalized
pub fn to_confirmed_block(&self) -> Self {
ProducedBlock {
commitment_config: CommitmentConfig::confirmed(),
..self.clone()
}
}
}

/// moving commitment level to confirmed
pub fn to_confirmed_block(&self) -> Self {
ProducedBlock {
Expand Down
8 changes: 2 additions & 6 deletions lite-rpc/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@ use std::collections::HashMap;
use std::{str::FromStr, sync::Arc};

use anyhow::Context;
use jsonrpsee::{core::SubscriptionResult, server::ServerBuilder, PendingSubscriptionSink};
use prometheus::{opts, register_int_counter, IntCounter};
use solana_lite_rpc_core::{
encoding,
stores::{block_information_store::BlockInformation, data_cache::DataCache, tx_store::TxProps},
AnyhowJoinHandle,
use jsonrpsee::{
core::SubscriptionResult, server::ServerBuilder, DisconnectError, PendingSubscriptionSink,
};
use log::{debug, error, warn};
use prometheus::{opts, register_int_counter, IntCounter};
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.