Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

feat: add EventStream::select to combine multiple event streams #725

Merged
merged 2 commits into from
Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@

### Unreleased

- Add `EventStream::select` to combine streams with different event types
[#725](https://github.com/gakonst/ethers-rs/pull/725)
- Substitute output tuples with rust struct types for function calls
[#664](https://github.com/gakonst/ethers-rs/pull/664)
- Add AbiType implementation during EthAbiType expansion
Expand Down
137 changes: 136 additions & 1 deletion ethers-contract/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::LogMeta;
use ethers_core::types::{Log, U256};
use futures_util::stream::{Stream, StreamExt};
use futures_util::{
future::Either,
stream::{Stream, StreamExt},
};
use pin_project::pin_project;
use std::{
pin::Pin,
Expand All @@ -23,6 +26,7 @@ pub struct EventStream<'a, T, R, E> {
}

impl<'a, T, R, E> EventStream<'a, T, R, E> {
/// Turns this stream of events into a stream that also yields the event's metadata
pub fn with_meta(self) -> EventStreamMeta<'a, T, R, E> {
EventStreamMeta(self)
}
Expand All @@ -49,9 +53,140 @@ where
}
}

impl<'a, T, R, E> EventStream<'a, T, R, E>
where
T: Stream<Item = Log> + Unpin + 'a,
R: 'a,
E: 'a,
{
/// This function will attempt to pull events from both event streams. Each
/// stream will be polled in a round-robin fashion, and whenever a stream is
/// ready to yield an event that event is yielded.
///
/// After one of the two event streams completes, the remaining one will be
/// polled exclusively. The returned stream completes when both input
/// streams have completed.
///
///
/// Note that this function consumes both streams and returns a wrapped
/// version of them.
/// The item of the wrapped stream is an `Either`, and the items that the `self` streams yields
/// will be stored in the left-hand variant of that `Either` and the other stream's (`st`) items
/// will be wrapped into the right-hand variant of that `Either`.
///
/// # Example
///
/// ```
/// # async fn test<M:ethers_providers::Middleware>(contract: ethers_contract::Contract<M>) {
/// # use ethers_core::types::*;
/// # use futures_util::stream::StreamExt;
/// # use futures_util::future::Either;
/// # use ethers_contract::{Contract, ContractFactory, EthEvent};
///
/// #[derive(Clone, Debug, EthEvent)]
/// pub struct Approval {
/// #[ethevent(indexed)]
/// pub token_owner: Address,
/// #[ethevent(indexed)]
/// pub spender: Address,
/// pub tokens: U256,
/// }
///
/// #[derive(Clone, Debug, EthEvent)]
/// pub struct Transfer {
/// #[ethevent(indexed)]
/// pub from: Address,
/// #[ethevent(indexed)]
/// pub to: Address,
/// pub tokens: U256,
/// }
///
///
/// let ev1 = contract.event::<Approval>().from_block(1337).to_block(2000);
/// let ev2 = contract.event::<Transfer>();
///
/// let mut events = ev1.stream().await.unwrap().select(ev2.stream().await.unwrap()).ok();
///
/// while let Some(either) = events.next().await {
/// match either {
/// Either::Left(approval) => { let Approval{token_owner,spender,tokens} = approval; }
/// Either::Right(transfer) => { let Transfer{from,to,tokens} = transfer; }
/// }
/// }
///
/// # }
/// ```
pub fn select<St>(self, st: St) -> SelectEvent<SelectEither<'a, Result<R, E>, St::Item>>
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder if there's a world where we do this with a macro instead of a function (like the futures select!), so that we are not limited to only selecting 2 events?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

driving multiple streams to completion with select! is a bit verbose and the core problem is that we need to return a type that represents all the different events.
you can do this with EventStream::select

let ev3 = contract.event::<OtherEvent>();

let stream = ev3.stream().await().unwrap().select(
   ev1.stream().await.unwrap().select(ev2.stream().await.unwrap())
);

which will return an Either<OtherEvent, Either<Approval, Transfer>>

But there'd be also the possibility to create your own event type and use the Contract::event_with_filter function

 #[derive(Debug, Clone, PartialEq, Eq)]
    pub enum Events {
        Approval(Approval),
        Transfer(Transfer),
        Other(Other),
    }
  impl EthLogDecode for Events {...}

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right! OK if that pattern works then should be fine.

where
St: Stream + Unpin + 'a,
{
SelectEvent(Box::pin(futures_util::stream::select(
self.map(Either::Left),
st.map(Either::Right),
)))
}
}

pub type SelectEither<'a, L, R> = Pin<Box<dyn Stream<Item = Either<L, R>> + 'a>>;

#[pin_project]
pub struct SelectEvent<T>(#[pin] T);

impl<'a, T, L, LE, R, RE> SelectEvent<T>
where
T: Stream<Item = Either<Result<L, LE>, Result<R, RE>>> + 'a,
L: 'a,
LE: 'a,
R: 'a,
RE: 'a,
{
/// Turns a stream of Results to a stream of `Result::ok` for both arms
pub fn ok(self) -> Pin<Box<dyn Stream<Item = Either<L, R>> + 'a>> {
Box::pin(self.filter_map(|e| async move {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want the filter_map here? Won't this silently eat all errors if any are observed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes this skips all erroneous values, but is probably useful if you're only interested in the ok values.

match e {
Either::Left(res) => res.ok().map(Either::Left),
Either::Right(res) => res.ok().map(Either::Right),
}
}))
}
}

impl<T: Stream> Stream for SelectEvent<T> {
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
this.0.poll_next(cx)
}
}

/// Wrapper around a `EventStream`, that in addition to the deserialized Event type also yields the
/// `LogMeta`.
#[pin_project]
pub struct EventStreamMeta<'a, T, R, E>(pub EventStream<'a, T, R, E>);

impl<'a, T, R, E> EventStreamMeta<'a, T, R, E>
where
T: Stream<Item = Log> + Unpin + 'a,
R: 'a,
E: 'a,
{
/// See `EventStream::select`
#[allow(clippy::type_complexity)]
pub fn select<St>(
self,
st: St,
) -> SelectEvent<SelectEither<'a, Result<(R, LogMeta), E>, St::Item>>
where
St: Stream + Unpin + 'a,
{
SelectEvent(Box::pin(futures_util::stream::select(
self.map(Either::Left),
st.map(Either::Right),
)))
}
}

impl<'a, T, R, E> Stream for EventStreamMeta<'a, T, R, E>
where
T: Stream<Item = Log> + Unpin,
Expand Down