Skip to content

Commit a9fde18

Browse files
authored
Merge pull request #289 from Tim-Zhang/fix-client-stream-receiver
stream/client: Fix early closing of socket caused by Client's lifetime
2 parents d713dd4 + c1f56bb commit a9fde18

File tree

4 files changed

+9
-5
lines changed

4 files changed

+9
-5
lines changed

src/asynchronous/client.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ impl Client {
5555
};
5656

5757
let conn = Connection::new(stream, delegate);
58+
// Long-running receiver task
5859
tokio::spawn(async move { conn.run().await });
5960

6061
Client {
@@ -86,9 +87,7 @@ impl Client {
8687
.map_err(|_| Error::LocalClosed)?;
8788

8889
let result = if timeout_nano == 0 {
89-
rx.recv()
90-
.await
91-
.ok_or_else(|| Error::RemoteClosed)?
90+
rx.recv().await.ok_or_else(|| Error::RemoteClosed)?
9291
} else {
9392
tokio::time::timeout(
9493
std::time::Duration::from_nanos(timeout_nano as u64),

src/asynchronous/connection.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ where
5959

6060
let (reader_delegate, mut writer_delegate) = builder.build();
6161

62+
// Long-running sender task
6263
let writer_task = tokio::spawn(async move {
6364
while let Some(mut sending_msg) = writer_delegate.recv().await {
6465
trace!("write message: {:?}", sending_msg.msg);

src/asynchronous/stream.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::sync::{Arc, Mutex};
1111

1212
use tokio::sync::mpsc;
1313

14+
use super::Client;
1415
use crate::error::{Error, Result};
1516
use crate::proto::{
1617
Code, Codec, GenMessage, MessageHeader, Response, FLAG_NO_DATA, FLAG_REMOTE_CLOSED,
@@ -283,17 +284,20 @@ where
283284
pub struct ClientStreamReceiver<P> {
284285
inner: StreamReceiver,
285286
_recv: PhantomData<P>,
287+
// Hold the req_tx in Client to keep receiver task running
288+
_client_guard: Client,
286289
}
287290

288291
impl<P> ClientStreamReceiver<P>
289292
where
290293
P: Codec,
291294
<P as Codec>::E: std::fmt::Display,
292295
{
293-
pub fn new(inner: StreamInner) -> Self {
296+
pub fn new(inner: StreamInner, _client_guard: Client) -> Self {
294297
Self {
295298
inner: inner.split().1,
296299
_recv: PhantomData,
300+
_client_guard,
297301
}
298302
}
299303

src/asynchronous/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ macro_rules! async_client_stream_receive {
226226
}
227227

228228
let inner = $self.client.new_stream(creq, false, true).await?;
229-
let stream = ::ttrpc::r#async::ClientStreamReceiver::new(inner);
229+
let stream = ::ttrpc::r#async::ClientStreamReceiver::new(inner, $self.client.clone());
230230

231231
return Ok(stream);
232232
};

0 commit comments

Comments
 (0)