Skip to content

Commit

Permalink
streams: 5% throughput gain when sending small chunks
Browse files Browse the repository at this point in the history
Improves the performance when moving small buffers by 5%,
and it adds a benchmark to avoid regression in that area.
In all other cases it is equally performant to current master.

Full performance results available at:
https://gist.github.com/mcollina/717c35ad07d15710b6b9.
  • Loading branch information
mcollina committed Feb 3, 2016
1 parent 977159f commit f990d62
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 15 deletions.
96 changes: 96 additions & 0 deletions benchmark/net/net-c2s-cork.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// test the speed of .pipe() with sockets

var common = require('../common.js');
var PORT = common.PORT;

var bench = common.createBenchmark(main, {
len: [4, 8, 16, 32, 64, 128, 512, 1024],
type: ['buf'],
dur: [5],
});

var dur;
var len;
var type;
var chunk;
var encoding;

function main(conf) {
dur = +conf.dur;
len = +conf.len;
type = conf.type;

switch (type) {
case 'buf':
chunk = new Buffer(len);
chunk.fill('x');
break;
case 'utf':
encoding = 'utf8';
chunk = new Array(len / 2 + 1).join('ü');
break;
case 'asc':
encoding = 'ascii';
chunk = new Array(len + 1).join('x');
break;
default:
throw new Error('invalid type: ' + type);
break;
}

server();
}

var net = require('net');

function Writer() {
this.received = 0;
this.writable = true;
}

Writer.prototype.write = function(chunk, encoding, cb) {
this.received += chunk.length;

if (typeof encoding === 'function')
encoding();
else if (typeof cb === 'function')
cb();

return true;
};

// doesn't matter, never emits anything.
Writer.prototype.on = function() {};
Writer.prototype.once = function() {};
Writer.prototype.emit = function() {};

function server() {
var writer = new Writer();

// the actual benchmark.
var server = net.createServer(function(socket) {
socket.pipe(writer);
});

server.listen(PORT, function() {
var socket = net.connect(PORT);
socket.on('connect', function() {
bench.start();

socket.on('drain', send)
send()

setTimeout(function() {
var bytes = writer.received;
var gbits = (bytes * 8) / (1024 * 1024 * 1024);
bench.end(gbits);
}, dur * 1000);

function send() {
socket.cork();
while(socket.write(chunk, encoding)) {}
socket.uncork();
}
});
});
}
63 changes: 48 additions & 15 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ function WritableState(options, stream) {

// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;

// count buffered requests
this.bufferedRequestCount = 0;

// create the two objects needed to store the corked requests
// they are not a linked list, as no new elements are inserted in there
this.corkedRequestsFree = new CorkedRequest(this);
this.corkedRequestsFree.next = new CorkedRequest(this);
}

WritableState.prototype.getBuffer = function writableStateGetBuffer() {
Expand Down Expand Up @@ -274,6 +282,7 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
state.bufferedRequestCount += 1;
} else {
doWrite(stream, state, false, len, chunk, encoding, cb);
}
Expand Down Expand Up @@ -357,34 +366,33 @@ function onwriteDrain(stream, state) {
}
}


// if there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
state.bufferProcessing = true;
var entry = state.bufferedRequest;

if (stream._writev && entry && entry.next) {
// Fast case, write everything using _writev()
var buffer = [];
var cbs = [];
var l = state.bufferedRequestCount;
var buffer = new Array(l);
var holder = state.corkedRequestsFree;
holder.entry = entry;

var count = 0;
while (entry) {
cbs.push(entry.callback);
buffer.push(entry);
buffer[count] = entry;
entry = entry.next;
count += 1;
}

// count the one we are adding, as well.
// TODO(isaacs) clean this up
doWrite(stream, state, true, state.length, buffer, '', holder.finish);

// doWrite is always async, defer these to save a bit of time
// as the hot path ends with doWrite
state.pendingcb++;
state.lastBufferedRequest = null;
doWrite(stream, state, true, state.length, buffer, '', function(err) {
for (var i = 0; i < cbs.length; i++) {
state.pendingcb--;
cbs[i](err);
}
});

// Clear buffer
state.corkedRequestsFree = holder.next;
holder.next = null;
} else {
// Slow case, write chunks one-by-one
while (entry) {
Expand All @@ -407,6 +415,8 @@ function clearBuffer(stream, state) {
if (entry === null)
state.lastBufferedRequest = null;
}

state.bufferedRequestCount = 0;
state.bufferedRequest = entry;
state.bufferProcessing = false;
}
Expand Down Expand Up @@ -485,3 +495,26 @@ function endWritable(stream, state, cb) {
state.ended = true;
stream.writable = false;
}

// It seems a linked list but it is not
// there will be only 2 of these for each stream
function CorkedRequest(state) {
this.next = null;
this.entry = null;

this.finish = (err) => {
var entry = this.entry;
this.entry = null;
while (entry) {
var cb = entry.callback;
state.pendingcb--;
cb(err);
entry = entry.next;
}
if (state.corkedRequestsFree) {
state.corkedRequestsFree.next = this;
} else {
state.corkedRequestsFree = this;
}
};
}

0 comments on commit f990d62

Please sign in to comment.