Skip to content

Commit

Permalink
reducing yellowstone dependencies (#389)
Browse files Browse the repository at this point in the history
* WIP - not compiling

* remove connect_hacked

* use reimported types

* cleanup

* fix fmt
  • Loading branch information
grooviegermanikus authored Apr 17, 2024
1 parent 2d61436 commit d10910e
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 227 deletions.
156 changes: 79 additions & 77 deletions Cargo.lock

Large diffs are not rendered by default.

33 changes: 18 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ license = "AGPL"
edition = "2021"

[workspace.dependencies]
solana-sdk = "~1.17.15"
solana-rpc-client = "~1.17.15"
solana-rpc-client-api = "~1.17.15"
solana-transaction-status = "~1.17.15"
solana-version = "~1.17.15"
solana-client = "~1.17.15"
solana-net-utils = "~1.17.15"
solana-pubsub-client = "~1.17.15"
solana-streamer = "~1.17.15"
solana-account-decoder = "~1.17.15"
solana-ledger = "~1.17.15"
solana-program = "~1.17.15"
solana-address-lookup-table-program = "~1.17.15"
solana-sdk = "~1.17.28"
solana-rpc-client = "~1.17.28"
solana-rpc-client-api = "~1.17.28"
solana-transaction-status = "~1.17.28"
solana-version = "~1.17.28"
solana-client = "~1.17.28"
solana-net-utils = "~1.17.28"
solana-pubsub-client = "~1.17.28"
solana-streamer = "~1.17.28"
solana-account-decoder = "~1.17.28"
solana-ledger = "~1.17.28"
solana-program = "~1.17.28"
solana-address-lookup-table-program = "~1.17.28"
itertools = "0.10.5"
rangetools = "0.1.4"
serde = { version = "1.0.160", features = ["derive"] }
Expand Down Expand Up @@ -87,7 +87,10 @@ solana-lite-rpc-accounts = {path = "accounts", version = "0.2.4"}
solana-lite-rpc-accounts-on-demand = {path = "accounts-on-demand", version = "0.2.4"}
bench = { path = "bench", version="0.2.4" }

yellowstone-grpc-proto = "1.13.0"
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v0.10.6+yellowstone.1.13+solana.1.17.28", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }


async-trait = "0.1.68"
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
tonic-health = "0.10"
1 change: 0 additions & 1 deletion accounts-on-demand/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-accounts = { workspace = true }
solana-lite-rpc-cluster-endpoints = { workspace = true }

yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }

[dev-dependencies]
Expand Down
4 changes: 1 addition & 3 deletions cluster-endpoints/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ repository = "https://github.com/blockworks-foundation/lite-rpc"
license = "AGPL"

[dependencies]
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize-with-broadcast-exit", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
geyser-grpc-connector = { workspace = true }

solana-sdk = { workspace = true }
solana-rpc-client-api = { workspace = true }
Expand Down Expand Up @@ -42,7 +41,6 @@ derive_more = "0.99.17"
async-channel = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-util = { workspace = true }
yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }
itertools = {workspace = true}
prometheus = { workspace = true }
Expand Down
39 changes: 26 additions & 13 deletions cluster-endpoints/src/grpc/grpc_accounts_streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::{
time::Duration,
};

use geyser_grpc_connector::GrpcSourceConfig;
use geyser_grpc_connector::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use geyser_grpc_connector::{GeyserGrpcClient, GeyserGrpcClientResult, GrpcSourceConfig};
use itertools::Itertools;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
Expand All @@ -24,8 +27,7 @@ use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter,
SubscribeRequestFilterAccountsFilterMemcmp,
};

use crate::grpc::grpc_utils::connect_with_timeout_hacked;
use yellowstone_grpc_proto::tonic::service::Interceptor;

pub fn start_account_streaming_tasks(
grpc_config: GrpcSourceConfig,
Expand Down Expand Up @@ -111,11 +113,8 @@ pub fn start_account_streaming_tasks(
ping: None,
};

let mut client = connect_with_timeout_hacked(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
)
.await?;
let mut client = create_connection(&grpc_config).await?;

let account_stream = client.subscribe_once2(program_subscription).await.unwrap();

// each account subscription batch will require individual stream
Expand All @@ -136,11 +135,7 @@ pub fn start_account_streaming_tasks(
filters: vec![],
},
);
let mut client = connect_with_timeout_hacked(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
)
.await?;
let mut client = create_connection(&grpc_config).await?;

let account_request = SubscribeRequest {
accounts: accounts_subscription,
Expand Down Expand Up @@ -219,6 +214,24 @@ pub fn start_account_streaming_tasks(
})
}

async fn create_connection(
grpc_config: &GrpcSourceConfig,
) -> GeyserGrpcClientResult<GeyserGrpcClient<impl Interceptor + Sized>> {
connect_with_timeout_with_buffers(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
None,
Some(Duration::from_secs(10)),
Some(Duration::from_secs(10)),
GeyserGrpcClientBufferConfig {
buffer_size: Some(65536),
conn_window: Some(5242880),
stream_window: Some(4194304),
},
)
.await
}

pub fn create_grpc_account_streaming(
grpc_sources: Vec<GrpcSourceConfig>,
accounts_filters: AccountFilters,
Expand Down
38 changes: 0 additions & 38 deletions cluster-endpoints/src/grpc/grpc_utils.rs

This file was deleted.

1 change: 0 additions & 1 deletion cluster-endpoints/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
pub mod grpc_accounts_streaming;
pub mod grpc_utils;
87 changes: 18 additions & 69 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::endpoint_stremers::EndpointStreaming;
use crate::grpc::grpc_accounts_streaming::create_grpc_account_streaming;
use crate::grpc::grpc_utils::connect_with_timeout_hacked;
use crate::grpc_multiplex::{
create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_processed_slots_subscription,
};
use anyhow::Context;
use futures::StreamExt;
use geyser_grpc_connector::yellowstone_grpc_util::{
connect_with_timeout_with_buffers, GeyserGrpcClientBufferConfig,
};
use geyser_grpc_connector::GrpcSourceConfig;
use itertools::Itertools;
use log::trace;
Expand Down Expand Up @@ -36,13 +38,11 @@ use solana_transaction_status::{Reward, RewardType};
use std::cell::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, Notify};
use tracing::trace_span;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterSlots, SubscribeUpdateSlot,
};
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks};

use crate::rpc_polling::vote_accounts_and_cluster_info_polling::{
poll_cluster_info, poll_vote_accounts,
Expand Down Expand Up @@ -291,8 +291,19 @@ pub fn create_block_processing_task(
);

// connect to grpc
let mut client =
connect_with_timeout_hacked(grpc_addr.clone(), grpc_x_token.clone()).await?;
let mut client = connect_with_timeout_with_buffers(
grpc_addr.clone(),
grpc_x_token.clone(),
None,
Some(Duration::from_secs(10)),
Some(Duration::from_secs(10)),
GeyserGrpcClientBufferConfig {
buffer_size: Some(65536),
conn_window: Some(5242880),
stream_window: Some(4194304),
},
)
.await?;
let mut stream = tokio::select! {
res = client
.subscribe_once(
Expand Down Expand Up @@ -358,68 +369,6 @@ pub fn create_block_processing_task(
})
}

// not used
pub fn create_slot_stream_task(
grpc_addr: String,
grpc_x_token: Option<String>,
slot_sx: tokio::sync::mpsc::Sender<SubscribeUpdateSlot>,
commitment_level: CommitmentLevel,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
let mut slots = HashMap::new();
slots.insert(
"client_slot".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(true),
},
);

// connect to grpc
let mut client =
GeyserGrpcClient::connect(grpc_addr.clone(), grpc_x_token.clone(), None)?;
let mut stream = client
.subscribe_once(
slots,
Default::default(),
HashMap::new(),
Default::default(),
HashMap::new(),
Default::default(),
Some(commitment_level),
Default::default(),
None,
)
.await?;

while let Some(message) = stream.next().await {
let message = message?;

let Some(update) = message.update_oneof else {
continue;
};

match update {
UpdateOneof::Slot(slot) => {
slot_sx
.send(slot)
.await
.context("Problem sending on block channel")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
}
_ => {
log::trace!("unknown GRPC notification");
}
};
}
log::error!("Grpc block subscription broken (resubscribing)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
})
}

pub fn create_grpc_subscription(
rpc_client: Arc<RpcClient>,
grpc_sources: Vec<GrpcSourceConfig>,
Expand Down
1 change: 0 additions & 1 deletion cluster-endpoints/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,3 @@ pub mod json_rpc_subscription;
pub mod rpc_polling;

pub use geyser_grpc_connector;
pub use yellowstone_grpc_proto::geyser::CommitmentLevel;
6 changes: 3 additions & 3 deletions config.example.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"rpc_addr": "http://0.0.0.0:8899",
"ws_addr": "ws://0.0.0.0:8900",
"rpc_addr": "http://rpcnode-upstream:8899",
"ws_addr": "ws://rpcnode-upstream:8900",
"lite_rpc_http_addr": "[::]:8890",
"lite_rpc_ws_addr": "[::]:8891",
"fanout_size": 18,
Expand All @@ -11,7 +11,7 @@
"quic_proxy_addr": null,
"use_grpc": false,
"calculate_leader_schedule_from_geyser": false,
"grpc_addr": "http://127.0.0.0:10000",
"grpc_addr": "http://yellowstone-grpc-upstream:10000",
"grpc_x_token": null,
"postgres": {
"pg_config": "your_postgres_config",
Expand Down
8 changes: 4 additions & 4 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
}

let timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
connect_timeout: Duration::from_secs(15),
request_timeout: Duration::from_secs(15),
subscribe_timeout: Duration::from_secs(15),
receive_timeout: Duration::from_secs(15),
};

let gprc_sources = grpc_sources
Expand Down
4 changes: 2 additions & 2 deletions stake_vote/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ itertools = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }
#yellowstone-grpc-client = { workspace = true }
#yellowstone-grpc-proto = { workspace = true }
solana-sdk = { workspace = true }
solana-client = { workspace = true }
solana-ledger = { workspace = true }
Expand Down

0 comments on commit d10910e

Please sign in to comment.