Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ The minor version will be incremented upon a breaking change and the patch versi

### Features

- richat: support tokenInitSubscribe in pubsub ([#143](https://github.com/lamports-dev/richat/pull/143))

### Breaking

## 2025-10-24
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ solana-transaction-context = "~3.0.4"
solana-transaction-status = "~3.0.4"
solana-version = "~3.0.4"
spl-token-2022-interface = "2.0.0"
spl-token-interface = "2.0.0"
thiserror = "2.0.7"
tikv-jemallocator = { version = "0.6.0", features = ["unprefixed_malloc_on_supported_platforms"] }
tokio = "1.42.0"
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ flowchart LR
- `filter` — library for filtering geyser messages
- `plugin-agave` — Agave validator geyser plugin https://docs.anza.xyz/validator/geyser
- `proto` — library with proto files, re-imports structs from crate `yellowstone-grpc-proto`
- `richat` — app with full stream consumer and producers: gRPC (`Dragon's Mouth`), Solana PubSub
- `richat` — app with full stream consumer and producers: gRPC (`Dragon's Mouth`), Solana PubSub (with [transactionSubscribe](https://github.com/solana-foundation/solana-improvement-documents/pull/69) and [tokenInitSubscribe](./token-init.md))
- `shared` — shared code between components (except `client`)

## Releases
Expand All @@ -125,6 +125,7 @@ flowchart LR
- `agave-v2.0` — development branch for agave v2.0
- `agave-v2.1` — development branch for agave v2.1
- `agave-v2.2` — development branch for agave v2.2
- `agave-v2.3` — development branch for agave v2.3

#### Tags

Expand Down
1 change: 1 addition & 0 deletions richat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
solana-version = { workspace = true }
spl-token-2022-interface = { workspace = true }
spl-token-interface = { workspace = true }
thiserror = { workspace = true }
tikv-jemallocator = { workspace = true }
tokio = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions richat/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ apps:
# recv_max_message_size: 4_096
# enable_block_subscription: false
# enable_transaction_subscription: false
# enable_token_init_subscription: false
# clients_requests_channel_size: 8_192
# subscriptions_worker_affinity: null # by default no affinity (taskset syntax)
# subscriptions_workers_count: 2
Expand Down
2 changes: 2 additions & 0 deletions richat/src/pubsub/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct ConfigAppsPubsub {
pub recv_max_message_size: usize,
pub enable_block_subscription: bool,
pub enable_transaction_subscription: bool,
pub enable_token_init_subscription: bool,
#[serde(deserialize_with = "deserialize_num_str")]
pub clients_requests_channel_size: usize,
#[serde(deserialize_with = "deserialize_affinity")]
Expand Down Expand Up @@ -52,6 +53,7 @@ impl Default for ConfigAppsPubsub {
recv_max_message_size: 4 * 1024, // 4KiB
enable_block_subscription: false,
enable_transaction_subscription: false,
enable_token_init_subscription: false,
clients_requests_channel_size: 8_192,
subscriptions_worker_affinity: None,
subscriptions_workers_count: 2,
Expand Down
7 changes: 7 additions & 0 deletions richat/src/pubsub/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,10 @@ pub enum RpcBlockUpdateError {
#[error("failed to parse proto: {0}")]
Parse(&'static str),
}

#[derive(Debug, Serialize)]
pub struct RpcTokenInitUpdate {
pub accounts: Vec<String>,
pub signature: String,
pub failed: bool,
}
4 changes: 4 additions & 0 deletions richat/src/pubsub/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl PubSubServer {
let recv_max_message_size = config.recv_max_message_size;
let enable_block_subscription = config.enable_block_subscription;
let enable_transaction_subscription = config.enable_transaction_subscription;
let enable_token_init_subscription = config.enable_token_init_subscription;
let service = service_fn({
let clients_tx = clients_tx.clone();
let notifications = notifications.clone();
Expand Down Expand Up @@ -146,6 +147,7 @@ impl PubSubServer {
recv_max_message_size,
enable_block_subscription,
enable_transaction_subscription,
enable_token_init_subscription,
clients_tx,
notifications,
shutdown,
Expand Down Expand Up @@ -217,6 +219,7 @@ impl PubSubServer {
recv_max_message_size: usize,
enable_block_subscription: bool,
enable_transaction_subscription: bool,
enable_token_init_subscription: bool,
clients_tx: mpsc::Sender<ClientRequest>,
mut notifications: broadcast::Receiver<RpcNotification>,
shutdown: CancellationToken,
Expand Down Expand Up @@ -269,6 +272,7 @@ impl PubSubServer {
payload.as_ref(),
enable_block_subscription,
enable_transaction_subscription,
enable_token_init_subscription,
) {
Ok(Some(msg)) => msg,
Ok(None) => continue,
Expand Down
75 changes: 67 additions & 8 deletions richat/src/pubsub/solana.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{
channel::ParsedMessage,
pubsub::{filter::TransactionFilter, SubscriptionId},
pubsub::{filter::TransactionFilter, tracker::TokenInitParsedTransactions, SubscriptionId},
},
arrayvec::ArrayVec,
jsonrpsee_types::{
Expand Down Expand Up @@ -50,6 +50,7 @@ impl SubscribeMessage {
message: &[u8],
enable_block_subscription: bool,
enable_transaction_subscription: bool,
enable_token_init_subscription: bool,
) -> Result<Option<Self>, Response<'static, ()>> {
let call: Request = serde_json::from_slice(message).map_err(|_error| Response {
jsonrpc: Some(TwoPointZero),
Expand All @@ -63,6 +64,7 @@ impl SubscribeMessage {
call.params,
enable_block_subscription,
enable_transaction_subscription,
enable_token_init_subscription,
)
.map_err(|error| Response {
jsonrpc: Some(TwoPointZero),
Expand All @@ -89,14 +91,20 @@ pub enum SubscribeMethod {
Block,
Root,
Transaction,
TokenInit,
}

impl SubscribeMethod {
pub const fn get_message_methods(message: &ParsedMessage) -> &[Self] {
match message {
ParsedMessage::Slot(_) => &[Self::Slot, Self::SlotsUpdates, Self::Root],
ParsedMessage::Account(_) => &[Self::Account, Self::Program],
ParsedMessage::Transaction(_) => &[Self::Logs, Self::Signature, Self::Transaction],
ParsedMessage::Transaction(_) => &[
Self::Logs,
Self::Signature,
Self::Transaction,
Self::TokenInit,
],
ParsedMessage::Entry(_) => &[],
ParsedMessage::BlockMeta(_) => &[],
ParsedMessage::Block(_) => &[Self::Block],
Expand All @@ -114,6 +122,7 @@ impl SubscribeMethod {
Self::Block => "block",
Self::Root => "root",
Self::Transaction => "transaction",
Self::TokenInit => "tokeninit",
}
}
}
Expand Down Expand Up @@ -175,6 +184,10 @@ pub enum SubscribeConfig {
max_supported_transaction_version: Option<u8>,
commitment: CommitmentConfig,
},
TokenInit {
pubkey: Pubkey,
commitment: CommitmentConfig,
},
Unsubscribe {
id: SubscriptionId,
},
Expand All @@ -188,6 +201,7 @@ impl SubscribeConfig {
params: Option<Cow<'_, RawValue>>,
enable_block_subscription: bool,
enable_transaction_subscription: bool,
enable_token_init_subscription: bool,
) -> Result<Self, ErrorObjectOwned> {
match method {
"accountSubscribe" => {
Expand Down Expand Up @@ -354,11 +368,11 @@ impl SubscribeConfig {
#[serde(rename_all = "camelCase")]
struct ReqTransactionSubscribeConfig {
#[serde(flatten)]
pub commitment: Option<CommitmentConfig>,
pub encoding: Option<UiTransactionEncoding>,
pub transaction_details: Option<TransactionDetails>,
pub show_rewards: Option<bool>,
pub max_supported_transaction_version: Option<u8>,
commitment: Option<CommitmentConfig>,
encoding: Option<UiTransactionEncoding>,
transaction_details: Option<TransactionDetails>,
show_rewards: Option<bool>,
max_supported_transaction_version: Option<u8>,
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -389,6 +403,33 @@ impl SubscribeConfig {
commitment: config.commitment.unwrap_or_default(),
})
}
"tokenInitSubscribe" => {
if !enable_token_init_subscription {
return Err(ErrorCode::MethodNotFound.into());
}

#[derive(Debug, Default, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ReqTokenInitSubscribeConfig {
#[serde(flatten)]
pub commitment: Option<CommitmentConfig>,
}

#[derive(Debug, Deserialize)]
struct ReqParams {
pubkey: String,
#[serde(default)]
config: Option<ReqTokenInitSubscribeConfig>,
}

let ReqParams { pubkey, config } = parse_params(params)?;
let config = config.unwrap_or_default();

Ok(Self::TokenInit {
pubkey: param::<Pubkey>(&pubkey, "pubkey")?,
commitment: config.commitment.unwrap_or_default(),
})
}
"accountUnsubscribe"
| "programUnsubscribe"
| "logsUnsubscribe"
Expand All @@ -398,9 +439,11 @@ impl SubscribeConfig {
| "blockUnsubscribe"
| "voteUnsubscribe"
| "rootUnsubscribe"
| "transactionUnsubscribe" => {
| "transactionUnsubscribe"
| "tokenInitUnsubscribe" => {
if (method == "blockUnsubscribe" && !enable_block_subscription)
|| (method == "transactionUnsubscribe" && !enable_transaction_subscription)
|| (method == "tokenInitUnsubscribe" && !enable_token_init_subscription)
{
return Err(ErrorCode::MethodNotFound.into());
}
Expand Down Expand Up @@ -436,6 +479,7 @@ impl SubscribeConfig {
Self::Block { commitment, .. } => commitment.commitment,
Self::Root => CommitmentLevel::Processed,
Self::Transaction { commitment, .. } => commitment.commitment,
Self::TokenInit { commitment, .. } => commitment.commitment,
Self::Unsubscribe { .. } => unreachable!(),
Self::GetVersion => unreachable!(),
Self::GetVersionRichat => unreachable!(),
Expand All @@ -453,6 +497,7 @@ impl SubscribeConfig {
Self::Block { .. } => SubscribeMethod::Block,
Self::Root => SubscribeMethod::Root,
Self::Transaction { .. } => SubscribeMethod::Transaction,
Self::TokenInit { .. } => SubscribeMethod::TokenInit,
Self::Unsubscribe { .. } => unreachable!(),
Self::GetVersion => unreachable!(),
Self::GetVersionRichat => unreachable!(),
Expand Down Expand Up @@ -591,6 +636,20 @@ impl SubscribeConfig {
_ => None,
}
}

pub fn filter_transaction_token_init(
&self,
message: &MessageTransaction,
token_init: &TokenInitParsedTransactions,
) -> Option<Vec<Pubkey>> {
match self {
Self::TokenInit { pubkey, .. } => {
let accounts = token_init.get_token_init(message, pubkey);
(!accounts.is_empty()).then_some(accounts)
}
_ => None,
}
}
}

fn check_is_at_least_confirmed(commitment: CommitmentConfig) -> Result<(), ErrorObjectOwned> {
Expand Down
Loading
Loading