Skip to content

Commit 623966c

Browse files
committed
feat: add udp wrappers
1 parent e4a3bc0 commit 623966c

File tree

6 files changed

+248
-2
lines changed

6 files changed

+248
-2
lines changed

planglib/std/__private.pi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ use std::task::executor;
2323
use std::task::helper;
2424
use std::task::delay;
2525
use std::task::tcp;
26+
use std::task::udp;
2627
use std::json::encode;

planglib/std/libuv.pi

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ var UV_ASYNC = 1 as u32;
55
var UV_IDLE = 6 as u32;
66
var UV_TIMER = 13 as u32;
77
var UV_TCP = 12 as u32;
8+
var UV_UDP = 14 as u32;
89

910

1011
// below are request types
@@ -17,6 +18,7 @@ pub struct uv_timer_t {}
1718
pub struct uv_tcp_t {}
1819
pub struct uv_stream_t {}
1920
pub struct uv_write_t {}
21+
pub struct uv_udp_t {}
2022

2123
pub struct uv_buf_t {
2224
pub base:i64;
@@ -164,4 +166,17 @@ pub fn get_data_for_handle_raw<H|T> (handle:*H) *T {
164166
return unsafe_cast<T>(re);
165167
}
166168

169+
pub fn uv_udp_init(loop:*uv_loop_t, handle:*uv_udp_t) i32;
170+
171+
pub fn uv_udp_bind(handle:*uv_udp_t, addr:*sockaddr_in, flags:u32) i32;
172+
173+
pub fn uv_udp_recv_start(handle:*uv_udp_t, alloc_cb:*(), recv_cb:*()) i32;
174+
175+
pub fn uv_udp_send(req:*uv_write_t, handle:*uv_udp_t, bufs:*uv_buf_t, nbufs:u32, addr:*sockaddr_in, cb:*()) i32;
176+
177+
pub fn new_uv_udp_t() *uv_udp_t {
178+
let re = gc::malloc_pinned(uv_handle_size(UV_UDP) as i64);
179+
return unsafe_cast<uv_udp_t>(re);
180+
}
181+
167182

planglib/std/task/reactor.pi

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ struct StopEV {
2525
}
2626

2727

28-
type EV = TCPListenEV | TimerEV | StreamReadEV | ReadStopEV | WriteEV |StopEV;
28+
type EV = TCPListenEV | TimerEV | StreamReadEV | ReadStopEV | WriteEV | StopEV | UDPRecvEV | UDPSendEV;
2929

3030

3131

@@ -54,6 +54,20 @@ struct StreamReadEV {
5454
buf: [u8];
5555
}
5656

57+
struct UDPRecvEV {
58+
handle: *libuv::uv_udp_t;
59+
buf: [u8];
60+
read_cb: |i64|=>void;
61+
total_read: i64;
62+
}
63+
64+
struct UDPSendEV {
65+
handle: *libuv::uv_udp_t;
66+
buf: [u8];
67+
write_cb: |i32|=>void;
68+
addr: libuv::sockaddr_in;
69+
}
70+
5771
var GLOBAL_REACTOR = new_uv_reactor();
5872

5973

@@ -113,7 +127,6 @@ fn write_cb(req:*libuv::uv_write_t, status:i32) void {
113127

114128
use std::io;
115129

116-
117130
fn async_cb(async_t:*libuv::uv_async_t) void {
118131
let reactor = libuv::get_data_for_handle<libuv::uv_async_t|UVReactor>(async_t);
119132
let ch = reactor.ch;
@@ -172,6 +185,25 @@ fn async_cb(async_t:*libuv::uv_async_t) void {
172185
libuv::uv_stop(reactor.loop);
173186
libuv::uv_loop_close(GLOBAL_REACTOR.loop);
174187
}
188+
UDPRecvEV(ev) => {
189+
gc::keep_alive_pinned(ev.handle);
190+
let cb = unsafe_cast<()>(&udp_recv_cb);
191+
let alloc_cb = unsafe_cast<()>(&udp_alloc_cb);
192+
libuv::uv_udp_recv_start(ev.handle, alloc_cb, cb);
193+
libuv::set_data_for_handle(ev.handle, &ev);
194+
}
195+
UDPSendEV(ev) => {
196+
let req = libuv::new_uv_write_t();
197+
gc::keep_alive_pinned(req);
198+
let raw_slice_p = &ev.buf[0];
199+
gc::pin(raw_slice_p);
200+
let buf = libuv::uv_buf_init(&ev.buf[0], ev.buf.len() as u32);
201+
let buf_p = &buf;
202+
gc::pin(buf_p);
203+
libuv::set_data_for_handle(req, &ev);
204+
let cb = unsafe_cast<()>(&udp_send_cb);
205+
libuv::uv_udp_send(req, ev.handle, &buf, 1 as u32, &ev.addr, cb);
206+
}
175207
_ => {
176208
}
177209
}
@@ -290,6 +322,41 @@ impl UVReactor {
290322
return;
291323
}
292324

325+
pub fn new_udp_socket(udp_h:*libuv::uv_udp_t, ip:string, port:i32) void {
326+
let addr = libuv::sockaddr_in{};
327+
libuv::uv_ip4_addr(ip.cstr(), port, &addr);
328+
libuv::uv_udp_init(self.loop, udp_h);
329+
libuv::uv_udp_bind(udp_h, &addr, 0 as u32);
330+
return;
331+
}
332+
333+
pub fn udp_recv(udp:*libuv::uv_udp_t, buf:[u8], readcb:|i64|=>void) void {
334+
let ev = UDPRecvEV{
335+
handle: udp,
336+
buf: buf,
337+
read_cb: readcb,
338+
total_read: 0,
339+
};
340+
let e = ev as EV;
341+
self.ch.send(e);
342+
libuv::uv_async_send(self.async_t);
343+
return;
344+
}
345+
346+
pub fn udp_send(udp:*libuv::uv_udp_t, buf:[u8], ip:string, port:i32, writecb:|i32|=>void) void {
347+
let addr = libuv::sockaddr_in{};
348+
libuv::uv_ip4_addr(ip.cstr(), port, &addr);
349+
let ev = UDPSendEV{
350+
handle: udp,
351+
buf: buf,
352+
write_cb: writecb,
353+
addr: addr,
354+
};
355+
let e = ev as EV;
356+
self.ch.send(e);
357+
libuv::uv_async_send(self.async_t);
358+
return;
359+
}
293360
}
294361

295362

@@ -304,3 +371,27 @@ pub fn stop_global_reactor() void {
304371
GLOBAL_REACTOR.stop();
305372
return;
306373
}
374+
375+
fn udp_recv_cb(handle:*libuv::uv_udp_t, nread:i64, _buf:*libuv::uv_buf_t) void {
376+
let ev = libuv::get_data_for_handle<libuv::uv_udp_t|UDPRecvEV>(handle);
377+
ev.total_read = nread + ev.total_read;
378+
ev.read_cb(nread);
379+
return;
380+
}
381+
382+
fn udp_send_cb(req:*libuv::uv_write_t, status:i32) void {
383+
let ev = libuv::get_data_for_handle<libuv::uv_write_t|UDPSendEV>(req);
384+
ev.write_cb(status);
385+
return;
386+
}
387+
388+
fn udp_alloc_cb(handle:*(), _suggested_size:i64, buf:*libuv::uv_buf_t) void {
389+
let ev = libuv::get_data_for_handle<_|UDPRecvEV>(handle);
390+
let ptr = &ev.buf[ev.total_read];
391+
let l = ev.buf.len() - ev.total_read;
392+
if l<=0 {
393+
l = 0;
394+
}
395+
* buf = libuv::uv_buf_init(ptr, l as u32);
396+
return;
397+
}

planglib/std/task/udp.pi

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
use std::task::Task;
2+
use std::task::reactor;
3+
use std::task::executor;
4+
use std::libuv;
5+
use std::slice::SliceExt;
6+
7+
use std::io;
8+
9+
pub struct UDPSocket {
10+
handle:*libuv::uv_udp_t;
11+
}
12+
13+
pub fn new_udp_socket(handle:*libuv::uv_udp_t) UDPSocket {
14+
return UDPSocket{
15+
handle:handle,
16+
};
17+
}
18+
19+
impl UDPSocket {
20+
pub fn bind(ip:string, port:i32) void {
21+
reactor::GLOBAL_REACTOR.new_udp_socket(self.handle, ip, port);
22+
return;
23+
}
24+
25+
pub fn read_async(buf:[u8]) Task<i64> {
26+
// pin the buffer
27+
let buf_ptr = &buf[0];
28+
gc::pin(buf_ptr);
29+
let read_task = UDPReadTask{
30+
first:true,
31+
ready:false,
32+
handle: self.handle,
33+
read_len:0,
34+
buf:buf,
35+
};
36+
37+
return read_task as Task<i64>;
38+
}
39+
40+
pub fn send_async(buf:[u8], ip:string, port:i32) Task<i32> {
41+
let write_task = UDPWriteTask{
42+
first:true,
43+
ready:false,
44+
handle: self.handle,
45+
buf:buf,
46+
status:0 as i32,
47+
ip:ip,
48+
port:port,
49+
};
50+
return write_task as Task<i32>;
51+
}
52+
}
53+
54+
struct UDPWriteTask {
55+
first:bool;
56+
ready:bool;
57+
handle:*libuv::uv_udp_t;
58+
buf: [u8];
59+
status:i32;
60+
ip:string;
61+
port:i32;
62+
}
63+
64+
impl Task<i32> for UDPWriteTask {
65+
fn poll(wk:||=>void) Option<i32> {
66+
if self.first {
67+
self.first = false;
68+
let write_cb = |status| => {
69+
if self.ready {
70+
return;
71+
}
72+
self.status = status;
73+
self.ready = true;
74+
wk();
75+
return;
76+
};
77+
reactor::GLOBAL_REACTOR.udp_send(self.handle, self.buf, self.ip, self.port, write_cb);
78+
}
79+
80+
if self.ready {
81+
return self.status as Option<i32>;
82+
}
83+
return None{} as Option<i32>;
84+
}
85+
}
86+
87+
struct UDPReadTask {
88+
first:bool;
89+
ready:bool;
90+
handle:*libuv::uv_udp_t;
91+
read_len:i64;
92+
buf: [u8];
93+
}
94+
95+
impl Task<i64> for UDPReadTask {
96+
fn poll(wk:||=>void) Option<i64> {
97+
if self.first {
98+
self.first = false;
99+
let read_cb = |nread| => {
100+
if self.ready {
101+
return;
102+
}
103+
if nread<0 {
104+
// error
105+
self.read_len = -1;
106+
self.ready = true;
107+
wk();
108+
return;
109+
}
110+
self.read_len = nread + self.read_len;
111+
if self.read_len >= self.buf.len() {
112+
self.ready = true;
113+
wk();
114+
}
115+
return;
116+
};
117+
reactor::GLOBAL_REACTOR.udp_recv(self.handle, self.buf, read_cb);
118+
}
119+
if self.ready {
120+
return self.read_len as Option<i64>;
121+
}
122+
return None{} as Option<i64>;
123+
}
124+
}

test/main.pi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ async fn main() Task<()> {
7373
_io::test_io();
7474
// future_test::test_future();
7575
std_test::test_std();
76+
// await std_test::test_udp();
7677

7778
await std_test::test_nested_async_closure();
7879
await std_test::test_nested_async_closure_in_normal_f();

test/test/std_test.pi

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,4 +226,18 @@ pub struct JSONObj<T> {
226226
b: string;
227227
}
228228

229+
use std::task::udp;
230+
use std::libuv;
231+
232+
pub async fn test_udp() Task<()> {
233+
let udp_t = udp::new_udp_socket(libuv::new_uv_udp_t());
234+
udp_t.bind("127.0.0.1", 8080 as i32);
235+
let buf = [u8 * 10;];
236+
let nread = await udp_t.read_async(buf);
237+
println!("read: ", nread);
238+
let s = string_from_bytes(buf);
239+
println!("udp msp:", s);
240+
241+
return ();
242+
}
229243

0 commit comments

Comments
 (0)