streams: 5% throughput gain when sending small chunks
Improves the performance when moving small buffers by 5%, and it adds a benchmark to avoid regression in that area. In all other cases it is equally performant to current master. Full performance results available at: https://gist.github.com/mcollina/717c35ad07d15710b6b9. PR-URL: https://github.com/nodejs/node/pull/4354 Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
parent
dccccbbbe7
commit
7764b6cb96
96
benchmark/net/net-c2s-cork.js
Normal file
96
benchmark/net/net-c2s-cork.js
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
// test the speed of .pipe() with sockets
|
||||||
|
|
||||||
|
var common = require('../common.js');
|
||||||
|
var PORT = common.PORT;
|
||||||
|
|
||||||
|
var bench = common.createBenchmark(main, {
|
||||||
|
len: [4, 8, 16, 32, 64, 128, 512, 1024],
|
||||||
|
type: ['buf'],
|
||||||
|
dur: [5],
|
||||||
|
});
|
||||||
|
|
||||||
|
var dur;
|
||||||
|
var len;
|
||||||
|
var type;
|
||||||
|
var chunk;
|
||||||
|
var encoding;
|
||||||
|
|
||||||
|
function main(conf) {
|
||||||
|
dur = +conf.dur;
|
||||||
|
len = +conf.len;
|
||||||
|
type = conf.type;
|
||||||
|
|
||||||
|
switch (type) {
|
||||||
|
case 'buf':
|
||||||
|
chunk = new Buffer(len);
|
||||||
|
chunk.fill('x');
|
||||||
|
break;
|
||||||
|
case 'utf':
|
||||||
|
encoding = 'utf8';
|
||||||
|
chunk = new Array(len / 2 + 1).join('ü');
|
||||||
|
break;
|
||||||
|
case 'asc':
|
||||||
|
encoding = 'ascii';
|
||||||
|
chunk = new Array(len + 1).join('x');
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new Error('invalid type: ' + type);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
server();
|
||||||
|
}
|
||||||
|
|
||||||
|
var net = require('net');
|
||||||
|
|
||||||
|
function Writer() {
|
||||||
|
this.received = 0;
|
||||||
|
this.writable = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
Writer.prototype.write = function(chunk, encoding, cb) {
|
||||||
|
this.received += chunk.length;
|
||||||
|
|
||||||
|
if (typeof encoding === 'function')
|
||||||
|
encoding();
|
||||||
|
else if (typeof cb === 'function')
|
||||||
|
cb();
|
||||||
|
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
|
// doesn't matter, never emits anything.
|
||||||
|
Writer.prototype.on = function() {};
|
||||||
|
Writer.prototype.once = function() {};
|
||||||
|
Writer.prototype.emit = function() {};
|
||||||
|
|
||||||
|
function server() {
|
||||||
|
var writer = new Writer();
|
||||||
|
|
||||||
|
// the actual benchmark.
|
||||||
|
var server = net.createServer(function(socket) {
|
||||||
|
socket.pipe(writer);
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(PORT, function() {
|
||||||
|
var socket = net.connect(PORT);
|
||||||
|
socket.on('connect', function() {
|
||||||
|
bench.start();
|
||||||
|
|
||||||
|
socket.on('drain', send)
|
||||||
|
send()
|
||||||
|
|
||||||
|
setTimeout(function() {
|
||||||
|
var bytes = writer.received;
|
||||||
|
var gbits = (bytes * 8) / (1024 * 1024 * 1024);
|
||||||
|
bench.end(gbits);
|
||||||
|
}, dur * 1000);
|
||||||
|
|
||||||
|
function send() {
|
||||||
|
socket.cork();
|
||||||
|
while(socket.write(chunk, encoding)) {}
|
||||||
|
socket.uncork();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
@ -108,6 +108,14 @@ function WritableState(options, stream) {
|
|||||||
|
|
||||||
// True if the error was already emitted and should not be thrown again
|
// True if the error was already emitted and should not be thrown again
|
||||||
this.errorEmitted = false;
|
this.errorEmitted = false;
|
||||||
|
|
||||||
|
// count buffered requests
|
||||||
|
this.bufferedRequestCount = 0;
|
||||||
|
|
||||||
|
// create the two objects needed to store the corked requests
|
||||||
|
// they are not a linked list, as no new elements are inserted in there
|
||||||
|
this.corkedRequestsFree = new CorkedRequest(this);
|
||||||
|
this.corkedRequestsFree.next = new CorkedRequest(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
WritableState.prototype.getBuffer = function writableStateGetBuffer() {
|
WritableState.prototype.getBuffer = function writableStateGetBuffer() {
|
||||||
@ -274,6 +282,7 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
|
|||||||
} else {
|
} else {
|
||||||
state.bufferedRequest = state.lastBufferedRequest;
|
state.bufferedRequest = state.lastBufferedRequest;
|
||||||
}
|
}
|
||||||
|
state.bufferedRequestCount += 1;
|
||||||
} else {
|
} else {
|
||||||
doWrite(stream, state, false, len, chunk, encoding, cb);
|
doWrite(stream, state, false, len, chunk, encoding, cb);
|
||||||
}
|
}
|
||||||
@ -357,7 +366,6 @@ function onwriteDrain(stream, state) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// if there's something in the buffer waiting, then process it
|
// if there's something in the buffer waiting, then process it
|
||||||
function clearBuffer(stream, state) {
|
function clearBuffer(stream, state) {
|
||||||
state.bufferProcessing = true;
|
state.bufferProcessing = true;
|
||||||
@ -365,26 +373,26 @@ function clearBuffer(stream, state) {
|
|||||||
|
|
||||||
if (stream._writev && entry && entry.next) {
|
if (stream._writev && entry && entry.next) {
|
||||||
// Fast case, write everything using _writev()
|
// Fast case, write everything using _writev()
|
||||||
var buffer = [];
|
var l = state.bufferedRequestCount;
|
||||||
var cbs = [];
|
var buffer = new Array(l);
|
||||||
|
var holder = state.corkedRequestsFree;
|
||||||
|
holder.entry = entry;
|
||||||
|
|
||||||
|
var count = 0;
|
||||||
while (entry) {
|
while (entry) {
|
||||||
cbs.push(entry.callback);
|
buffer[count] = entry;
|
||||||
buffer.push(entry);
|
|
||||||
entry = entry.next;
|
entry = entry.next;
|
||||||
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// count the one we are adding, as well.
|
doWrite(stream, state, true, state.length, buffer, '', holder.finish);
|
||||||
// TODO(isaacs) clean this up
|
|
||||||
|
// doWrite is always async, defer these to save a bit of time
|
||||||
|
// as the hot path ends with doWrite
|
||||||
state.pendingcb++;
|
state.pendingcb++;
|
||||||
state.lastBufferedRequest = null;
|
state.lastBufferedRequest = null;
|
||||||
doWrite(stream, state, true, state.length, buffer, '', function(err) {
|
state.corkedRequestsFree = holder.next;
|
||||||
for (var i = 0; i < cbs.length; i++) {
|
holder.next = null;
|
||||||
state.pendingcb--;
|
|
||||||
cbs[i](err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Clear buffer
|
|
||||||
} else {
|
} else {
|
||||||
// Slow case, write chunks one-by-one
|
// Slow case, write chunks one-by-one
|
||||||
while (entry) {
|
while (entry) {
|
||||||
@ -407,6 +415,8 @@ function clearBuffer(stream, state) {
|
|||||||
if (entry === null)
|
if (entry === null)
|
||||||
state.lastBufferedRequest = null;
|
state.lastBufferedRequest = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
state.bufferedRequestCount = 0;
|
||||||
state.bufferedRequest = entry;
|
state.bufferedRequest = entry;
|
||||||
state.bufferProcessing = false;
|
state.bufferProcessing = false;
|
||||||
}
|
}
|
||||||
@ -485,3 +495,26 @@ function endWritable(stream, state, cb) {
|
|||||||
state.ended = true;
|
state.ended = true;
|
||||||
stream.writable = false;
|
stream.writable = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// It seems a linked list but it is not
|
||||||
|
// there will be only 2 of these for each stream
|
||||||
|
function CorkedRequest(state) {
|
||||||
|
this.next = null;
|
||||||
|
this.entry = null;
|
||||||
|
|
||||||
|
this.finish = (err) => {
|
||||||
|
var entry = this.entry;
|
||||||
|
this.entry = null;
|
||||||
|
while (entry) {
|
||||||
|
var cb = entry.callback;
|
||||||
|
state.pendingcb--;
|
||||||
|
cb(err);
|
||||||
|
entry = entry.next;
|
||||||
|
}
|
||||||
|
if (state.corkedRequestsFree) {
|
||||||
|
state.corkedRequestsFree.next = this;
|
||||||
|
} else {
|
||||||
|
state.corkedRequestsFree = this;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user