Skip to content

Commit 907a68d

Browse files
committed
Implement BlockingQueue.take_result()/poll_result() as Future. Avoid dead locking of sahred_thread_pool()
1 parent 60da9e1 commit 907a68d

File tree

3 files changed

+103
-27
lines changed

3 files changed

+103
-27
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "fp_rust"
3-
version = "0.2.4"
3+
version = "0.2.5"
44
license = "MIT"
55
authors = ["JunYi JohnTeee Lee <johnteee@gmail.com>"]
66
edition = "2018"

src/monadio.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,14 @@ impl<Y: 'static + Send + Sync + Clone> MonadIO<Y> {
6262
pub async fn to_future(&self) -> Result<Arc<Y>, Box<dyn Error>> {
6363
// let mio = self.map(|y| y);
6464
let mio = self.clone();
65-
let result = shared_thread_pool()
66-
.inner
67-
.lock()
68-
.unwrap()
69-
.spawn_with_handle(async move { mio.eval() })?
70-
.await;
65+
let future = {
66+
shared_thread_pool()
67+
.inner
68+
.lock()
69+
.unwrap()
70+
.spawn_with_handle(async move { mio.eval() })?
71+
};
72+
let result = future.await;
7173
Ok(result)
7274
}
7375
}

src/sync.rs

Lines changed: 94 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,18 @@ of general async handling features.
55

66
use std::sync::{
77
atomic::{AtomicBool, Ordering},
8-
mpsc, Arc, Condvar, Mutex,
8+
mpsc,
9+
mpsc::RecvTimeoutError,
10+
Arc, Condvar, Mutex,
911
};
1012
use std::time::Duration;
1113

14+
#[cfg(feature = "for_futures")]
15+
use super::common::shared_thread_pool;
16+
#[cfg(feature = "for_futures")]
17+
use crate::futures::task::SpawnExt;
18+
// #[cfg(feature = "for_futures")]
19+
use std::error::Error;
1220
#[cfg(feature = "for_futures")]
1321
use std::future::Future;
1422
#[cfg(feature = "for_futures")]
@@ -412,18 +420,16 @@ where
412420
}
413421

414422
fn poll(&mut self) -> Option<T> {
415-
if !self.is_alive() {
416-
return None::<T>;
417-
}
418-
419-
{
420-
let result = self.blocking_recever.lock().unwrap().try_recv();
423+
let result = self.poll_result();
421424

422-
if self.panic && result.is_err() {
423-
std::panic::panic_any(result.err());
424-
}
425+
if self.panic && result.is_err() {
426+
std::panic::panic_any(result.err());
427+
// return None;
428+
}
425429

426-
result.ok()
430+
match result {
431+
Ok(v) => Some(v),
432+
Err(_) => None,
427433
}
428434
}
429435

@@ -434,34 +440,102 @@ where
434440
}
435441

436442
fn take(&mut self) -> Option<T> {
443+
let result = self.take_result();
444+
445+
if self.panic && result.is_err() {
446+
std::panic::panic_any(result.err());
447+
// return None;
448+
}
449+
450+
match result {
451+
Ok(v) => Some(v),
452+
Err(_) => None,
453+
}
454+
}
455+
}
456+
457+
impl<T> BlockingQueue<T>
458+
where
459+
T: Send + 'static,
460+
{
461+
pub fn poll_result(&mut self) -> Result<T, Box<dyn Error + Send>> {
437462
if !self.is_alive() {
438-
return None::<T>;
463+
return Err(Box::new(RecvTimeoutError::Disconnected));
464+
}
465+
466+
{
467+
let result = self.blocking_recever.lock().unwrap().try_recv();
468+
469+
match result {
470+
Ok(v) => Ok(v),
471+
Err(e) => Err(Box::new(e)),
472+
}
473+
}
474+
}
475+
476+
pub fn take_result(&mut self) -> Result<T, Box<dyn Error + Send>> {
477+
if !self.is_alive() {
478+
return Err(Box::new(RecvTimeoutError::Disconnected));
439479
}
440480

441481
{
442482
match self.timeout {
443483
Some(duration) => {
444484
let result = self.blocking_recever.lock().unwrap().recv_timeout(duration);
445485

446-
if self.panic && result.is_err() {
447-
std::panic::panic_any(result.err());
486+
match result {
487+
Ok(v) => Ok(v),
488+
Err(e) => Err(Box::new(e)),
448489
}
449-
450-
result.ok()
451490
}
452491
None => {
453492
let result = self.blocking_recever.lock().unwrap().recv();
454493

455-
if self.panic && result.is_err() {
456-
std::panic::panic_any(result.err());
494+
match result {
495+
Ok(v) => Ok(v),
496+
Err(e) => Err(Box::new(e)),
457497
}
458-
459-
result.ok()
460498
}
461499
}
462500
}
463501
}
464502
}
503+
#[cfg(feature = "for_futures")]
504+
impl<T> BlockingQueue<T>
505+
where
506+
T: Send + 'static + Clone,
507+
{
508+
pub async fn poll_result_as_future(&mut self) -> Result<T, Box<dyn Error + Send>> {
509+
let mut queue = self.clone();
510+
511+
let spawn_future_result = {
512+
shared_thread_pool()
513+
.inner
514+
.lock()
515+
.unwrap()
516+
.spawn_with_handle(async move { queue.poll_result() })
517+
};
518+
match spawn_future_result {
519+
Ok(future) => future.await,
520+
Err(e) => Err(Box::new(e)),
521+
}
522+
}
523+
pub async fn take_result_as_future(&mut self) -> Result<T, Box<dyn Error + Send>> {
524+
let mut queue = self.clone();
525+
526+
let spawn_future_result = {
527+
shared_thread_pool()
528+
.inner
529+
.lock()
530+
.unwrap()
531+
.spawn_with_handle(async move { queue.take_result() })
532+
};
533+
match spawn_future_result {
534+
Ok(future) => future.await,
535+
Err(e) => Err(Box::new(e)),
536+
}
537+
}
538+
}
465539

466540
#[cfg(feature = "for_futures")]
467541
#[futures_test::test]

0 commit comments

Comments
 (0)