Skip to content

Commit f4b87c7

Browse files
robarnoldbrson
authored andcommitted
Basic async IO module using libuv
1 parent b64a52d commit f4b87c7

16 files changed

+722
-14
lines changed

mk/rt.mk

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ RUNTIME_CS := rt/sync/timer.cpp \
1717
rt/rust_chan.cpp \
1818
rt/rust_port.cpp \
1919
rt/rust_upcall.cpp \
20+
rt/rust_uv.cpp \
2021
rt/rust_log.cpp \
2122
rt/rust_timer.cpp \
2223
rt/circular_buffer.cpp \

src/lib/aio.rs

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
import str::sbuf;
2+
import vec::vbuf;
3+
4+
native "rust" mod rustrt {
5+
type socket;
6+
type server;
7+
fn aio_init();
8+
fn aio_run();
9+
fn aio_stop();
10+
fn aio_connect(host: sbuf, port: int, connected: chan[socket]);
11+
fn aio_serve(host: sbuf, port: int, acceptChan: chan[socket]) -> server;
12+
fn aio_writedata(s: socket, buf: *u8, size: uint, status: chan[bool]);
13+
fn aio_read(s: socket, reader: chan[u8[]]);
14+
fn aio_close_server(s: server, status: chan[bool]);
15+
fn aio_close_socket(s: socket);
16+
fn aio_is_null_client(s: socket) -> bool;
17+
}
18+
19+
type server = rustrt::server;
20+
type client = rustrt::socket;
21+
tag pending_connection {
22+
remote(str,int);
23+
incoming(server);
24+
}
25+
26+
tag socket_event {
27+
connected(client);
28+
closed;
29+
received(u8[]);
30+
}
31+
32+
tag server_event {
33+
pending(chan[chan[socket_event]]);
34+
}
35+
36+
tag request {
37+
quit;
38+
connect(pending_connection,chan[socket_event]);
39+
serve(str,int,chan[server_event],chan[server]);
40+
write(client,u8[],chan[bool]);
41+
close_server(server, chan[bool]);
42+
close_client(client);
43+
}
44+
45+
type ctx = chan[request];
46+
47+
fn connect_task(ip: str, portnum: int, evt: chan[socket_event]) {
48+
let connecter: port[client] = port();
49+
rustrt::aio_connect(str::buf(ip), portnum, chan(connecter));
50+
let client: client;
51+
connecter |> client;
52+
new_client(client, evt);
53+
}
54+
55+
fn new_client(client: client, evt: chan[socket_event]) {
56+
// Start the read before notifying about the connect. This avoids a race
57+
// condition where the receiver can close the socket before we start
58+
// reading.
59+
let reader: port[u8[]] = port();
60+
rustrt::aio_read(client, chan(reader));
61+
62+
evt <| connected(client);
63+
64+
while (true) {
65+
log "waiting for bytes";
66+
let data: u8[];
67+
reader |> data;
68+
log "got some bytes";
69+
log ivec::len[u8](data);
70+
if (ivec::len[u8](data) == 0u) {
71+
log "got empty buffer, bailing";
72+
break;
73+
}
74+
log "got non-empty buffer, sending";
75+
evt <| received(data);
76+
log "sent non-empty buffer";
77+
}
78+
log "done reading";
79+
evt <| closed;
80+
log "close message sent";
81+
}
82+
83+
fn accept_task(client: client, events: chan[server_event]) {
84+
log "accept task was spawned";
85+
let p: port[chan[socket_event]] = port();
86+
events <| pending(chan(p));
87+
let evt: chan[socket_event];
88+
p |> evt;
89+
new_client(client, evt);
90+
log "done accepting";
91+
}
92+
93+
fn server_task(ip: str, portnum: int, events: chan[server_event],
94+
server: chan[server]) {
95+
let accepter: port[client] = port();
96+
server <| rustrt::aio_serve(str::buf(ip), portnum, chan(accepter));
97+
98+
let client: client;
99+
while (true) {
100+
log "preparing to accept a client";
101+
accepter |> client;
102+
if (rustrt::aio_is_null_client(client)) {
103+
log "client was actually null, returning";
104+
ret;
105+
} else {
106+
spawn accept_task(client, events);
107+
}
108+
}
109+
}
110+
111+
fn request_task(c: chan[ctx]) {
112+
// Create a port to accept IO requests on
113+
let p: port[request] = port();
114+
// Hand of its channel to our spawner
115+
c <| chan(p);
116+
log "uv run task spawned";
117+
// Spin for requests
118+
let req: request;
119+
while (true) {
120+
p |> req;
121+
alt req {
122+
quit. {
123+
log "got quit message";
124+
125+
log "stopping libuv";
126+
rustrt::aio_stop();
127+
ret;
128+
}
129+
connect(remote(ip,portnum),client) {
130+
spawn connect_task(ip, portnum, client);
131+
}
132+
serve(ip,portnum,events,server) {
133+
spawn server_task(ip, portnum, events, server);
134+
}
135+
write(socket,v,status) {
136+
rustrt::aio_writedata(socket,
137+
ivec::to_ptr[u8](v), ivec::len[u8](v),
138+
status);
139+
}
140+
close_server(server,status) {
141+
log "closing server";
142+
rustrt::aio_close_server(server,status);
143+
}
144+
close_client(client) {
145+
log "closing client";
146+
rustrt::aio_close_socket(client);
147+
}
148+
}
149+
}
150+
}
151+
152+
fn iotask(c: chan[ctx]) {
153+
log "io task spawned";
154+
// Initialize before accepting requests
155+
rustrt::aio_init();
156+
157+
log "io task init";
158+
// Spawn our request task
159+
let reqtask: task = spawn request_task(c);
160+
161+
log "uv run task init";
162+
// Enter IO loop. This never returns until aio_stop is called.
163+
rustrt::aio_run();
164+
log "waiting for request task to finish";
165+
166+
task::join(reqtask);
167+
}
168+
169+
fn new() -> ctx {
170+
let p: port[ctx] = port();
171+
let t: task = spawn iotask(chan(p));
172+
let cx: ctx;
173+
p |> cx;
174+
ret cx;
175+
}
176+
177+
// Local Variables:
178+
// mode: rust;
179+
// fill-column: 78;
180+
// indent-tabs-mode: nil
181+
// c-basic-offset: 4
182+
// buffer-file-coding-system: utf-8-unix
183+
// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
184+
// End:

src/lib/sio.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
type ctx = aio::ctx;
2+
type client = { ctx: ctx, client: aio::client, evt: port[aio::socket_event] };
3+
type server = { ctx: ctx, server: aio::server, evt: port[aio::server_event] };
4+
5+
fn new() -> ctx {
6+
ret aio::new();
7+
}
8+
9+
fn destroy(ctx: ctx) {
10+
ctx <| aio::quit;
11+
}
12+
13+
fn make_socket(ctx: ctx, p: port[aio::socket_event]) -> client {
14+
let evt: aio::socket_event;
15+
p |> evt;
16+
alt evt {
17+
aio::connected(client) {
18+
ret { ctx: ctx, client: client, evt: p };
19+
}
20+
}
21+
log_err ("Could not connect to client");
22+
fail;
23+
}
24+
25+
fn connect_to(ctx: ctx, ip: str, portnum: int) -> client {
26+
let p: port[aio::socket_event] = port();
27+
ctx <| aio::connect(aio::remote(ip, portnum), chan(p));
28+
ret make_socket(ctx, p);
29+
}
30+
31+
fn read(c: client) -> u8[] {
32+
let evt: aio::socket_event;
33+
c.evt |> evt;
34+
alt evt {
35+
aio::closed. {
36+
ret ~[];
37+
}
38+
aio::received(buf) {
39+
ret buf;
40+
}
41+
}
42+
}
43+
44+
fn create_server(ctx: ctx, ip: str, portnum: int) -> server {
45+
let evt: port[aio::server_event] = port();
46+
let p: port[aio::server] = port();
47+
ctx <| aio::serve(ip, portnum, chan(evt), chan(p));
48+
let srv: aio::server;
49+
p |> srv;
50+
ret { ctx: ctx, server: srv, evt: evt };
51+
}
52+
53+
fn accept_from(server: server) -> client {
54+
let evt: aio::server_event;
55+
server.evt |> evt;
56+
alt evt {
57+
aio::pending(callback) {
58+
let p: port[aio::socket_event] = port();
59+
callback <| chan(p);
60+
ret make_socket(server.ctx, p);
61+
}
62+
}
63+
}
64+
65+
fn write_data(c: client, data: u8[]) -> bool {
66+
let p: port[bool] = port();
67+
c.ctx <| aio::write(c.client, data, chan(p));
68+
let success: bool;
69+
p |> success;
70+
ret success;
71+
}
72+
73+
fn close_server(server: server) {
74+
// TODO: make this unit once we learn to send those from native code
75+
let p: port[bool] = port();
76+
server.ctx <| aio::close_server(server.server, chan(p));
77+
let success: bool;
78+
log "Waiting for close";
79+
p |> success;
80+
log "Got close";
81+
}
82+
83+
fn close_client(client: client) {
84+
client.ctx <| aio::close_client(client.client);
85+
let evt: aio::socket_event;
86+
do {
87+
client.evt |> evt;
88+
alt evt {
89+
aio::closed. {
90+
ret;
91+
}
92+
_ {}
93+
}
94+
} while (true);
95+
}
96+
97+
// Local Variables:
98+
// mode: rust;
99+
// fill-column: 78;
100+
// indent-tabs-mode: nil
101+
// c-basic-offset: 4
102+
// buffer-file-coding-system: utf-8-unix
103+
// compile-command: "make -k -C .. 2>&1 | sed -e 's/\\/x\\//x:\\//g'";
104+
// End:

src/lib/std.rc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ mod os_fs = "posix_fs.rs";
7070

7171
mod run = "run_program.rs";
7272
mod fs;
73+
mod aio;
74+
mod sio;
7375

7476
// FIXME: parametric
7577
mod map;

src/rt/rust_log.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ size_t log_rt_gc;
197197
size_t log_rt_stdlib;
198198
size_t log_rt_kern;
199199
size_t log_rt_backtrace;
200+
size_t log_rt_callback;
200201

201202
static const mod_entry _rt_module_map[] =
202203
{{"::rt::mem", &log_rt_mem},
@@ -211,6 +212,7 @@ static const mod_entry _rt_module_map[] =
211212
{"::rt::stdlib", &log_rt_stdlib},
212213
{"::rt::kern", &log_rt_kern},
213214
{"::rt::backtrace", &log_rt_backtrace},
215+
{"::rt::callback", &log_rt_callback},
214216
{NULL, NULL}};
215217

216218
void update_log_settings(void* crate_map, char* settings) {

src/rt/rust_log.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,5 +66,6 @@ extern size_t log_rt_gc;
6666
extern size_t log_rt_stdlib;
6767
extern size_t log_rt_kern;
6868
extern size_t log_rt_backtrace;
69+
extern size_t log_rt_callback;
6970

7071
#endif /* RUST_LOG_H */

src/rt/rust_upcall.cpp

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,8 @@
11
#include "rust_internal.h"
2+
#include "rust_upcall.h"
23

34
// Upcalls.
45

5-
#ifdef __GNUC__
6-
#define LOG_UPCALL_ENTRY(task) \
7-
LOG(task, upcall, \
8-
"> UPCALL %s - task: %s 0x%" PRIxPTR \
9-
" retpc: x%" PRIxPTR, \
10-
__FUNCTION__, \
11-
(task)->name, (task), \
12-
__builtin_return_address(0));
13-
#else
14-
#define LOG_UPCALL_ENTRY(task) \
15-
LOG(task, upcall, "> UPCALL task: %s @x%" PRIxPTR, \
16-
(task)->name, (task));
17-
#endif
18-
196
extern "C" CDECL char const *
207
str_buf(rust_task *task, rust_str *s);
218

src/rt/rust_upcall.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#pragma once
2+
3+
#ifdef __GNUC__
4+
#define LOG_UPCALL_ENTRY(task) \
5+
LOG(task, upcall, \
6+
"> UPCALL %s - task: %s 0x%" PRIxPTR \
7+
" retpc: x%" PRIxPTR, \
8+
__FUNCTION__, \
9+
(task)->name, (task), \
10+
__builtin_return_address(0));
11+
#else
12+
#define LOG_UPCALL_ENTRY(task) \
13+
LOG(task, upcall, "> UPCALL task: %s @x%" PRIxPTR, \
14+
(task)->name, (task));
15+
#endif
16+
17+

0 commit comments

Comments
 (0)