Skip to content

Commit

Permalink
Web: cache decoded image as well
Browse files Browse the repository at this point in the history
  • Loading branch information
daxpedda committed Dec 22, 2023
1 parent 5bb3dd4 commit 1f03bbc
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 65 deletions.
66 changes: 37 additions & 29 deletions src/platform_impl/web/async/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,78 +6,80 @@ use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError
use std::sync::{Arc, Mutex};
use std::task::Poll;

// NOTE: This channel doesn't wake up when all senders or receivers are
// dropped. This is acceptable as long as it's only used in `Dispatcher`, which
// has it's own `Drop` behavior.

pub fn channel<T>() -> (AsyncSender<T>, AsyncReceiver<T>) {
let (sender, receiver) = mpsc::channel();
let sender = Arc::new(Mutex::new(sender));
let inner = Arc::new(Inner {
let shared = Arc::new(Shared {
closed: AtomicBool::new(false),
waker: AtomicWaker::new(),
});

let sender = AsyncSender {
sender,
inner: Arc::clone(&inner),
};
let sender = AsyncSender(Arc::new(SenderInner {
sender: Mutex::new(sender),
shared: Arc::clone(&shared),
}));
let receiver = AsyncReceiver {
receiver: Rc::new(receiver),
inner,
shared,
};

(sender, receiver)
}

pub struct AsyncSender<T> {
pub struct AsyncSender<T>(Arc<SenderInner<T>>);

struct SenderInner<T> {
// We need to wrap it into a `Mutex` to make it `Sync`. So the sender can't
// be accessed on the main thread, as it could block. Additionally we need
// to wrap it in an `Arc` to make it clonable on the main thread without
// to wrap `Sender` in an `Arc` to make it clonable on the main thread without
// having to block.
sender: Arc<Mutex<Sender<T>>>,
inner: Arc<Inner>,
sender: Mutex<Sender<T>>,
shared: Arc<Shared>,
}

impl<T> AsyncSender<T> {
pub fn send(&self, event: T) -> Result<(), SendError<T>> {
self.sender.lock().unwrap().send(event)?;
self.inner.waker.wake();
self.0.sender.lock().unwrap().send(event)?;
self.0.shared.waker.wake();

Ok(())
}
}

pub fn close(&self) {
self.inner.closed.store(true, Ordering::Relaxed);
self.inner.waker.wake()
impl<T> SenderInner<T> {
fn close(&self) {
self.shared.closed.store(true, Ordering::Relaxed);
self.shared.waker.wake();
}
}

impl<T> Clone for AsyncSender<T> {
fn clone(&self) -> Self {
Self {
sender: Arc::clone(&self.sender),
inner: Arc::clone(&self.inner),
}
Self(Arc::clone(&self.0))
}
}

impl<T> Drop for SenderInner<T> {
fn drop(&mut self) {
self.close();
}
}

pub struct AsyncReceiver<T> {
receiver: Rc<Receiver<T>>,
inner: Arc<Inner>,
shared: Arc<Shared>,
}

impl<T> AsyncReceiver<T> {
pub async fn next(&self) -> Result<T, RecvError> {
future::poll_fn(|cx| match self.receiver.try_recv() {
Ok(event) => Poll::Ready(Ok(event)),
Err(TryRecvError::Empty) => {
self.inner.waker.register(cx.waker());
self.shared.waker.register(cx.waker());

match self.receiver.try_recv() {
Ok(event) => Poll::Ready(Ok(event)),
Err(TryRecvError::Empty) => {
if self.inner.closed.load(Ordering::Relaxed) {
if self.shared.closed.load(Ordering::Relaxed) {
Poll::Ready(Err(RecvError))
} else {
Poll::Pending
Expand All @@ -104,12 +106,18 @@ impl<T> Clone for AsyncReceiver<T> {
fn clone(&self) -> Self {
Self {
receiver: Rc::clone(&self.receiver),
inner: Arc::clone(&self.inner),
shared: Arc::clone(&self.shared),
}
}
}

struct Inner {
impl<T> Drop for AsyncReceiver<T> {
fn drop(&mut self) {
self.shared.closed.store(true, Ordering::Relaxed);
}
}

struct Shared {
closed: AtomicBool,
waker: AtomicWaker,
}
6 changes: 0 additions & 6 deletions src/platform_impl/web/async/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,6 @@ impl<T> Dispatcher<T> {
}
}

impl<T> Drop for Dispatcher<T> {
fn drop(&mut self) {
self.0.with_sender_data(|sender| sender.close())
}
}

pub struct DispatchRunner<T: 'static> {
wrapper: Wrapper<true, T, AsyncSender<Closure<T>>, Closure<T>>,
receiver: AsyncReceiver<Closure<T>>,
Expand Down
2 changes: 1 addition & 1 deletion src/platform_impl/web/async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod dispatcher;
mod waker;
mod wrapper;

use self::channel::{channel, AsyncReceiver, AsyncSender};
pub use self::channel::{channel, AsyncReceiver, AsyncSender};
pub use self::dispatcher::{DispatchRunner, Dispatcher};
pub use self::waker::{Waker, WakerSpawner};
use self::wrapper::Wrapper;
Loading

0 comments on commit 1f03bbc

Please sign in to comment.