Description
Hi, I know version 15.0 is in progress, but I am still working with 14.2.0, which depends on futures 0.1.29.
I wrote an AsyncResponse that implements future::Future and waits for a response from a tokio::sync::mpsc::channel.
The problem is that it is polled only twice when the response is not ready.
#[derive(Debug)]
struct AsyncResponse {
    recv: Receiver<usize>,
}

impl RpcFuture for AsyncResponse {
    type Item = Result<String>;
    type Error = types::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.recv.try_recv() {
            Err(TryRecvError::Empty) => {
                info!("AsyncResponse not ready.");
                Ok(Async::NotReady)
            }
            Err(TryRecvError::Closed) => {
                info!("AsyncResponse closed.");
                Err(Error::new(ErrorCode::InternalError).into())
            }
            Ok(value) => {
                info!("AsyncResponse ready, response value: {}", value);
                Ok(Async::Ready(Ok(value.to_string())))
            }
        }
    }
}
#[derive(Default, Clone)]
pub struct Meta(pub Option<UnboundedSender<(usize, Sender<usize>)>>);

impl Metadata for Meta {}

#[rpc(server)]
pub trait Rpc {
    type Metadata;

    /// Performs asynchronous operation
    #[rpc(meta, name = "beFancy")]
    fn call(&self, meta: Self::Metadata) -> BoxFuture<Result<String>>;
}

#[derive(Default, Clone)]
struct RpcImpl;

impl Rpc for RpcImpl {
    type Metadata = Meta;

    fn call(&self, meta: Self::Metadata) -> BoxFuture<Result<String>> {
        let id = REQ_ID.fetch_add(1, Ordering::SeqCst);
        let (tx, rx) = mpsc::channel::<usize>(1);
        if let Some(sender) = meta.0 {
            sender.send((id, tx)).unwrap();
        }
        Box::new(AsyncResponse { recv: rx })
    }
}
fn main() {
    env::set_var("RUST_LOG", "info");
    pretty_env_logger::init();
    let mut rt = runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .build()
        .expect("Runtime build failed.");
    let (broker_sender, mut broker_receiver) = mpsc::unbounded_channel();
    {
        let sender = broker_sender.clone();
        rt.spawn(async {
            let mut io = MetaIoHandler::default();
            let rpc = RpcImpl;
            io.extend_with(rpc.to_delegate());
            let _server = ServerBuilder::new(io)
                .meta_extractor(move |_: &hyper::Request<hyper::Body>| {
                    info!("Meta extractor called.");
                    Meta(Some(sender.clone()))
                })
                .start_http(&"127.0.0.1:9527".parse().unwrap())
                .expect("Unable to start RPC server");
            _server.wait();
        });
    }
    rt.block_on(async move {
        let mut rpc_resps: HashMap<usize, Sender<usize>> = HashMap::new();
        info!("Borker loop start...");
        loop {
            if let Some((id, mut sender)) = broker_receiver.recv().await {
                info!("Broker received: id({}).", id);
                // Sleep for awhile
                thread::sleep(Duration::from_secs(2));
                sender.send(id * id).await.unwrap();
                info!("Broker sent: id({})", id);
                rpc_resps.insert(id, sender);
            } else {
                info!("Broker channel broken.");
                break;
            }
        }
        info!("Broker loop finished.");
    });
}
In the real program the broker loop would send its reply in response to events from other broker channels, but for simplicity it just sleeps for a while here.
The output shows that AsyncResponse receives the value ONLY if the broker sends it immediately (without the sleep).
No sleep:
INFO jrpc > Borker loop start...
INFO jrpc > Meta extractor called.
INFO jrpc > AsyncResponse not ready.
INFO jrpc > Broker received: id(0).
INFO jrpc > Broker sent: id(0)
INFO jrpc > AsyncResponse ready, response value: 0
With the sleep:
INFO jrpc > Borker loop start...
INFO jrpc > Meta extractor called.
INFO jrpc > AsyncResponse not ready.
INFO jrpc > Broker received: id(0).
INFO jrpc > AsyncResponse not ready.
INFO jrpc > Broker sent: id(0)
^C
After that, the poll function is never called again and the future just hangs.
Full code can be found here.
Am I doing anything wrong?
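My current guess at the cause, sketched below with only the standard library (std::future, not futures 0.1 or tokio): an executor polls a future again only after it has been asked to wake it, and a non-blocking check such as try_recv does not register any wake-up. As far as I understand, futures 0.1 has the same contract through task notification. The names Slot, Response, Flag, send, and drive are hypothetical and exist only for this illustration; they are not part of jsonrpc or tokio.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Shared slot between a producer and the future (a stand-in for a channel).
#[derive(Default)]
struct Slot {
    value: Option<usize>,
    waker: Option<Waker>,
}

/// Hypothetical future; `register` controls whether it stores the waker
/// before returning `Pending`.
struct Response {
    slot: Arc<Mutex<Slot>>,
    register: bool,
}

impl Future for Response {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        let mut slot = self.slot.lock().unwrap();
        match slot.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                if self.register {
                    // Ask to be polled again once a value arrives.
                    slot.waker = Some(cx.waker().clone());
                }
                Poll::Pending
            }
        }
    }
}

/// Waker that only records that a wake-up was requested.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Producer side: store the value and wake whoever registered.
fn send(slot: &Arc<Mutex<Slot>>, v: usize) {
    let mut s = slot.lock().unwrap();
    s.value = Some(v);
    if let Some(w) = s.waker.take() {
        w.wake();
    }
}

/// Toy executor: polls once, lets the producer run, then polls again
/// only if the waker was triggered. Returns the result, if any.
fn drive(register: bool) -> Option<usize> {
    let slot = Arc::new(Mutex::new(Slot::default()));
    let mut fut = Box::pin(Response { slot: slot.clone(), register });
    let flag = Arc::new(Flag(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);

    if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
        return Some(v);
    }
    send(&slot, 9); // the "broker" replies later
    if flag.0.load(Ordering::SeqCst) {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return Some(v);
        }
    }
    None // never woken, so the future is never polled again
}

fn main() {
    println!("without registration: {:?}", drive(false));
    println!("with registration:    {:?}", drive(true));
}
```

If this is what happens in my case, the fix would be to receive through a poll-based method that registers the current task instead of try_recv, but I am not sure how that interacts with a futures 0.1 task when the channel comes from tokio.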
My second question: the example code shows that an Rpc trait method should return Box::pin for a FutureResult, but that gives me a compile error saying the value should be stored on the heap with Box::new:
70 |     fn call(&self, meta: Self::Metadata) -> BoxFuture<Result<String>> {
   |                                             ------------------------- expected `std::boxed::Box<(dyn futures::future::Future<Error = jsonrpc_core::types::error::Error, Item = std::result::Result<std::string::String, jsonrpc_core::types::error::Error>> + std::marker::Send + 'static)>` because of return type
...
76 |         Box::pin(AsyncResponse { recv: rx })
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |         |
   |         expected struct `std::boxed::Box`, found struct `std::pin::Pin`
   |         help: store this in the heap by calling `Box::new`: `Box::new(Box::pin(AsyncResponse { recv: rx }))`
   |
   = note: expected struct `std::boxed::Box<(dyn futures::future::Future<Error = jsonrpc_core::types::error::Error, Item = std::result::Result<std::string::String, jsonrpc_core::types::error::Error>> + std::marker::Send + 'static)>`
              found struct `std::pin::Pin<std::boxed::Box<AsyncResponse>>`
   = note: for more on the distinction between the stack and the heap, read https://doc.rust-lang.org/book/ch15-01-box.html, https://doc.rust-lang.org/rust-by-example/std/box.html, and https://doc.rust-lang.org/std/boxed/index.html
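Reading the error, BoxFuture in 14.2.0 is a plain boxed futures 0.1 trait object, so my assumption is that the Box::pin examples target the newer std::future based API. The sketch below uses only the standard library to show the two shapes side by side. OldFuture, OldBoxFuture, NewBoxFuture, Immediate, Noop, and poll_once are hypothetical names for illustration and are not the jsonrpc types.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a futures 0.1 style trait: polled via `&mut self`.
trait OldFuture {
    fn poll_old(&mut self) -> Option<String>;
}

/// Same shape as the type in the compiler error: a plain boxed trait object.
type OldBoxFuture = Box<dyn OldFuture + Send + 'static>;

/// std::future style: a pinned, boxed trait object.
type NewBoxFuture = Pin<Box<dyn Future<Output = String> + Send + 'static>>;

struct Immediate(String);

impl OldFuture for Immediate {
    fn poll_old(&mut self) -> Option<String> {
        Some(self.0.clone())
    }
}

fn old_style() -> OldBoxFuture {
    // Box::pin(...) would not type-check here: Pin<Box<T>> is not Box<dyn OldFuture>.
    Box::new(Immediate("old".to_string()))
}

fn new_style() -> NewBoxFuture {
    // Box::pin is the right constructor when the alias is Pin<Box<dyn Future>>.
    Box::pin(async { "new".to_string() })
}

/// Waker that does nothing; enough to poll an already-ready future once.
struct Noop;

impl Wake for Noop {
    fn wake(self: Arc<Self>) {}
}

fn poll_once(mut fut: NewBoxFuture) -> Option<String> {
    let waker = Waker::from(Arc::new(Noop));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}

fn main() {
    let mut old = old_style();
    println!("{:?}", old.poll_old());
    println!("{:?}", poll_once(new_style()));
}
```

If that reading is right, Box::new(AsyncResponse { recv: rx }) is the correct form for 14.2.0, and the compiler's suggested Box::new(Box::pin(...)) would not help, because a Pin<Box<AsyncResponse>> does not implement the futures 0.1 Future trait.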
Any help would be appreciated!