Skip to content

Commit 11a1ce2

Browse files
JohnDonethLucioFranco
authored andcommitted
v0.1.x: Fix UdpFramed with regards to Decode (#1444)
* add test for using LinesCodec with UdpFramed * fix UdpFramed decode * rustfmt
1 parent c9532e4 commit 11a1ce2

File tree

2 files changed

+61
-15
lines changed

2 files changed

+61
-15
lines changed

tokio-udp/src/frame.rs

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ pub struct UdpFramed<C> {
3333
wr: BytesMut,
3434
out_addr: SocketAddr,
3535
flushed: bool,
36+
is_readable: bool,
37+
current_addr: Option<SocketAddr>,
3638
}
3739

3840
impl<C: Decoder> Stream for UdpFramed<C> {
@@ -42,19 +44,37 @@ impl<C: Decoder> Stream for UdpFramed<C> {
4244
fn poll(&mut self) -> Poll<Option<(Self::Item)>, Self::Error> {
4345
self.rd.reserve(INITIAL_RD_CAPACITY);
4446

45-
let (n, addr) = unsafe {
46-
// Read into the buffer without having to initialize the memory.
47-
let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
48-
self.rd.advance_mut(n);
49-
(n, addr)
50-
};
51-
trace!("received {} bytes, decoding", n);
52-
let frame_res = self.codec.decode(&mut self.rd);
53-
self.rd.clear();
54-
let frame = frame_res?;
55-
let result = frame.map(|frame| (frame, addr)); // frame -> (frame, addr)
56-
trace!("frame decoded from buffer");
57-
Ok(Async::Ready(result))
47+
loop {
48+
// Are there are still bytes left in the read buffer to decode?
49+
if self.is_readable {
50+
if let Some(frame) = self.codec.decode(&mut self.rd)? {
51+
trace!("frame decoded from buffer");
52+
53+
let current_addr = self
54+
.current_addr
55+
.expect("will always be set before this line is called");
56+
57+
return Ok(Async::Ready(Some((frame, current_addr))));
58+
}
59+
60+
// if this line has been reached then decode has returned `None`.
61+
self.is_readable = false;
62+
self.rd.clear();
63+
}
64+
65+
// We're out of data. Try and fetch more data to decode
66+
let (n, addr) = unsafe {
67+
// Read into the buffer without having to initialize the memory.
68+
let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
69+
self.rd.advance_mut(n);
70+
(n, addr)
71+
};
72+
73+
self.current_addr = Some(addr);
74+
self.is_readable = true;
75+
76+
trace!("received {} bytes, decoding", n);
77+
}
5878
}
5979
}
6080

@@ -126,6 +146,8 @@ impl<C> UdpFramed<C> {
126146
rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
127147
wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
128148
flushed: true,
149+
is_readable: false,
150+
current_addr: None,
129151
}
130152
}
131153

tokio-udp/tests/udp.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::net::SocketAddr;
1212
use futures::{Future, Poll, Sink, Stream};
1313

1414
use bytes::{BufMut, BytesMut};
15-
use tokio_codec::{Decoder, Encoder};
15+
use tokio_codec::{Decoder, Encoder, LinesCodec};
1616
use tokio_udp::{UdpFramed, UdpSocket};
1717

1818
macro_rules! t {
@@ -247,7 +247,7 @@ impl Encoder for ByteCodec {
247247
}
248248

249249
#[test]
250-
fn send_framed() {
250+
fn send_framed_byte_codec() {
251251
drop(env_logger::try_init());
252252

253253
let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
@@ -288,3 +288,27 @@ fn send_framed() {
288288
assert_eq!(a_addr, addr);
289289
}
290290
}
291+
292+
#[test]
293+
fn send_framed_lines_codec() {
294+
drop(env_logger::try_init());
295+
296+
let a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
297+
let b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse())));
298+
let a_addr = t!(a_soc.local_addr());
299+
let b_addr = t!(b_soc.local_addr());
300+
301+
let a = UdpFramed::new(a_soc, ByteCodec);
302+
let b = UdpFramed::new(b_soc, LinesCodec::new());
303+
304+
let msg = b"1\r\n2\r\n3\r\n".to_vec();
305+
306+
let send = a.send((msg.clone(), b_addr));
307+
t!(send.wait());
308+
309+
let mut recv = Stream::wait(b).map(|e| e.unwrap());
310+
311+
assert_eq!(recv.next(), Some(("1".to_string(), a_addr)));
312+
assert_eq!(recv.next(), Some(("2".to_string(), a_addr)));
313+
assert_eq!(recv.next(), Some(("3".to_string(), a_addr)));
314+
}

0 commit comments

Comments
 (0)