Skip to content

Commit

Permalink
perf(ext/websocket): optimize op_ws_next_event (#16325)
Browse files Browse the repository at this point in the history
Towards #16315
  • Loading branch information
littledivy authored Oct 19, 2022
1 parent 743fcc0 commit e3a3095
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 43 deletions.
40 changes: 27 additions & 13 deletions ext/websocket/01_websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
ArrayPrototypeJoin,
ArrayPrototypeMap,
ArrayPrototypeSome,
Uint32Array,
ErrorPrototypeToString,
ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf,
Expand Down Expand Up @@ -82,6 +83,10 @@
const _idleTimeoutDuration = Symbol("[[idleTimeout]]");
const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]");
const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]");

/* [event type, close code] */
const eventBuf = new Uint32Array(2);

class WebSocket extends EventTarget {
[_rid];

Expand Down Expand Up @@ -410,13 +415,15 @@

async [_eventLoop]() {
while (this[_readyState] !== CLOSED) {
const { kind, value } = await core.opAsync(
const value = await core.opAsync(
"op_ws_next_event",
this[_rid],
eventBuf,
);

const kind = eventBuf[0];
switch (kind) {
case "string": {
/* string */
case 0: {
this[_serverHandleIdleTimeout]();
const event = new MessageEvent("message", {
data: value,
Expand All @@ -425,7 +432,8 @@
this.dispatchEvent(event);
break;
}
case "binary": {
/* binary */
case 1: {
this[_serverHandleIdleTimeout]();
let data;

Expand All @@ -442,18 +450,23 @@
this.dispatchEvent(event);
break;
}
case "ping": {
/* ping */
case 3: {
core.opAsync("op_ws_send", this[_rid], {
kind: "pong",
});
break;
}
case "pong": {
/* pong */
case 4: {
this[_serverHandleIdleTimeout]();
break;
}
case "closed":
case "close": {
/* closed */
case 6: // falls through
/* close */
case 2: {
const code = eventBuf[1];
const prevState = this[_readyState];
this[_readyState] = CLOSED;
clearTimeout(this[_idleTimeoutTimeout]);
Expand All @@ -463,8 +476,8 @@
await core.opAsync(
"op_ws_close",
this[_rid],
value.code,
value.reason,
code,
value,
);
} catch {
// ignore failures
Expand All @@ -473,14 +486,15 @@

const event = new CloseEvent("close", {
wasClean: true,
code: value.code,
reason: value.reason,
code,
reason: value,
});
this.dispatchEvent(event);
core.tryClose(this[_rid]);
break;
}
case "error": {
/* error */
case 5: {
this[_readyState] = CLOSED;

const errorEv = new ErrorEvent("error", {
Expand Down
40 changes: 28 additions & 12 deletions ext/websocket/02_websocketstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
SymbolFor,
TypeError,
Uint8ArrayPrototype,
Uint32Array,
} = window.__bootstrap.primordials;

webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter(
Expand Down Expand Up @@ -168,12 +169,15 @@
PromisePrototypeThen(
(async () => {
while (true) {
const { kind } = await core.opAsync(
const kind = new Uint32Array(2);
await core.opAsync(
"op_ws_next_event",
create.rid,
kind,
);

if (kind === "close") {
/* close */
if (kind[0] === 2) {
break;
}
}
Expand Down Expand Up @@ -237,35 +241,46 @@
await this.closed;
},
});

const pull = async (controller) => {
const { kind, value } = await core.opAsync(
/* [event type, close code] */
const eventBuf = new Uint32Array(2);
const value = await core.opAsync(
"op_ws_next_event",
this[_rid],
eventBuf,
);

const kind = eventBuf[0];
switch (kind) {
case "string": {
/* string */
case 0: {
controller.enqueue(value);
break;
}
case "binary": {
/* binary */
case 1: {
controller.enqueue(value);
break;
}
case "ping": {
/* ping */
case 3: {
await core.opAsync("op_ws_send", this[_rid], {
kind: "pong",
});
await pull(controller);
break;
}
case "closed":
case "close": {
this[_closed].resolve(value);
/* closed */
case 6: // falls through
/* close */
case 2: {
const code = eventBuf[1];
this[_closed].resolve({ code, reason: value });
core.tryClose(this[_rid]);
break;
}
case "error": {
/* error */
case 5: {
const err = new Error(value);
this[_closed].reject(err);
controller.error(err);
Expand All @@ -285,7 +300,8 @@
return pull(controller);
}

this[_closed].resolve(value);
const code = eventBuf[1];
this[_closed].resolve({ code, reason: value });
core.tryClose(this[_rid]);
}
};
Expand Down
66 changes: 48 additions & 18 deletions ext/websocket/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::StringOrBuffer;
use deno_core::ZeroCopyBuf;
use deno_tls::create_client_config;
use http::header::HeaderName;
Expand Down Expand Up @@ -555,40 +556,69 @@ pub enum NextEventResponse {
Closed,
}

#[op]
#[repr(u32)]
enum NextEventKind {
String = 0,
Binary = 1,
Close = 2,
Ping = 3,
Pong = 4,
Error = 5,
Closed = 6,
}

#[op(deferred)]
pub async fn op_ws_next_event(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
) -> Result<NextEventResponse, AnyError> {
kind_out: &mut [u32],
) -> Result<Option<StringOrBuffer>, AnyError> {
let resource = state
.borrow_mut()
.resource_table
.get::<WsStreamResource>(rid)?;

let cancel = RcRef::map(&resource, |r| &r.cancel);
let val = resource.next_message(cancel).await?;
let res = match val {
Some(Ok(Message::Text(text))) => NextEventResponse::String(text),
Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()),
Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close {
code: frame.code.into(),
reason: frame.reason.to_string(),
},
Some(Ok(Message::Close(None))) => NextEventResponse::Close {
code: 1005,
reason: String::new(),
},
Some(Ok(Message::Ping(_))) => NextEventResponse::Ping,
Some(Ok(Message::Pong(_))) => NextEventResponse::Pong,
Some(Err(e)) => NextEventResponse::Error(e.to_string()),
let (kind, value) = match val {
Some(Ok(Message::Text(text))) => (
NextEventKind::String as u32,
Some(StringOrBuffer::String(text)),
),
Some(Ok(Message::Binary(data))) => (
NextEventKind::Binary as u32,
Some(StringOrBuffer::Buffer(data.into())),
),
Some(Ok(Message::Close(Some(frame)))) => {
let code: u16 = frame.code.into();
kind_out[1] = code as u32;
(
NextEventKind::Close as u32,
Some(StringOrBuffer::String(frame.reason.to_string())),
)
}
Some(Ok(Message::Close(None))) => {
kind_out[1] = 1005;
(
NextEventKind::Close as u32,
Some(StringOrBuffer::String(String::new())),
)
}
Some(Ok(Message::Ping(_))) => (NextEventKind::Ping as u32, None),
Some(Ok(Message::Pong(_))) => (NextEventKind::Pong as u32, None),
Some(Err(e)) => (
NextEventKind::Error as u32,
Some(StringOrBuffer::String(e.to_string())),
),
None => {
// No message was received, presumably the socket closed while we waited.
// Try close the stream, ignoring any errors, and report closed status to JavaScript.
let _ = state.borrow_mut().resource_table.close(rid);
NextEventResponse::Closed
(NextEventKind::Closed as u32, None)
}
};
Ok(res)
kind_out[0] = kind as u32;
Ok(value)
}

pub fn init<P: WebSocketPermissions + 'static>(
Expand Down

0 comments on commit e3a3095

Please sign in to comment.