Skip to content

Commit 1bb5bde

Browse files
feat(cast): subscribe to logs
1 parent 0a51288 commit 1bb5bde

File tree

2 files changed

+195
-45
lines changed

2 files changed

+195
-45
lines changed

crates/cast/bin/cmd/logs.rs

Lines changed: 48 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
1+
use std::{io, str::FromStr};
2+
13
use cast::Cast;
24
use clap::Parser;
3-
use ethers::{
5+
use ethers::{providers::Middleware, types::NameOrAddress};
6+
use ethers_core::{
47
abi::{Address, Event, RawTopicFilter, Topic, TopicFilter},
5-
providers::Middleware,
6-
types::{BlockId, BlockNumber, Filter, FilterBlockOption, NameOrAddress, ValueOrArray, H256},
8+
types::{BlockId, BlockNumber, Filter, FilterBlockOption, ValueOrArray, H256},
79
};
810
use eyre::Result;
911
use foundry_cli::{opts::EthereumOpts, utils};
1012
use foundry_common::abi::{get_event, parse_tokens};
1113
use foundry_config::Config;
1214
use itertools::Itertools;
13-
use std::str::FromStr;
1415

1516
/// CLI arguments for `cast logs`.
1617
#[derive(Debug, Parser)]
@@ -19,32 +20,37 @@ pub struct LogsArgs {
1920
///
2021
/// Can also be the tags earliest, finalized, safe, latest, or pending.
2122
#[clap(long)]
22-
from_block: Option<BlockId>,
23+
pub from_block: Option<BlockId>,
2324

2425
/// The block height to stop query at.
2526
///
2627
/// Can also be the tags earliest, finalized, safe, latest, or pending.
2728
#[clap(long)]
28-
to_block: Option<BlockId>,
29+
pub to_block: Option<BlockId>,
2930

3031
/// The contract address to filter on.
3132
#[clap(
3233
long,
3334
value_parser = NameOrAddress::from_str
3435
)]
35-
address: Option<NameOrAddress>,
36+
pub address: Option<NameOrAddress>,
3637

3738
/// The signature of the event to filter logs by which will be converted to the first topic or
3839
/// a topic to filter on.
3940
#[clap(value_name = "SIG_OR_TOPIC")]
40-
sig_or_topic: Option<String>,
41+
pub sig_or_topic: Option<String>,
4142

4243
/// If used with a signature, the indexed fields of the event to filter by. Otherwise, the
4344
/// remaining topics of the filter.
4445
#[clap(value_name = "TOPICS_OR_ARGS")]
45-
topics_or_args: Vec<String>,
46+
pub topics_or_args: Vec<String>,
47+
48+
/// If the RPC type and endpoints supports `eth_subscribe` stream logs instead of printing and
49+
/// exiting. Will continue until interrupted or TO_BLOCK is reached.
50+
#[clap(long)]
51+
pub subscribe: bool,
4652

47-
/// Print the logs as JSON.
53+
/// Print the logs as JSON.s
4854
#[clap(long, short, help_heading = "Display options")]
4955
json: bool,
5056

@@ -55,12 +61,21 @@ pub struct LogsArgs {
5561
impl LogsArgs {
5662
pub async fn run(self) -> Result<()> {
5763
let LogsArgs {
58-
from_block, to_block, address, topics_or_args, sig_or_topic, json, eth, ..
64+
from_block,
65+
to_block,
66+
address,
67+
sig_or_topic,
68+
topics_or_args,
69+
subscribe,
70+
json,
71+
eth,
5972
} = self;
6073

6174
let config = Config::from(&eth);
6275
let provider = utils::get_provider(&config)?;
6376

77+
let cast = Cast::new(&provider);
78+
6479
let address = match address {
6580
Some(address) => {
6681
let address = match address {
@@ -72,48 +87,34 @@ impl LogsArgs {
7287
None => None,
7388
};
7489

75-
let from_block = convert_block_number(&provider, from_block).await?;
76-
let to_block = convert_block_number(&provider, to_block).await?;
77-
78-
let cast = Cast::new(&provider);
90+
let from_block = cast.convert_block_number(from_block).await?;
91+
let to_block = cast.convert_block_number(to_block).await?;
7992

8093
let filter = build_filter(from_block, to_block, address, sig_or_topic, topics_or_args)?;
8194

82-
let logs = cast.filter_logs(filter, json).await?;
95+
if !subscribe {
96+
let logs = cast.filter_logs(filter, json).await?;
97+
98+
println!("{}", logs);
8399

84-
println!("{}", logs);
100+
return Ok(())
101+
}
102+
103+
let mut stdout = io::stdout();
104+
cast.subscribe(filter, &mut stdout, json, ctrl_c_future()).await?;
85105

86106
Ok(())
87107
}
88108
}
89109

90-
/// Converts a block identifier into a block number.
91-
///
92-
/// If the block identifier is a block number, then this function returns the block number. If the
93-
/// block identifier is a block hash, then this function returns the block number of that block
94-
/// hash. If the block identifier is `None`, then this function returns `None`.
95-
async fn convert_block_number<M: Middleware>(
96-
provider: M,
97-
block: Option<BlockId>,
98-
) -> Result<Option<BlockNumber>, eyre::Error>
99-
where
100-
M::Error: 'static,
101-
{
102-
match block {
103-
Some(block) => match block {
104-
BlockId::Number(block_number) => Ok(Some(block_number)),
105-
BlockId::Hash(hash) => {
106-
let block = provider.get_block(hash).await?;
107-
Ok(block.map(|block| block.number.unwrap()).map(BlockNumber::from))
108-
}
109-
},
110-
None => Ok(None),
111-
}
110+
async fn ctrl_c_future() -> Result<()> {
111+
tokio::signal::ctrl_c().await?;
112+
Ok(())
112113
}
113114

114-
// First tries to parse the `sig_or_topic` as an event signature. If successful, `topics_or_args` is
115-
// parsed as indexed inputs and converted to topics. Otherwise, `sig_or_topic` is prepended to
116-
// `topics_or_args` and used as raw topics.
115+
/// Builds a Filter by first trying to parse the `sig_or_topic` as an event signature. If
116+
/// successful, `topics_or_args` is parsed as indexed inputs and converted to topics. Otherwise,
117+
/// `sig_or_topic` is prepended to `topics_or_args` and used as raw topics.
117118
fn build_filter(
118119
from_block: Option<BlockNumber>,
119120
to_block: Option<BlockNumber>,
@@ -154,7 +155,7 @@ fn build_filter(
154155
Ok(filter)
155156
}
156157

157-
// Creates a TopicFilter for the given event signature and arguments.
158+
/// Creates a TopicFilter from the given event signature and arguments.
158159
fn build_filter_event_sig(event: Event, args: Vec<String>) -> Result<TopicFilter, eyre::Error> {
159160
let args = args.iter().map(|arg| arg.as_str()).collect::<Vec<_>>();
160161

@@ -195,7 +196,7 @@ fn build_filter_event_sig(event: Event, args: Vec<String>) -> Result<TopicFilter
195196
Ok(event.filter(raw)?)
196197
}
197198

198-
// Creates a TopicFilter from raw topic hashes.
199+
/// Creates a TopicFilter from raw topic hashes.
199200
fn build_filter_topics(topics: Vec<String>) -> Result<TopicFilter, eyre::Error> {
200201
let mut topics = topics
201202
.into_iter()
@@ -214,8 +215,11 @@ fn build_filter_topics(topics: Vec<String>) -> Result<TopicFilter, eyre::Error>
214215

215216
#[cfg(test)]
216217
mod tests {
218+
use std::str::FromStr;
219+
217220
use super::*;
218221
use ethers::types::H160;
222+
use ethers_core::types::H256;
219223

220224
const ADDRESS: &str = "0x4D1A2e2bB4F88F0250f26Ffff098B0b30B26BF38";
221225
const TRANSFER_SIG: &str = "Transfer(address indexed,address indexed,uint256)";

crates/cast/src/lib.rs

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,20 @@ use ethers_core::{
1313
},
1414
};
1515
use ethers_etherscan::{errors::EtherscanError, Client};
16-
use ethers_providers::{Middleware, PendingTransaction};
16+
use ethers_providers::{Middleware, PendingTransaction, PubsubClient};
1717
use evm_disassembler::{disassemble_bytes, disassemble_str, format_operations};
1818
use eyre::{Context, Result};
1919
use foundry_common::{abi::encode_args, fmt::*, TransactionReceiptWithRevertReason};
2020
pub use foundry_evm::*;
21+
use futures::{future::Either, pin_mut, Future, FutureExt, StreamExt};
2122
use rayon::prelude::*;
2223
pub use rusoto_core::{
2324
credential::ChainProvider as AwsChainProvider, region::Region as AwsRegion,
2425
request::HttpClient as AwsHttpClient, Client as AwsClient,
2526
};
2627
pub use rusoto_kms::KmsClient;
2728
use std::{
29+
io,
2830
path::PathBuf,
2931
str::FromStr,
3032
sync::atomic::{AtomicBool, Ordering},
@@ -816,6 +818,150 @@ where
816818
};
817819
Ok(res)
818820
}
821+
822+
/// Converts a block identifier into a block number.
823+
///
824+
/// If the block identifier is a block number, then this function returns the block number. If
825+
/// the block identifier is a block hash, then this function returns the block number of
826+
/// that block hash. If the block identifier is `None`, then this function returns `None`.
827+
///
828+
/// # Example
829+
///
830+
/// ```no_run
831+
/// use cast::Cast;
832+
/// use ethers_providers::{Provider, Http};
833+
/// use ethers_core::types::{BlockId, BlockNumber};
834+
/// use std::convert::TryFrom;
835+
///
836+
/// # async fn foo() -> eyre::Result<()> {
837+
/// let provider = Provider::<Http>::try_from("http://localhost:8545")?;
838+
/// let cast = Cast::new(provider);
839+
///
840+
/// let block_number = cast.convert_block_number(Some(BlockId::Number(BlockNumber::from(5)))).await?;
841+
/// assert_eq!(block_number, Some(BlockNumber::from(5)));
842+
///
843+
/// let block_number = cast.convert_block_number(Some(BlockId::Hash("0x1234".parse().unwrap()))).await?;
844+
/// assert_eq!(block_number, Some(BlockNumber::from(1234)));
845+
///
846+
/// let block_number = cast.convert_block_number(None).await?;
847+
/// assert_eq!(block_number, None);
848+
/// # Ok(())
849+
/// # }
850+
/// ```
851+
pub async fn convert_block_number(
852+
&self,
853+
block: Option<BlockId>,
854+
) -> Result<Option<BlockNumber>, eyre::Error> {
855+
match block {
856+
Some(block) => match block {
857+
BlockId::Number(block_number) => Ok(Some(block_number)),
858+
BlockId::Hash(hash) => {
859+
let block = self.provider.get_block(hash).await?;
860+
Ok(block.map(|block| block.number.unwrap()).map(BlockNumber::from))
861+
}
862+
},
863+
None => Ok(None),
864+
}
865+
}
866+
867+
/// Sets up a subscription to the given filter and writes the logs to the given output.
868+
///
869+
/// # Example
870+
///
871+
/// ```no_run
872+
/// use cast::Cast;
873+
/// use ethers_core::abi::Address;
874+
/// use ethers_providers::{Provider, Ws};
875+
/// use ethers_core::types::Filter;
876+
/// use std::{str::FromStr, convert::TryFrom};
877+
/// use std::io;
878+
///
879+
/// # async fn foo() -> eyre::Result<()> {
880+
/// let provider = Provider::new(Ws::connect("wss://localhost:8545").await?);
881+
/// let cast = Cast::new(provider);
882+
///
883+
/// let filter = Filter::new().address(Address::from_str("0x00000000006c3852cbEf3e08E8dF289169EdE581")?);
884+
/// let mut output = io::stdout();
885+
/// cast.subscribe(filter, &mut output, false).await?;
886+
/// # Ok(())
887+
/// # }
888+
/// ```
889+
pub async fn subscribe<F>(
890+
&self,
891+
filter: Filter,
892+
output: &mut dyn io::Write,
893+
to_json: bool,
894+
cancel: F,
895+
) -> Result<()>
896+
where
897+
<M as Middleware>::Provider: PubsubClient,
898+
F: Future<Output = Result<()>> + Send + 'static,
899+
{
900+
// Initialize the subscription stream for logs
901+
let mut subscription = self.provider.subscribe_logs(&filter).await?;
902+
903+
// Check if a to_block is specified, if so, subscribe to blocks
904+
let mut block_subscription = if filter.get_to_block().is_some() {
905+
Some(self.provider.subscribe_blocks().await?)
906+
} else {
907+
None
908+
};
909+
910+
let to_block_number = filter.get_to_block();
911+
912+
// If output should be JSON, start with an opening bracket
913+
if to_json {
914+
write!(output, "[")?;
915+
}
916+
917+
let mut first = true;
918+
pin_mut!(cancel);
919+
920+
loop {
921+
tokio::select! {
922+
// If block subscription is present, listen to it to avoid blocking indefinitely past the desired to_block
923+
block = if let Some(bs) = &mut block_subscription {
924+
Either::Left(bs.next().fuse())
925+
} else {
926+
Either::Right(futures::future::pending())
927+
} => {
928+
if let (Some(block), Some(to_block)) = (block, to_block_number) {
929+
if block.number.map_or(false, |bn| bn > to_block) {
930+
break;
931+
}
932+
}
933+
},
934+
// Process incoming log
935+
log = subscription.next() => {
936+
if to_json {
937+
if !first {
938+
write!(output, ",")?;
939+
}
940+
first = false;
941+
let log_str = serde_json::to_string(&log).unwrap();
942+
write!(output, "{}", log_str)?;
943+
} else {
944+
let log_str = log.pretty()
945+
.replacen('\n', "- ", 1) // Remove empty first line
946+
.replace('\n', "\n "); // Indent
947+
writeln!(output, "{}", log_str)?;
948+
}
949+
},
950+
// Break on cancel signal, to allow for closing JSON bracket
951+
_ = &mut cancel => {
952+
break;
953+
},
954+
else => break,
955+
}
956+
}
957+
958+
// If output was JSON, end with a closing bracket
959+
if to_json {
960+
write!(output, "]")?;
961+
}
962+
963+
Ok(())
964+
}
819965
}
820966

821967
pub struct InterfaceSource {

0 commit comments

Comments
 (0)