diff --git a/cli/tests/unit_node/net_test.ts b/cli/tests/unit_node/net_test.ts index 312271f88f3cf3..b9d9b796a33c43 100644 --- a/cli/tests/unit_node/net_test.ts +++ b/cli/tests/unit_node/net_test.ts @@ -5,7 +5,7 @@ import { assert, assertEquals, } from "../../../test_util/std/testing/asserts.ts"; -import { deferred } from "../../../test_util/std/async/deferred.ts"; +import { Deferred, deferred } from "../../../test_util/std/async/deferred.ts"; import * as path from "../../../test_util/std/path/mod.ts"; import * as http from "node:http"; @@ -131,17 +131,18 @@ Deno.test("[node/net] connection event has socket value", async () => { await Promise.all([p, p2]); }); +/// We need to make sure that any shared buffers are never used concurrently by two reads. // https://github.com/denoland/deno/issues/20188 Deno.test("[node/net] multiple Sockets should get correct server data", async () => { - const p = deferred(); - const p2 = deferred(); + const socketCount = 9; - const dataReceived1 = deferred(); - const dataReceived2 = deferred(); - - const events1: string[] = []; - const events2: string[] = []; + class TestSocket { + dataReceived: Deferred = deferred(); + events: string[] = []; + socket: net.Socket | undefined; + } + const finished = deferred(); const server = net.createServer(); server.on("connection", (socket) => { assert(socket !== undefined); @@ -150,40 +151,47 @@ Deno.test("[node/net] multiple Sockets should get correct server data", async () }); }); + const sockets: TestSocket[] = []; + for (let i = 0; i < socketCount; i++) { + sockets[i] = new TestSocket(); + } + server.listen(async () => { // deno-lint-ignore no-explicit-any const { port } = server.address() as any; - const socket1 = net.createConnection(port); - const socket2 = net.createConnection(port); - - socket1.on("data", (data) => { - events1.push(new TextDecoder().decode(data)); - dataReceived1.resolve(); - }); - - socket2.on("data", (data) => { - events2.push(new TextDecoder().decode(data)); - dataReceived2.resolve(); - }); + for (let i = 0; i < socketCount; i++) { + const socket = sockets[i].socket = net.createConnection(port); + socket.on("data", (data) => { + const count = sockets[i].events.length; + sockets[i].events.push(new TextDecoder().decode(data)); + if (count === 0) { + // Trigger an immediate second write + sockets[i].socket?.write(`${i}`.repeat(3)); + } else { + sockets[i].dataReceived.resolve(); + } + }); + } - socket1.write("111"); - socket2.write("222"); + for (let i = 0; i < socketCount; i++) { + sockets[i].socket?.write(`${i}`.repeat(3)); + } - await Promise.all([dataReceived1, dataReceived2]); + await Promise.all(sockets.map((socket) => socket.dataReceived)); - socket1.end(); - socket2.end(); + for (let i = 0; i < socketCount; i++) { + sockets[i].socket?.end(); + } server.close(() => { - p.resolve(); + finished.resolve(); }); - - p2.resolve(); }); - await Promise.all([p, p2]); + await finished; - assertEquals(events1, ["111"]); - assertEquals(events2, ["222"]); + for (let i = 0; i < socketCount; i++) { + assertEquals(sockets[i].events, [`${i}`.repeat(3), `${i}`.repeat(3)]); + } }); diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts index 8e976da2c4f6de..2c2d4dda0c33f8 100644 --- a/ext/node/polyfills/internal_binding/stream_wrap.ts +++ b/ext/node/polyfills/internal_binding/stream_wrap.ts @@ -311,21 +311,36 @@ export class LibuvStreamWrap extends HandleWrap { /** Internal method for reading from the attached stream. */ async #read() { - const isOwnedBuf = bufLocked; - let buf = bufLocked ? new Uint8Array(SUGGESTED_SIZE) : BUF; - bufLocked = true; + // Lock safety: We must hold this lock until we are certain that buf is no longer used + // This setup code is a little verbose, but + let buf, locked; + if (bufLocked) { + buf = new Uint8Array(SUGGESTED_SIZE); + locked = false; + } else { + buf = BUF; + locked = bufLocked = true; + } try { let nread: number | null; const ridBefore = this[kStreamBaseField]!.rid; try { nread = await this[kStreamBaseField]!.read(buf); } catch (e) { + // Lock safety: we know that the buffer will not be used in this function again + // All exits from this block either return or re-assign buf to a different value + if (locked) { + bufLocked = locked = false; + } + // Try to read again if the underlying stream resource // changed. This can happen during TLS upgrades (eg. STARTTLS) if (ridBefore != this[kStreamBaseField]!.rid) { return this.#read(); } + buf = new Uint8Array(0); + if ( e instanceof Deno.errors.Interrupted || e instanceof Deno.errors.BadResource @@ -339,8 +354,6 @@ export class LibuvStreamWrap extends HandleWrap { } else { nread = codeMap.get("UNKNOWN")!; } - - buf = new Uint8Array(0); } nread ??= codeMap.get("EOF")!; @@ -351,7 +364,17 @@ export class LibuvStreamWrap extends HandleWrap { this.bytesRead += nread; } - buf = isOwnedBuf ? buf.subarray(0, nread) : buf.slice(0, nread); + // We release the lock early so a re-entrant read can make use of the shared buffer, but + // we need to make a copy of the data in the shared buffer. + if (locked) { + // Lock safety: we know that the buffer will not be used in this function again + // We're making a copy of data that lives in the shared buffer + buf = buf.slice(0, nread); + bufLocked = locked = false; + } else { + // The buffer isn't owned, so let's create a subarray view + buf = buf.subarray(0, nread); + } streamBaseState[kArrayBufferOffset] = 0; @@ -365,7 +388,10 @@ export class LibuvStreamWrap extends HandleWrap { this.#read(); } } finally { - bufLocked = false; + // Lock safety: we know that the buffer will not be used in this function again + if (locked) { + bufLocked = locked = false; + } } }