Using streams as event listener handlers #1158

Closed
chpio opened this issue Jan 8, 2019 · 6 comments

@chpio

chpio commented Jan 8, 2019

Hi,

I want to use streams for event handling:

let future = element.create_event_stream("click").for_each(|_| ...);
spawn_local(future); // https://github.com/rustwasm/wasm-bindgen/pull/1148
  • The goal is to get something like http://reactivex.io/
  • I also want compatibility with the rest of the ecosystem, so I think it should be based on the futures lib.
  • I would extend Stream by adding additional adapters for all kinds of functionality (e.g. debounce, as shown on the ReactiveX website).
  • What is the best way to cancel such a stream? Say I want to remove the event listeners from the element(s). Maybe I could store some sort of guard on the Rust side, so that dropping it drops the future and all event stream sources, which in turn removes the event listeners from the element?
@Pauan
Contributor

Pauan commented Jan 8, 2019

@chpio Try this:

use wasm_bindgen::{JsValue, JsCast};
use wasm_bindgen::closure::Closure;
use web_sys::EventTarget;
use futures::Poll;
use futures::stream::Stream;
use futures::sync::mpsc::{unbounded, UnboundedReceiver};

pub struct EventStream<'a, A> {
    node: EventTarget,
    kind: &'a str,
    // Must stay alive for as long as the listener is registered
    callback: Closure<dyn Fn(JsValue)>,
    receiver: UnboundedReceiver<A>,
}

impl<'a, A> EventStream<'a, A> where A: JsCast + 'static {
    pub fn new(node: &EventTarget, kind: &'a str) -> Self {
        let (sender, receiver) = unbounded();

        // Cast each event to `A` and forward it into the channel
        let callback = Closure::wrap(Box::new(move |event: JsValue| {
            sender.unbounded_send(event.dyn_into().unwrap()).unwrap();
        }) as Box<dyn Fn(JsValue)>);

        node.add_event_listener_with_callback(kind, callback.as_ref().unchecked_ref()).unwrap();

        Self {
            node: node.clone(),
            kind,
            callback,
            receiver,
        }
    }
}

impl<'a, A> Stream for EventStream<'a, A> {
    type Item = A;
    type Error = ();

    #[inline]
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        self.receiver.poll()
    }
}

impl<'a, A> Drop for EventStream<'a, A> {
    #[inline]
    fn drop(&mut self) {
        self.node.remove_event_listener_with_callback(self.kind, self.callback.as_ref().unchecked_ref()).unwrap();
    }
}

This lets you easily create Streams from events:

EventStream::new(&some_element, "click")
    .map(|event: MouseEvent| {
        ...
    })

Also, you should check out my Futures Signals crate. As its name suggests, it is based on top of Futures. It doesn't work for events (you should use Stream for that), but it's useful for other things, like dominator.

@Pauan
Contributor

Pauan commented Jan 8, 2019

To answer your questions:

I would extend Stream by adding additional adapters for all kinds of functionality (e.g. debounce, as shown on the ReactiveX website).

You would generally do that by creating a StreamExt trait, like this:

pub trait StreamExt: Stream {
    // define default methods in here
}

impl<T: ?Sized> StreamExt for T where T: Stream {}

Here's a more detailed example. It's for Futures 0.3, but the same principles apply to Futures 0.1.

Of course the best option is to contribute directly to the Futures crate, rather than creating a separate StreamExt trait. That way every user of Futures can benefit.
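
As a rough illustration of the extension-trait pattern described above, here is a std-only sketch. The `Stream` trait in it is a hypothetical, synchronous stand-in and not the trait from the Futures crate, and `skip_repeats`, `SkipRepeats`, `FromVec`, and `collect_all` are names invented for this example. The adapter drops consecutive duplicates; it is not a time-based debounce.

```rust
/// Hypothetical, simplified stand-in for a stream (NOT the Futures trait).
pub trait Stream {
    type Item;
    /// Returns the next item, or `None` once the stream has ended.
    fn next_item(&mut self) -> Option<Self::Item>;
}

/// Adapter that skips items equal to the previously emitted item.
pub struct SkipRepeats<S: Stream> {
    inner: S,
    last: Option<S::Item>,
}

impl<S> Stream for SkipRepeats<S>
where
    S: Stream,
    S::Item: PartialEq + Clone,
{
    type Item = S::Item;

    fn next_item(&mut self) -> Option<Self::Item> {
        loop {
            let item = self.inner.next_item()?;
            if self.last.as_ref() != Some(&item) {
                self.last = Some(item.clone());
                return Some(item);
            }
        }
    }
}

/// Extension trait: new adapters are added here as default methods.
pub trait StreamExt: Stream {
    fn skip_repeats(self) -> SkipRepeats<Self>
    where
        Self: Sized,
        Self::Item: PartialEq + Clone,
    {
        SkipRepeats { inner: self, last: None }
    }
}

// Blanket impl: every `Stream` picks up the adapters automatically.
impl<T: ?Sized> StreamExt for T where T: Stream {}

/// Toy source that yields the items of a `Vec` in order.
pub struct FromVec<T> {
    items: std::vec::IntoIter<T>,
}

impl<T> FromVec<T> {
    pub fn new(items: Vec<T>) -> Self {
        FromVec { items: items.into_iter() }
    }
}

impl<T> Stream for FromVec<T> {
    type Item = T;

    fn next_item(&mut self) -> Option<T> {
        self.items.next()
    }
}

/// Drains a stream into a `Vec` so the result is easy to inspect.
pub fn collect_all<S: Stream>(mut stream: S) -> Vec<S::Item> {
    let mut out = Vec::new();
    while let Some(item) = stream.next_item() {
        out.push(item);
    }
    out
}
```

Because of the blanket impl, callers only need the extension trait in scope to write `FromVec::new(values).skip_repeats()`.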

What is the best way to cancel such a stream? Say I want to remove the event listeners from the element(s). Maybe I could store some sort of guard on the Rust side, so that dropping it drops the future and all event stream sources, which in turn removes the event listeners from the element?

You just impl Drop, as shown in my code above. It only needs to handle the event listener cleanup, since the UnboundedReceiver and Closure are automatically dropped.
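
The drop-based cleanup can be seen in isolation in the std-only sketch below. It assumes a mock `Registry` in place of a real `EventTarget`; `Registry`, `ListenerGuard`, and the numeric listener ids are hypothetical names made up for the example.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Mock stand-in for an `EventTarget`: it only records listener ids.
#[derive(Default)]
pub struct Registry {
    listeners: RefCell<Vec<u32>>,
}

impl Registry {
    pub fn listener_count(&self) -> usize {
        self.listeners.borrow().len()
    }
}

/// Guard that owns one registration and undoes it when dropped.
pub struct ListenerGuard {
    registry: Rc<Registry>,
    id: u32,
}

impl ListenerGuard {
    pub fn new(registry: &Rc<Registry>, id: u32) -> Self {
        registry.listeners.borrow_mut().push(id);
        ListenerGuard { registry: Rc::clone(registry), id }
    }
}

impl Drop for ListenerGuard {
    fn drop(&mut self) {
        // Only the external registration needs manual cleanup;
        // the guard's own fields are dropped automatically afterwards.
        self.registry.listeners.borrow_mut().retain(|&l| l != self.id);
    }
}
```

Whoever owns the guard controls the listener's lifetime: once the guard goes out of scope or is passed to `drop`, the registration is gone.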

@chpio
Author

chpio commented Jan 8, 2019

You just impl Drop, as shown in my code above. It only needs to handle the event listener cleanup, since the UnboundedReceiver and Closure are automatically dropped.

I haven't tested it, so I might be wrong, but I would pass the future to spawn_local and so lose the ability to drop it on the Rust side. Wouldn't it then live forever on the JS side?

@Pauan
Contributor

Pauan commented Jan 8, 2019

I haven't tested it, so I might be wrong, but I would pass the future to spawn_local and so lose the ability to drop it on the Rust side. Wouldn't it then live forever on the JS side?

Oh, that's what you meant. You can use take_while to stop the Stream early (this will drop everything, which then causes the event listener to be cleaned up).

You could also use zip to end a Stream early.

You can also use abortable in Futures 0.3 to abort a Future. This also works for aborting a Stream, since StreamExt::for_each returns a Future.

However, wasm-bindgen currently uses Futures 0.1, so you'll have to wait for wasm-bindgen to upgrade to 0.3 so you can use abortable. In the meantime you can use take_while instead.
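
To make the take_while suggestion concrete, here is a synchronous analogy that uses std's `Iterator` as a stand-in for a Futures 0.1 `Stream`. `Clicks` and `handle_up_to` are hypothetical names, and the `Drop` impl only sets a flag where real code would remove the event listener. With Futures 0.1 the predicate would return something convertible to a future (for example `Ok(bool)`) rather than a plain `bool`.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Endless source of "events" (a counter); flags `cleaned_up` when dropped.
pub struct Clicks {
    next: u32,
    cleaned_up: Rc<Cell<bool>>,
}

impl Iterator for Clicks {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        self.next += 1;
        Some(self.next)
    }
}

impl Drop for Clicks {
    fn drop(&mut self) {
        // Stands in for `remove_event_listener_with_callback`.
        self.cleaned_up.set(true);
    }
}

/// Handles events until one exceeds `limit`; returns the handled events.
pub fn handle_up_to(limit: u32, cleaned_up: Rc<Cell<bool>>) -> Vec<u32> {
    let mut seen = Vec::new();
    Clicks { next: 0, cleaned_up }
        // Without this, the endless source would never finish.
        .take_while(|&n| n <= limit)
        .for_each(|n| seen.push(n));
    // The whole chain, including `Clicks`, has been dropped by this point.
    seen
}
```

The source never ends on its own; it is take_while that finishes the chain, and finishing the chain is what drops the source and runs its cleanup.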

@Pauan
Contributor

Pauan commented Jan 8, 2019

(Or you could write your own system for aborting Futures, which is what I did for futures-signals)
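
For a sense of what a hand-rolled abort mechanism could look like, here is a minimal sketch on top of `std::future`. It is an assumption-laden illustration, not the design used by futures-signals or by Futures 0.3: `Cancellable`, `CancelHandle`, `cancellable`, and `poll_once` are hypothetical names, and unlike a production implementation the handle does not wake the task, so cancellation is only noticed on the next poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Handle used to request cancellation from elsewhere.
#[derive(Clone)]
pub struct CancelHandle {
    cancelled: Arc<AtomicBool>,
}

impl CancelHandle {
    pub fn cancel(&self) {
        // Sketch only: sets the flag but does not wake the task.
        self.cancelled.store(true, Ordering::SeqCst);
    }
}

/// Wraps a future; resolves to `None` if cancelled before completion.
pub struct Cancellable<F> {
    inner: Pin<Box<F>>,
    cancelled: Arc<AtomicBool>,
}

pub fn cancellable<F: Future>(future: F) -> (Cancellable<F>, CancelHandle) {
    let cancelled = Arc::new(AtomicBool::new(false));
    (
        Cancellable { inner: Box::pin(future), cancelled: Arc::clone(&cancelled) },
        CancelHandle { cancelled },
    )
}

impl<F: Future> Future for Cancellable<F> {
    type Output = Option<F::Output>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.cancelled.load(Ordering::SeqCst) {
            // Finish early; the inner future is dropped with the wrapper.
            return Poll::Ready(None);
        }
        self.inner.as_mut().poll(cx).map(Some)
    }
}

/// Waker that does nothing; enough for polling by hand in a demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once with the no-op waker.
pub fn poll_once<F: Future + Unpin>(future: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(future).poll(&mut cx)
}
```

The handle can be kept after the wrapped future has been handed to an executor, which gives back the control that is otherwise lost when the future is spawned.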

@alexcrichton
Contributor

I think @Pauan seems to have answered this question here (thanks!), so I'm gonna close. If there's more though please let me know and I'll reopen!
