diff --git a/cli/tests/unit_node/net_test.ts b/cli/tests/unit_node/net_test.ts index 3b78cbe325e05a..312271f88f3cf3 100644 --- a/cli/tests/unit_node/net_test.ts +++ b/cli/tests/unit_node/net_test.ts @@ -130,3 +130,60 @@ Deno.test("[node/net] connection event has socket value", async () => { await Promise.all([p, p2]); }); + +// https://github.com/denoland/deno/issues/20188 +Deno.test("[node/net] multiple Sockets should get correct server data", async () => { + const p = deferred(); + const p2 = deferred(); + + const dataReceived1 = deferred(); + const dataReceived2 = deferred(); + + const events1: string[] = []; + const events2: string[] = []; + + const server = net.createServer(); + server.on("connection", (socket) => { + assert(socket !== undefined); + socket.on("data", (data) => { + socket.write(new TextDecoder().decode(data)); + }); + }); + + server.listen(async () => { + // deno-lint-ignore no-explicit-any + const { port } = server.address() as any; + + const socket1 = net.createConnection(port); + const socket2 = net.createConnection(port); + + socket1.on("data", (data) => { + events1.push(new TextDecoder().decode(data)); + dataReceived1.resolve(); + }); + + socket2.on("data", (data) => { + events2.push(new TextDecoder().decode(data)); + dataReceived2.resolve(); + }); + + socket1.write("111"); + socket2.write("222"); + + await Promise.all([dataReceived1, dataReceived2]); + + socket1.end(); + socket2.end(); + + server.close(() => { + p.resolve(); + }); + + p2.resolve(); + }); + + await Promise.all([p, p2]); + + assertEquals(events1, ["111"]); + assertEquals(events2, ["222"]); +}); diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts index 66ebbe682dfadb..8e976da2c4f6de 100644 --- a/ext/node/polyfills/internal_binding/stream_wrap.ts +++ b/ext/node/polyfills/internal_binding/stream_wrap.ts @@ -311,56 +311,61 @@ export class LibuvStreamWrap extends HandleWrap { /** Internal method for reading from the attached stream. */ async #read() { - let buf = BUF; - - let nread: number | null; - const ridBefore = this[kStreamBaseField]!.rid; + const isOwnedBuf = bufLocked; + let buf = bufLocked ? new Uint8Array(SUGGESTED_SIZE) : BUF; + bufLocked = true; try { - nread = await this[kStreamBaseField]!.read(buf); - } catch (e) { - // Try to read again if the underlying stream resource - // changed. This can happen during TLS upgrades (eg. STARTTLS) - if (ridBefore != this[kStreamBaseField]!.rid) { - return this.#read(); - } + let nread: number | null; + const ridBefore = this[kStreamBaseField]!.rid; + try { + nread = await this[kStreamBaseField]!.read(buf); + } catch (e) { + // Try to read again if the underlying stream resource + // changed. This can happen during TLS upgrades (eg. STARTTLS) + if (ridBefore != this[kStreamBaseField]!.rid) { + return this.#read(); + } - if ( - e instanceof Deno.errors.Interrupted || - e instanceof Deno.errors.BadResource - ) { - nread = codeMap.get("EOF")!; - } else if ( - e instanceof Deno.errors.ConnectionReset || - e instanceof Deno.errors.ConnectionAborted - ) { - nread = codeMap.get("ECONNRESET")!; - } else { - nread = codeMap.get("UNKNOWN")!; - } + if ( + e instanceof Deno.errors.Interrupted || + e instanceof Deno.errors.BadResource + ) { + nread = codeMap.get("EOF")!; + } else if ( + e instanceof Deno.errors.ConnectionReset || + e instanceof Deno.errors.ConnectionAborted + ) { + nread = codeMap.get("ECONNRESET")!; + } else { + nread = codeMap.get("UNKNOWN")!; + } - buf = new Uint8Array(0); - } + buf = new Uint8Array(0); + } - nread ??= codeMap.get("EOF")!; + nread ??= codeMap.get("EOF")!; - streamBaseState[kReadBytesOrError] = nread; + streamBaseState[kReadBytesOrError] = nread; - if (nread > 0) { - this.bytesRead += nread; - } + if (nread > 0) { + this.bytesRead += nread; + } - buf = buf.slice(0, nread); + buf = isOwnedBuf ? buf.subarray(0, nread) : buf.slice(0, nread); - streamBaseState[kArrayBufferOffset] = 0; + streamBaseState[kArrayBufferOffset] = 0; - try { - this.onread!(buf, nread); - } catch { - // swallow callback errors. - } + try { + this.onread!(buf, nread); + } catch { + // swallow callback errors. + } - if (nread >= 0 && this.#reading) { - this.#read(); + if (nread >= 0 && this.#reading) { + this.#read(); + } + } finally { + bufLocked = false; } } @@ -423,4 +428,7 @@ export class LibuvStreamWrap extends HandleWrap { } } +// Used in #read above const BUF = new Uint8Array(SUGGESTED_SIZE); +// We need to ensure that only one inflight read request uses the cached buffer above +let bufLocked = false;