Skip to content

Commit 6d801ad

Browse files
committed
Fix up the potential leaked future/stream.wake()
1 parent b255938 commit 6d801ad

File tree

2 files changed

+70
-53
lines changed

2 files changed

+70
-53
lines changed

src/common.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,10 @@ impl<T> LinkedListAsync<T> {
166166
if alive {
167167
self.inner.lock().unwrap().push_back(input);
168168
}
169+
170+
self.wake();
169171
}
170172

171-
self.wake();
172173
return;
173174
}
174175

@@ -183,9 +184,10 @@ impl<T> LinkedListAsync<T> {
183184
}
184185

185186
#[cfg(feature = "for_futures")]
187+
#[inline]
186188
fn wake(&self) {
187189
if let Some(waker) = self.waker.lock().unwrap().take() {
188-
waker.wake()
190+
waker.wake();
189191
}
190192
}
191193

@@ -196,9 +198,12 @@ impl<T> LinkedListAsync<T> {
196198

197199
#[cfg(feature = "for_futures")]
198200
pub fn close_stream(&mut self) {
199-
self.alive.lock().unwrap().store(false, Ordering::SeqCst);
201+
{
202+
let alive = self.alive.lock().unwrap();
203+
alive.store(false, Ordering::SeqCst);
200204

201-
self.wake()
205+
self.wake()
206+
}
202207
}
203208
}
204209

@@ -210,28 +215,30 @@ where
210215
type Item = T;
211216

212217
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
218+
let mut inner = self.inner.lock().unwrap();
219+
let alive = self.alive.lock().unwrap();
220+
let mut waker = self.waker.lock().unwrap();
221+
213222
let picked: Option<T>;
214-
{
215-
picked = self.inner.lock().unwrap().pop_front();
216-
}
223+
// {
224+
// picked = self.pop_front();
225+
// }
226+
picked = inner.pop_front();
217227
if picked.is_some() {
218228
return Poll::Ready(picked);
219229
}
220230

221231
// Check alive
222-
let alive = { self.alive.lock().unwrap().load(Ordering::SeqCst) };
223-
if alive {
232+
if alive.load(Ordering::SeqCst) {
224233
// Check cached
225-
let picked: Option<T>;
226-
{
227-
picked = self.inner.lock().unwrap().pop_front();
228-
}
234+
// let picked: Option<T>;
235+
// picked = inner.pop_front();
229236

230237
// Check Pending(None) or Ready(Some(item))
231238
if picked.is_none() {
232239
// Keep Pending
233240
{
234-
self.waker.lock().unwrap().replace(cx.waker().clone());
241+
waker.replace(cx.waker().clone());
235242
};
236243
return Poll::Pending;
237244
}

src/sync.rs

Lines changed: 49 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -126,23 +126,20 @@ where
126126
T: Clone + Send + Sync + 'static,
127127
{
128128
fn is_started(&mut self) -> bool {
129-
let _started_alive = self.started_alive.clone();
130-
let started_alive = _started_alive.lock().unwrap();
129+
let started_alive = self.started_alive.lock().unwrap();
131130
let &(ref started, _) = &*started_alive;
132131
started.load(Ordering::SeqCst)
133132
}
134133

135134
fn is_alive(&mut self) -> bool {
136-
let _started_alive = self.started_alive.clone();
137-
let started_alive = _started_alive.lock().unwrap();
135+
let started_alive = self.started_alive.lock().unwrap();
138136
let &(_, ref alive) = &*started_alive;
139137
alive.load(Ordering::SeqCst)
140138
}
141139

142140
fn start(&mut self) {
143141
{
144-
let _started_alive = self.started_alive.clone();
145-
let started_alive = _started_alive.lock().unwrap();
142+
let started_alive = self.started_alive.lock().unwrap();
146143
let &(ref started, ref alive) = &*started_alive;
147144

148145
if started.load(Ordering::SeqCst) {
@@ -159,23 +156,23 @@ where
159156
let _effect = self.effect.clone();
160157
let _publisher = self.publisher.clone();
161158

162-
let _started_alive = self.started_alive.clone();
163-
164159
let mut handler = self.handler.lock().unwrap();
165160
handler.start();
166161
handler.post(RawFunc::new(move || {
167-
let effect = &mut *_effect.lock().unwrap();
168-
let result = (effect)();
169-
(*this.result.lock().unwrap()) = Some(result.clone());
170-
_publisher.lock().unwrap().publish(result);
162+
let result = { (_effect.lock().unwrap())() };
163+
{
164+
(*this.result.lock().unwrap()) = Some(result.clone());
165+
}
166+
{
167+
_publisher.lock().unwrap().publish(result);
168+
}
171169
this.stop();
172170
}));
173171
}
174172

175173
fn stop(&mut self) {
176174
{
177-
let _started_alive = self.started_alive.clone();
178-
let started_alive = _started_alive.lock().unwrap();
175+
let started_alive = self.started_alive.lock().unwrap();
179176
let &(ref started, ref alive) = &*started_alive;
180177

181178
if !started.load(Ordering::SeqCst) {
@@ -219,11 +216,9 @@ where
219216
if self.is_started() && (!self.is_alive()) {
220217
Poll::Ready(self.result())
221218
} else {
222-
if !self.is_started() {
223-
self.start();
219+
{
220+
self.waker.lock().unwrap().replace(cx.waker().clone());
224221
}
225-
226-
self.waker.lock().unwrap().replace(cx.waker().clone());
227222
Poll::Pending
228223
}
229224
}
@@ -260,17 +255,19 @@ impl CountDownLatch {
260255
}
261256

262257
pub fn countdown(&self) {
263-
let &(ref lock, ref cvar) = &*self.pair.clone();
264-
let mut started = lock.lock().unwrap();
265-
if *started > 0 {
266-
*started -= 1;
267-
}
268-
cvar.notify_one();
269-
270-
#[cfg(feature = "for_futures")]
271258
{
272-
if let Some(waker) = self.waker.lock().unwrap().take() {
273-
waker.wake()
259+
let &(ref lock, ref cvar) = &*self.pair.clone();
260+
let mut started = lock.lock().unwrap();
261+
if *started > 0 {
262+
*started -= 1;
263+
}
264+
cvar.notify_one();
265+
266+
#[cfg(feature = "for_futures")]
267+
{
268+
if let Some(waker) = self.waker.lock().unwrap().take() {
269+
waker.wake()
270+
}
274271
}
275272
}
276273
}
@@ -306,9 +303,13 @@ impl Future for CountDownLatch {
306303
type Output = ();
307304

308305
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
309-
let &(ref remaining, _) = &*self.pair.clone();
310-
if *remaining.lock().unwrap() > 0 {
311-
self.waker.lock().unwrap().replace(cx.waker().clone());
306+
let pair = self.pair.clone();
307+
let &(ref remaining, _) = &*pair;
308+
let count = remaining.lock().unwrap();
309+
if *count > 0 {
310+
{
311+
self.waker.lock().unwrap().replace(cx.waker().clone());
312+
}
312313
Poll::Pending
313314
} else {
314315
Poll::Ready(())
@@ -466,7 +467,7 @@ where
466467
}
467468

468469
{
469-
let result = self.blocking_recever.lock().unwrap().try_recv();
470+
let result = { self.blocking_recever.lock().unwrap().try_recv() };
470471

471472
match result {
472473
Ok(v) => Ok(v),
@@ -483,15 +484,15 @@ where
483484
{
484485
match self.timeout {
485486
Some(duration) => {
486-
let result = self.blocking_recever.lock().unwrap().recv_timeout(duration);
487+
let result = { self.blocking_recever.lock().unwrap().recv_timeout(duration) };
487488

488489
match result {
489490
Ok(v) => Ok(v),
490491
Err(e) => Err(Box::new(e)),
491492
}
492493
}
493494
None => {
494-
let result = self.blocking_recever.lock().unwrap().recv();
495+
let result = { self.blocking_recever.lock().unwrap().recv() };
495496

496497
match result {
497498
Ok(v) => Ok(v),
@@ -542,7 +543,8 @@ where
542543
#[cfg(feature = "for_futures")]
543544
#[futures_test::test]
544545
async fn test_sync_future() {
545-
let wa = WillAsync::new(move || 1);
546+
let mut wa = WillAsync::new(move || 1);
547+
wa.start();
546548

547549
assert_eq!(Some(1), wa.await);
548550

@@ -552,25 +554,33 @@ async fn test_sync_future() {
552554
let latch = CountDownLatch::new(4);
553555
let latch2 = latch.clone();
554556

555-
let _ = pub1.subscribe(Arc::new(SubscriptionFunc::new(move |_| {
556-
println!("{:?}", "test_sync_future");
557+
let _ = pub1.subscribe(Arc::new(SubscriptionFunc::new(move |y| {
558+
println!("test_sync_future {:?}", y);
557559
latch2.countdown();
558560
})));
559561

562+
println!("test_sync_future before Publisher.start()");
563+
560564
{
561565
let h = &mut _h.lock().unwrap();
562566

563-
println!("hh2");
567+
println!("test_sync_future hh2");
564568
h.start();
565-
println!("hh2 running");
569+
println!("test_sync_future hh2 running");
566570
}
571+
std::thread::sleep(Duration::from_millis(10));
567572

568573
pub1.publish(1);
574+
println!("test_sync_future pub1.publish");
569575
pub1.publish(2);
576+
println!("test_sync_future pub1.publish");
570577
pub1.publish(3);
578+
println!("test_sync_future pub1.publish");
571579
pub1.publish(4);
580+
println!("test_sync_future pub1.publish");
572581

573582
let _ = latch.await;
583+
println!("test_sync_future done");
574584
}
575585

576586
#[test]

0 commit comments

Comments
 (0)