Skip to content

Commit dfc0ef5

Browse files
mscdexBridgeAR
authored andcommitted
net: allow reading data into a static buffer
Co-Authored-By: Anna Henningsen <anna@addaleax.net> PR-URL: #25436 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
1 parent 2fafd63 commit dfc0ef5

File tree

7 files changed

+474
-65
lines changed

7 files changed

+474
-65
lines changed

benchmark/net/net-s2c.js

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,48 +5,84 @@ const common = require('../common.js');
55
const PORT = common.PORT;
66

77
const bench = common.createBenchmark(main, {
8-
len: [64, 102400, 1024 * 1024 * 16],
8+
sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024],
99
type: ['utf', 'asc', 'buf'],
10+
recvbuflen: [0, 64 * 1024, 1024 * 1024],
11+
recvbufgenfn: ['true', 'false'],
1012
dur: [5]
1113
});
1214

1315
var chunk;
1416
var encoding;
17+
var recvbuf;
18+
var received = 0;
19+
20+
function main({ dur, sendchunklen, type, recvbuflen, recvbufgenfn }) {
21+
if (isFinite(recvbuflen) && recvbuflen > 0)
22+
recvbuf = Buffer.alloc(recvbuflen);
1523

16-
function main({ dur, len, type }) {
1724
switch (type) {
1825
case 'buf':
19-
chunk = Buffer.alloc(len, 'x');
26+
chunk = Buffer.alloc(sendchunklen, 'x');
2027
break;
2128
case 'utf':
2229
encoding = 'utf8';
23-
chunk = 'ü'.repeat(len / 2);
30+
chunk = 'ü'.repeat(sendchunklen / 2);
2431
break;
2532
case 'asc':
2633
encoding = 'ascii';
27-
chunk = 'x'.repeat(len);
34+
chunk = 'x'.repeat(sendchunklen);
2835
break;
2936
default:
3037
throw new Error(`invalid type: ${type}`);
3138
}
3239

3340
const reader = new Reader();
34-
const writer = new Writer();
41+
var writer;
42+
var socketOpts;
43+
if (recvbuf === undefined) {
44+
writer = new Writer();
45+
socketOpts = { port: PORT };
46+
} else {
47+
let buffer = recvbuf;
48+
if (recvbufgenfn === 'true') {
49+
let bufidx = -1;
50+
const bufpool = [
51+
recvbuf,
52+
Buffer.from(recvbuf),
53+
Buffer.from(recvbuf),
54+
];
55+
buffer = () => {
56+
bufidx = (bufidx + 1) % bufpool.length;
57+
return bufpool[bufidx];
58+
};
59+
}
60+
socketOpts = {
61+
port: PORT,
62+
onread: {
63+
buffer,
64+
callback: function(nread, buf) {
65+
received += nread;
66+
}
67+
}
68+
};
69+
}
3570

3671
// The actual benchmark.
3772
const server = net.createServer((socket) => {
3873
reader.pipe(socket);
3974
});
4075

4176
server.listen(PORT, () => {
42-
const socket = net.connect(PORT);
77+
const socket = net.connect(socketOpts);
4378
socket.on('connect', () => {
4479
bench.start();
4580

46-
socket.pipe(writer);
81+
if (recvbuf === undefined)
82+
socket.pipe(writer);
4783

4884
setTimeout(() => {
49-
const bytes = writer.received;
85+
const bytes = received;
5086
const gbits = (bytes * 8) / (1024 * 1024 * 1024);
5187
bench.end(gbits);
5288
process.exit(0);
@@ -58,12 +94,11 @@ function main({ dur, len, type }) {
5894
const net = require('net');
5995

6096
function Writer() {
61-
this.received = 0;
6297
this.writable = true;
6398
}
6499

65100
Writer.prototype.write = function(chunk, encoding, cb) {
66-
this.received += chunk.length;
101+
received += chunk.length;
67102

68103
if (typeof encoding === 'function')
69104
encoding();

doc/api/net.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,9 @@ for the [`'connect'`][] event **once**.
593593
<!-- YAML
594594
added: v0.1.90
595595
changes:
596+
- version: REPLACEME
597+
pr-url: https://github.com/nodejs/node/pull/25436
598+
description: Added `onread` option.
596599
- version: v6.0.0
597600
pr-url: https://github.com/nodejs/node/pull/6021
598601
description: The `hints` option defaults to `0` in all cases now.
@@ -629,6 +632,39 @@ For [IPC][] connections, available `options` are:
629632
See [Identifying paths for IPC connections][]. If provided, the TCP-specific
630633
options above are ignored.
631634

635+
For both types, available `options` include:
636+
637+
* `onread` {Object} - If specified, incoming data is stored in a single `buffer`
638+
and passed to the supplied `callback` when data arrives on the socket.
639+
Note: this will cause the streaming functionality to not provide any data,
640+
however events like `'error'`, `'end'`, and `'close'` will still be emitted
641+
as normal and methods like `pause()` and `resume()` will also behave as
642+
expected.
643+
* `buffer` {Buffer|Uint8Array|Function} - Either a reusable chunk of memory to
644+
use for storing incoming data or a function that returns such.
645+
* `callback` {Function} This function is called for every chunk of incoming
646+
data. Two arguments are passed to it: the number of bytes written to
647+
`buffer` and a reference to `buffer`. Return `false` from this function to
648+
implicitly `pause()` the socket. This function will be executed in the
649+
global context.
650+
651+
Following is an example of a client using the `onread` option:
652+
653+
```js
654+
const net = require('net');
655+
net.connect({
656+
port: 80,
657+
onread: {
658+
// Reuses a 4KiB Buffer for every read from the socket
659+
buffer: Buffer.alloc(4 * 1024),
660+
callback: function(nread, buf) {
661+
// Received data is available in `buf` from 0 to `nread`
662+
console.log(buf.toString('utf8', 0, nread));
663+
}
664+
}
665+
});
666+
```
667+
632668
#### socket.connect(path[, connectListener])
633669

634670
* `path` {string} Path the client should connect to. See

lib/internal/stream_base_commons.js

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const {
2323
setUnrefTimeout,
2424
getTimerDuration
2525
} = require('internal/timers');
26+
const { isUint8Array } = require('internal/util/types');
2627
const { clearTimeout } = require('timers');
2728

2829
const kMaybeDestroy = Symbol('kMaybeDestroy');
@@ -32,6 +33,9 @@ const kHandle = Symbol('kHandle');
3233
const kSession = Symbol('kSession');
3334

3435
const debug = require('internal/util/debuglog').debuglog('stream');
36+
const kBuffer = Symbol('kBuffer');
37+
const kBufferGen = Symbol('kBufferGen');
38+
const kBufferCb = Symbol('kBufferCb');
3539

3640
function handleWriteReq(req, data, encoding) {
3741
const { handle } = req;
@@ -161,9 +165,23 @@ function onStreamRead(arrayBuffer) {
161165
stream[kUpdateTimer]();
162166

163167
if (nread > 0 && !stream.destroyed) {
164-
const offset = streamBaseState[kArrayBufferOffset];
165-
const buf = new FastBuffer(arrayBuffer, offset, nread);
166-
if (!stream.push(buf)) {
168+
let ret;
169+
let result;
170+
const userBuf = stream[kBuffer];
171+
if (userBuf) {
172+
result = (stream[kBufferCb](nread, userBuf) !== false);
173+
const bufGen = stream[kBufferGen];
174+
if (bufGen !== null) {
175+
const nextBuf = bufGen();
176+
if (isUint8Array(nextBuf))
177+
stream[kBuffer] = ret = nextBuf;
178+
}
179+
} else {
180+
const offset = streamBaseState[kArrayBufferOffset];
181+
const buf = new FastBuffer(arrayBuffer, offset, nread);
182+
result = stream.push(buf);
183+
}
184+
if (!result) {
167185
handle.reading = false;
168186
if (!stream.destroyed) {
169187
const err = handle.readStop();
@@ -172,7 +190,7 @@ function onStreamRead(arrayBuffer) {
172190
}
173191
}
174192

175-
return;
193+
return ret;
176194
}
177195

178196
if (nread === 0) {
@@ -241,5 +259,8 @@ module.exports = {
241259
kUpdateTimer,
242260
kHandle,
243261
kSession,
244-
setStreamTimeout
262+
setStreamTimeout,
263+
kBuffer,
264+
kBufferCb,
265+
kBufferGen
245266
};

0 commit comments

Comments
 (0)