diff --git a/cli/tests/unit_node/net_test.ts b/cli/tests/unit_node/net_test.ts index 312271f88f3cf3..b9d9b796a33c43 100644 --- a/cli/tests/unit_node/net_test.ts +++ b/cli/tests/unit_node/net_test.ts @@ -5,7 +5,7 @@ import { assert, assertEquals, } from "../../../test_util/std/testing/asserts.ts"; -import { deferred } from "../../../test_util/std/async/deferred.ts"; +import { Deferred, deferred } from "../../../test_util/std/async/deferred.ts"; import * as path from "../../../test_util/std/path/mod.ts"; import * as http from "node:http"; @@ -131,17 +131,18 @@ Deno.test("[node/net] connection event has socket value", async () => { await Promise.all([p, p2]); }); +/// We need to make sure that any shared buffers are never used concurrently by two reads. // https://github.com/denoland/deno/issues/20188 Deno.test("[node/net] multiple Sockets should get correct server data", async () => { - const p = deferred(); - const p2 = deferred(); + const socketCount = 9; - const dataReceived1 = deferred(); - const dataReceived2 = deferred(); - - const events1: string[] = []; - const events2: string[] = []; + class TestSocket { + dataReceived: Deferred = deferred(); + events: string[] = []; + socket: net.Socket | undefined; + } + const finished = deferred(); const server = net.createServer(); server.on("connection", (socket) => { assert(socket !== undefined); @@ -150,40 +151,47 @@ Deno.test("[node/net] multiple Sockets should get correct server data", async () }); }); + const sockets: TestSocket[] = []; + for (let i = 0; i < socketCount; i++) { + sockets[i] = new TestSocket(); + } + server.listen(async () => { // deno-lint-ignore no-explicit-any const { port } = server.address() as any; - const socket1 = net.createConnection(port); - const socket2 = net.createConnection(port); - - socket1.on("data", (data) => { - events1.push(new TextDecoder().decode(data)); - dataReceived1.resolve(); - }); - - socket2.on("data", (data) => { - events2.push(new TextDecoder().decode(data)); - dataReceived2.resolve(); - }); + for (let i = 0; i < socketCount; i++) { + const socket = sockets[i].socket = net.createConnection(port); + socket.on("data", (data) => { + const count = sockets[i].events.length; + sockets[i].events.push(new TextDecoder().decode(data)); + if (count === 0) { + // Trigger an immediate second write + sockets[i].socket?.write(`${i}`.repeat(3)); + } else { + sockets[i].dataReceived.resolve(); + } + }); + } - socket1.write("111"); - socket2.write("222"); + for (let i = 0; i < socketCount; i++) { + sockets[i].socket?.write(`${i}`.repeat(3)); + } - await Promise.all([dataReceived1, dataReceived2]); + await Promise.all(sockets.map((socket) => socket.dataReceived)); - socket1.end(); - socket2.end(); + for (let i = 0; i < socketCount; i++) { + sockets[i].socket?.end(); + } server.close(() => { - p.resolve(); + finished.resolve(); }); - - p2.resolve(); }); - await Promise.all([p, p2]); + await finished; - assertEquals(events1, ["111"]); - assertEquals(events2, ["222"]); + for (let i = 0; i < socketCount; i++) { + assertEquals(sockets[i].events, [`${i}`.repeat(3), `${i}`.repeat(3)]); + } }); diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts index 8e976da2c4f6de..d8e42bd652279e 100644 --- a/ext/node/polyfills/internal_binding/stream_wrap.ts +++ b/ext/node/polyfills/internal_binding/stream_wrap.ts @@ -312,6 +312,8 @@ export class LibuvStreamWrap extends HandleWrap { /** Internal method for reading from the attached stream. */ async #read() { const isOwnedBuf = bufLocked; + // Lock safety: We must hold this lock until we are certain that buf is no longer used + let locked = !isOwnedBuf; let buf = bufLocked ? new Uint8Array(SUGGESTED_SIZE) : BUF; bufLocked = true; try { @@ -320,12 +322,20 @@ export class LibuvStreamWrap extends HandleWrap { try { nread = await this[kStreamBaseField]!.read(buf); } catch (e) { + // Lock safety: we know that the buffer will not be used in this function again + // All exits from this block either return or re-assign buf to a different value + if (!isOwnedBuf) { + bufLocked = locked = false; + } + // Try to read again if the underlying stream resource // changed. This can happen during TLS upgrades (eg. STARTTLS) if (ridBefore != this[kStreamBaseField]!.rid) { return this.#read(); } + buf = new Uint8Array(0); + if ( e instanceof Deno.errors.Interrupted || e instanceof Deno.errors.BadResource @@ -339,8 +349,6 @@ export class LibuvStreamWrap extends HandleWrap { } else { nread = codeMap.get("UNKNOWN")!; } - - buf = new Uint8Array(0); } nread ??= codeMap.get("EOF")!; @@ -362,10 +370,19 @@ export class LibuvStreamWrap extends HandleWrap { } if (nread >= 0 && this.#reading) { + // Lock safety: we know that the buffer will not be used in this function again + // We release the lock early so a re-entrant read can make use of the shared buffer + if (locked) { + bufLocked = locked = false; + } + this.#read(); } } finally { - bufLocked = false; + // Lock safety: we know that the buffer will not be used in this function again + if (locked) { + bufLocked = locked = false; + } } }