Skip to content

Commit dcf48db

Browse files
committed
Use LinkedList for SubscriptionFunc caching
1 parent 825add3 commit dcf48db

File tree

1 file changed

+3
-22
lines changed

1 file changed

+3
-22
lines changed

src/common.rs

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use futures::executor::ThreadPool;
1414
#[cfg(feature = "for_futures")]
1515
use futures::stream::Stream;
1616
#[cfg(feature = "for_futures")]
17-
use std::collections::VecDeque;
17+
use std::collections::LinkedList;
1818
#[cfg(feature = "for_futures")]
1919
use std::mem;
2020
#[cfg(feature = "for_futures")]
@@ -30,9 +30,6 @@ use std::task::{Context, Poll, Waker};
3030
// pub trait FnMutReceiveThreadSafe<X>: FnMut(Arc<X>) + Send + Sync + 'static {}
3131
// pub trait FnMutReturnThreadSafe<X>: FnMut() -> X + Send + Sync + 'static {}
3232

33-
#[cfg(feature = "for_futures")]
34-
static DEFAULT_STREAM_CACHED_CAPACITY_BLOCK_SIZE: usize = 10;
35-
3633
#[cfg(feature = "for_futures")]
3734
#[derive(Clone)]
3835
pub struct SharedThreadPoolReader {
@@ -255,9 +252,7 @@ pub struct SubscriptionFunc<T> {
255252
pub receiver: RawReceiver<T>,
256253

257254
#[cfg(feature = "for_futures")]
258-
cached: Option<Arc<Mutex<VecDeque<Arc<T>>>>>,
259-
#[cfg(feature = "for_futures")]
260-
cached_capacity_block_size: usize,
255+
cached: Option<Arc<Mutex<LinkedList<Arc<T>>>>>,
261256
#[cfg(feature = "for_futures")]
262257
alive: Option<Arc<Mutex<AtomicBool>>>,
263258
#[cfg(feature = "for_futures")]
@@ -277,8 +272,6 @@ impl<T> SubscriptionFunc<T> {
277272
#[cfg(feature = "for_futures")]
278273
cached: None,
279274
#[cfg(feature = "for_futures")]
280-
cached_capacity_block_size: DEFAULT_STREAM_CACHED_CAPACITY_BLOCK_SIZE,
281-
#[cfg(feature = "for_futures")]
282275
alive: None,
283276
#[cfg(feature = "for_futures")]
284277
waker: Arc::new(Mutex::new(None)),
@@ -298,8 +291,6 @@ impl<T> Clone for SubscriptionFunc<T> {
298291
#[cfg(feature = "for_futures")]
299292
cached: self.cached.clone(),
300293
#[cfg(feature = "for_futures")]
301-
cached_capacity_block_size: self.cached_capacity_block_size,
302-
#[cfg(feature = "for_futures")]
303294
alive: self.alive.clone(),
304295
#[cfg(feature = "for_futures")]
305296
waker: self.waker.clone(),
@@ -309,13 +300,6 @@ impl<T> Clone for SubscriptionFunc<T> {
309300

310301
#[cfg(feature = "for_futures")]
311302
impl<T> SubscriptionFunc<T> {
312-
#[cfg(feature = "for_futures")]
313-
pub fn set_stream_cached_capacity_block_size(&mut self, size: usize) {
314-
if size > 0 {
315-
self.cached_capacity_block_size = size;
316-
}
317-
}
318-
319303
pub fn close_stream(&mut self) {
320304
if let Some(alive) = &self.alive {
321305
{
@@ -361,9 +345,7 @@ where
361345
}
362346

363347
if self.cached.is_none() {
364-
self.cached = Some(Arc::new(Mutex::new(VecDeque::with_capacity(
365-
self.cached_capacity_block_size,
366-
))));
348+
self.cached = Some(Arc::new(Mutex::new(LinkedList::new())));
367349
}
368350
}
369351

@@ -398,7 +380,6 @@ impl<T: Send + Sync + 'static> Subscription<T> for SubscriptionFunc<T> {
398380
if alive {
399381
{
400382
let mut cached = cached.lock().unwrap();
401-
cached.reserve_exact(self.cached_capacity_block_size);
402383
cached.push_back(x.clone())
403384
};
404385
{

0 commit comments

Comments
 (0)