net: allow reading data into a static buffer

Co-Authored-By: Anna Henningsen <anna@addaleax.net>

PR-URL: https://github.com/nodejs/node/pull/25436
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
This commit is contained in:
Brian White 2019-01-10 15:52:27 -05:00
parent 9d21b0395c
commit 8292b280ec
No known key found for this signature in database
GPG Key ID: 606D7358F94DA209
7 changed files with 473 additions and 64 deletions

View File

@ -5,33 +5,68 @@ const common = require('../common.js');
const PORT = common.PORT; const PORT = common.PORT;
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
len: [64, 102400, 1024 * 1024 * 16], sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
recvbuflen: [0, 64 * 1024, 1024 * 1024],
recvbufgenfn: ['true', 'false'],
dur: [5] dur: [5]
}); });
var chunk; var chunk;
var encoding; var encoding;
var recvbuf;
var received = 0;
function main({ dur, sendchunklen, type, recvbuflen, recvbufgenfn }) {
if (isFinite(recvbuflen) && recvbuflen > 0)
recvbuf = Buffer.alloc(recvbuflen);
function main({ dur, len, type }) {
switch (type) { switch (type) {
case 'buf': case 'buf':
chunk = Buffer.alloc(len, 'x'); chunk = Buffer.alloc(sendchunklen, 'x');
break; break;
case 'utf': case 'utf':
encoding = 'utf8'; encoding = 'utf8';
chunk = 'ü'.repeat(len / 2); chunk = 'ü'.repeat(sendchunklen / 2);
break; break;
case 'asc': case 'asc':
encoding = 'ascii'; encoding = 'ascii';
chunk = 'x'.repeat(len); chunk = 'x'.repeat(sendchunklen);
break; break;
default: default:
throw new Error(`invalid type: ${type}`); throw new Error(`invalid type: ${type}`);
} }
const reader = new Reader(); const reader = new Reader();
const writer = new Writer(); var writer;
var socketOpts;
if (recvbuf === undefined) {
writer = new Writer();
socketOpts = { port: PORT };
} else {
let buffer = recvbuf;
if (recvbufgenfn === 'true') {
let bufidx = -1;
const bufpool = [
recvbuf,
Buffer.from(recvbuf),
Buffer.from(recvbuf),
];
buffer = () => {
bufidx = (bufidx + 1) % bufpool.length;
return bufpool[bufidx];
};
}
socketOpts = {
port: PORT,
onread: {
buffer,
callback: function(nread, buf) {
received += nread;
}
}
};
}
// The actual benchmark. // The actual benchmark.
const server = net.createServer((socket) => { const server = net.createServer((socket) => {
@ -39,14 +74,15 @@ function main({ dur, len, type }) {
}); });
server.listen(PORT, () => { server.listen(PORT, () => {
const socket = net.connect(PORT); const socket = net.connect(socketOpts);
socket.on('connect', () => { socket.on('connect', () => {
bench.start(); bench.start();
socket.pipe(writer); if (recvbuf === undefined)
socket.pipe(writer);
setTimeout(() => { setTimeout(() => {
const bytes = writer.received; const bytes = received;
const gbits = (bytes * 8) / (1024 * 1024 * 1024); const gbits = (bytes * 8) / (1024 * 1024 * 1024);
bench.end(gbits); bench.end(gbits);
process.exit(0); process.exit(0);
@ -58,12 +94,11 @@ function main({ dur, len, type }) {
const net = require('net'); const net = require('net');
function Writer() { function Writer() {
this.received = 0;
this.writable = true; this.writable = true;
} }
Writer.prototype.write = function(chunk, encoding, cb) { Writer.prototype.write = function(chunk, encoding, cb) {
this.received += chunk.length; received += chunk.length;
if (typeof encoding === 'function') if (typeof encoding === 'function')
encoding(); encoding();

View File

@ -593,6 +593,9 @@ for the [`'connect'`][] event **once**.
<!-- YAML <!-- YAML
added: v0.1.90 added: v0.1.90
changes: changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/25436
description: Added `onread` option.
- version: v6.0.0 - version: v6.0.0
pr-url: https://github.com/nodejs/node/pull/6021 pr-url: https://github.com/nodejs/node/pull/6021
description: The `hints` option defaults to `0` in all cases now. description: The `hints` option defaults to `0` in all cases now.
@ -629,6 +632,39 @@ For [IPC][] connections, available `options` are:
See [Identifying paths for IPC connections][]. If provided, the TCP-specific See [Identifying paths for IPC connections][]. If provided, the TCP-specific
options above are ignored. options above are ignored.
For both types, available `options` include:
* `onread` {Object} - If specified, incoming data is stored in a single `buffer`
and passed to the supplied `callback` when data arrives on the socket.
Note: this will cause the streaming functionality to not provide any data,
however events like `'error'`, `'end'`, and `'close'` will still be emitted
as normal and methods like `pause()` and `resume()` will also behave as
expected.
* `buffer` {Buffer|Uint8Array|Function} - Either a reusable chunk of memory to
use for storing incoming data or a function that returns such.
* `callback` {Function} This function is called for every chunk of incoming
data. Two arguments are passed to it: the number of bytes written to
`buffer` and a reference to `buffer`. Return `false` from this function to
implicitly `pause()` the socket. This function will be executed in the
global context.
Following is an example of a client using the `onread` option:
```js
const net = require('net');
net.connect({
port: 80,
onread: {
// Reuses a 4KiB Buffer for every read from the socket
buffer: Buffer.alloc(4 * 1024),
callback: function(nread, buf) {
// Received data is available in `buf` from 0 to `nread`
console.log(buf.toString('utf8', 0, nread));
}
}
});
```
#### socket.connect(path[, connectListener]) #### socket.connect(path[, connectListener])
* `path` {string} Path the client should connect to. See * `path` {string} Path the client should connect to. See

View File

@ -23,6 +23,7 @@ const {
setUnrefTimeout, setUnrefTimeout,
getTimerDuration getTimerDuration
} = require('internal/timers'); } = require('internal/timers');
const { isUint8Array } = require('internal/util/types');
const { clearTimeout } = require('timers'); const { clearTimeout } = require('timers');
const kMaybeDestroy = Symbol('kMaybeDestroy'); const kMaybeDestroy = Symbol('kMaybeDestroy');
@ -32,6 +33,9 @@ const kHandle = Symbol('kHandle');
const kSession = Symbol('kSession'); const kSession = Symbol('kSession');
const debug = require('internal/util/debuglog').debuglog('stream'); const debug = require('internal/util/debuglog').debuglog('stream');
const kBuffer = Symbol('kBuffer');
const kBufferGen = Symbol('kBufferGen');
const kBufferCb = Symbol('kBufferCb');
function handleWriteReq(req, data, encoding) { function handleWriteReq(req, data, encoding) {
const { handle } = req; const { handle } = req;
@ -161,9 +165,23 @@ function onStreamRead(arrayBuffer) {
stream[kUpdateTimer](); stream[kUpdateTimer]();
if (nread > 0 && !stream.destroyed) { if (nread > 0 && !stream.destroyed) {
const offset = streamBaseState[kArrayBufferOffset]; let ret;
const buf = new FastBuffer(arrayBuffer, offset, nread); let result;
if (!stream.push(buf)) { const userBuf = stream[kBuffer];
if (userBuf) {
result = (stream[kBufferCb](nread, userBuf) !== false);
const bufGen = stream[kBufferGen];
if (bufGen !== null) {
const nextBuf = bufGen();
if (isUint8Array(nextBuf))
stream[kBuffer] = ret = nextBuf;
}
} else {
const offset = streamBaseState[kArrayBufferOffset];
const buf = new FastBuffer(arrayBuffer, offset, nread);
result = stream.push(buf);
}
if (!result) {
handle.reading = false; handle.reading = false;
if (!stream.destroyed) { if (!stream.destroyed) {
const err = handle.readStop(); const err = handle.readStop();
@ -172,7 +190,7 @@ function onStreamRead(arrayBuffer) {
} }
} }
return; return ret;
} }
if (nread === 0) { if (nread === 0) {
@ -241,5 +259,8 @@ module.exports = {
kUpdateTimer, kUpdateTimer,
kHandle, kHandle,
kSession, kSession,
setStreamTimeout setStreamTimeout,
kBuffer,
kBufferCb,
kBufferGen
}; };

View File

@ -67,7 +67,10 @@ const {
kAfterAsyncWrite, kAfterAsyncWrite,
kHandle, kHandle,
kUpdateTimer, kUpdateTimer,
setStreamTimeout setStreamTimeout,
kBuffer,
kBufferCb,
kBufferGen
} = require('internal/stream_base_commons'); } = require('internal/stream_base_commons');
const { const {
codes: { codes: {
@ -86,6 +89,7 @@ const {
exceptionWithHostPort, exceptionWithHostPort,
uvExceptionWithHostPort uvExceptionWithHostPort
} = require('internal/errors'); } = require('internal/errors');
const { isUint8Array } = require('internal/util/types');
const { validateInt32, validateString } = require('internal/validators'); const { validateInt32, validateString } = require('internal/validators');
const kLastWriteQueueSize = Symbol('lastWriteQueueSize'); const kLastWriteQueueSize = Symbol('lastWriteQueueSize');
const { const {
@ -225,6 +229,18 @@ function initSocketHandle(self) {
self._handle[owner_symbol] = self; self._handle[owner_symbol] = self;
self._handle.onread = onStreamRead; self._handle.onread = onStreamRead;
self[async_id_symbol] = getNewAsyncId(self._handle); self[async_id_symbol] = getNewAsyncId(self._handle);
let userBuf = self[kBuffer];
if (userBuf) {
const bufGen = self[kBufferGen];
if (bufGen !== null) {
userBuf = bufGen();
if (!isUint8Array(userBuf))
return;
self[kBuffer] = userBuf;
}
self._handle.useUserBuffer(userBuf);
}
} }
} }
@ -247,6 +263,9 @@ function Socket(options) {
this._host = null; this._host = null;
this[kLastWriteQueueSize] = 0; this[kLastWriteQueueSize] = 0;
this[kTimeout] = null; this[kTimeout] = null;
this[kBuffer] = null;
this[kBufferCb] = null;
this[kBufferGen] = null;
if (typeof options === 'number') if (typeof options === 'number')
options = { fd: options }; // Legacy interface. options = { fd: options }; // Legacy interface.
@ -271,40 +290,55 @@ function Socket(options) {
if (options.handle) { if (options.handle) {
this._handle = options.handle; // private this._handle = options.handle; // private
this[async_id_symbol] = getNewAsyncId(this._handle); this[async_id_symbol] = getNewAsyncId(this._handle);
} else if (options.fd !== undefined) { } else {
const { fd } = options; const onread = options.onread;
let err; if (onread !== null && typeof onread === 'object' &&
(isUint8Array(onread.buffer) || typeof onread.buffer === 'function') &&
typeof onread.callback === 'function') {
if (typeof onread.buffer === 'function') {
this[kBuffer] = true;
this[kBufferGen] = onread.buffer;
} else {
this[kBuffer] = onread.buffer;
}
this[kBufferCb] = onread.callback;
}
if (options.fd !== undefined) {
const { fd } = options;
let err;
// createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
// a valid `PIPE` or `TCP` descriptor // a valid `PIPE` or `TCP` descriptor
this._handle = createHandle(fd, false); this._handle = createHandle(fd, false);
err = this._handle.open(fd); err = this._handle.open(fd);
// While difficult to fabricate, in some architectures // While difficult to fabricate, in some architectures
// `open` may return an error code for valid file descriptors // `open` may return an error code for valid file descriptors
// which cannot be opened. This is difficult to test as most // which cannot be opened. This is difficult to test as most
// un-openable fds will throw on `createHandle` // un-openable fds will throw on `createHandle`
if (err)
throw errnoException(err, 'open');
this[async_id_symbol] = this._handle.getAsyncId();
if ((fd === 1 || fd === 2) &&
(this._handle instanceof Pipe) &&
process.platform === 'win32') {
// Make stdout and stderr blocking on Windows
err = this._handle.setBlocking(true);
if (err) if (err)
throw errnoException(err, 'setBlocking'); throw errnoException(err, 'open');
this._writev = null; this[async_id_symbol] = this._handle.getAsyncId();
this._write = makeSyncWrite(fd);
// makeSyncWrite adjusts this value like the original handle would, so if ((fd === 1 || fd === 2) &&
// we need to let it do that by turning it into a writable, own property. (this._handle instanceof Pipe) &&
Object.defineProperty(this._handle, 'bytesWritten', { process.platform === 'win32') {
value: 0, writable: true // Make stdout and stderr blocking on Windows
}); err = this._handle.setBlocking(true);
if (err)
throw errnoException(err, 'setBlocking');
this._writev = null;
this._write = makeSyncWrite(fd);
// makeSyncWrite adjusts this value like the original handle would, so
// we need to let it do that by turning it into a writable, own
// property.
Object.defineProperty(this._handle, 'bytesWritten', {
value: 0, writable: true
});
}
} }
} }
@ -514,6 +548,15 @@ Object.defineProperty(Socket.prototype, kUpdateTimer, {
}); });
function tryReadStart(socket) {
// Not already reading, start the flow
debug('Socket._handle.readStart');
socket._handle.reading = true;
var err = socket._handle.readStart();
if (err)
socket.destroy(errnoException(err, 'read'));
}
// Just call handle.readStart until we have enough in the buffer // Just call handle.readStart until we have enough in the buffer
Socket.prototype._read = function(n) { Socket.prototype._read = function(n) {
debug('_read'); debug('_read');
@ -522,12 +565,7 @@ Socket.prototype._read = function(n) {
debug('_read wait for connection'); debug('_read wait for connection');
this.once('connect', () => this._read(n)); this.once('connect', () => this._read(n));
} else if (!this._handle.reading) { } else if (!this._handle.reading) {
// Not already reading, start the flow tryReadStart(this);
debug('Socket._read readStart');
this._handle.reading = true;
var err = this._handle.readStart();
if (err)
this.destroy(errnoException(err, 'read'));
} }
}; };
@ -539,6 +577,38 @@ Socket.prototype.end = function(data, encoding, callback) {
}; };
Socket.prototype.pause = function() {
if (this[kBuffer] && !this.connecting && this._handle &&
this._handle.reading) {
this._handle.reading = false;
if (!this.destroyed) {
const err = this._handle.readStop();
if (err)
this.destroy(errnoException(err, 'read'));
}
}
return stream.Duplex.prototype.pause.call(this);
};
Socket.prototype.resume = function() {
if (this[kBuffer] && !this.connecting && this._handle &&
!this._handle.reading) {
tryReadStart(this);
}
return stream.Duplex.prototype.resume.call(this);
};
Socket.prototype.read = function(n) {
if (this[kBuffer] && !this.connecting && this._handle &&
!this._handle.reading) {
tryReadStart(this);
}
return stream.Duplex.prototype.read.call(this, n);
};
// Called when the 'end' event is emitted. // Called when the 'end' event is emitted.
function onReadableStreamEnd() { function onReadableStreamEnd() {
if (!this.allowHalfOpen) { if (!this.allowHalfOpen) {

View File

@ -26,6 +26,7 @@ using v8::FunctionCallbackInfo;
using v8::HandleScope; using v8::HandleScope;
using v8::Integer; using v8::Integer;
using v8::Local; using v8::Local;
using v8::MaybeLocal;
using v8::Object; using v8::Object;
using v8::ReadOnly; using v8::ReadOnly;
using v8::String; using v8::String;
@ -50,6 +51,13 @@ int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
return ReadStop(); return ReadStop();
} }
int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
CHECK(Buffer::HasInstance(args[0]));
uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
PushStreamListener(new CustomBufferJSListener(buf));
return 0;
}
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) { int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsObject()); CHECK(args[0]->IsObject());
@ -291,19 +299,22 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
} }
void StreamBase::CallJSOnreadMethod(ssize_t nread, MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
Local<ArrayBuffer> ab, Local<ArrayBuffer> ab,
size_t offset) { size_t offset,
StreamBaseJSChecks checks) {
Environment* env = env_; Environment* env = env_;
DCHECK_EQ(static_cast<int32_t>(nread), nread); DCHECK_EQ(static_cast<int32_t>(nread), nread);
DCHECK_LE(offset, INT32_MAX); DCHECK_LE(offset, INT32_MAX);
if (ab.IsEmpty()) { if (checks == DONT_SKIP_NREAD_CHECKS) {
DCHECK_EQ(offset, 0); if (ab.IsEmpty()) {
DCHECK_LE(nread, 0); DCHECK_EQ(offset, 0);
} else { DCHECK_LE(nread, 0);
DCHECK_GE(nread, 0); } else {
DCHECK_GE(nread, 0);
}
} }
env->stream_base_state()[kReadBytesOrError] = nread; env->stream_base_state()[kReadBytesOrError] = nread;
@ -317,7 +328,7 @@ void StreamBase::CallJSOnreadMethod(ssize_t nread,
CHECK_NOT_NULL(wrap); CHECK_NOT_NULL(wrap);
Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField); Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField);
CHECK(onread->IsFunction()); CHECK(onread->IsFunction());
wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv); return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
} }
@ -366,6 +377,9 @@ void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>); env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>); env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>); env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
env->SetProtoMethod(t,
"useUserBuffer",
JSMethod<&StreamBase::UseUserBuffer>);
env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>); env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>); env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
env->SetProtoMethod( env->SetProtoMethod(
@ -445,6 +459,7 @@ void StreamResource::ClearError() {
// No-op // No-op
} }
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) { uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
CHECK_NOT_NULL(stream_); CHECK_NOT_NULL(stream_);
Environment* env = static_cast<StreamBase*>(stream_)->stream_env(); Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
@ -472,6 +487,32 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
} }
uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
return buffer_;
}
void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
CHECK_NOT_NULL(stream_);
CHECK_EQ(buf.base, buffer_.base);
StreamBase* stream = static_cast<StreamBase*>(stream_);
Environment* env = stream->stream_env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
Local<ArrayBuffer>(),
0,
StreamBase::SKIP_NREAD_CHECKS);
Local<Value> next_buf_v;
if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
buffer_.base = Buffer::Data(next_buf_v);
buffer_.len = Buffer::Length(next_buf_v);
}
}
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
StreamReq* req_wrap, int status) { StreamReq* req_wrap, int status) {
StreamBase* stream = static_cast<StreamBase*>(stream_); StreamBase* stream = static_cast<StreamBase*>(stream_);

View File

@ -180,6 +180,21 @@ class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
}; };
// An alternative listener that uses a custom, user-provided buffer
// for reading data.
class CustomBufferJSListener : public ReportWritesToJSStreamListener {
public:
uv_buf_t OnStreamAlloc(size_t suggested_size) override;
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
void OnStreamDestroy() override { delete this; }
explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}
private:
uv_buf_t buffer_;
};
// A generic stream, comparable to JS lands `Duplex` streams. // A generic stream, comparable to JS lands `Duplex` streams.
// A stream is always controlled through one `StreamListener` instance. // A stream is always controlled through one `StreamListener` instance.
class StreamResource { class StreamResource {
@ -273,9 +288,13 @@ class StreamBase : public StreamResource {
virtual bool IsIPCPipe(); virtual bool IsIPCPipe();
virtual int GetFD(); virtual int GetFD();
void CallJSOnreadMethod(ssize_t nread, enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS };
v8::Local<v8::ArrayBuffer> ab,
size_t offset = 0); v8::MaybeLocal<v8::Value> CallJSOnreadMethod(
ssize_t nread,
v8::Local<v8::ArrayBuffer> ab,
size_t offset = 0,
StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS);
// This is named `stream_env` to avoid name clashes, because a lot of // This is named `stream_env` to avoid name clashes, because a lot of
// subclasses are also `BaseObject`s. // subclasses are also `BaseObject`s.
@ -323,6 +342,7 @@ class StreamBase : public StreamResource {
int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args); int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
template <enum encoding enc> template <enum encoding enc>
int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args); int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args); static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args); static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);

View File

@ -0,0 +1,186 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const net = require('net');
const message = Buffer.from('hello world');
// Test typical usage
net.createServer(common.mustCall(function(socket) {
this.close();
socket.end(message);
})).listen(0, function() {
let received = 0;
const buffers = [];
const sockBuf = Buffer.alloc(8);
net.connect({
port: this.address().port,
onread: {
buffer: sockBuf,
callback: function(nread, buf) {
assert.strictEqual(buf, sockBuf);
received += nread;
buffers.push(Buffer.from(buf.slice(0, nread)));
}
}
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
assert.strictEqual(received, message.length);
assert.deepStrictEqual(Buffer.concat(buffers), message);
}));
});
// Test Uint8Array support
net.createServer(common.mustCall(function(socket) {
this.close();
socket.end(message);
})).listen(0, function() {
let received = 0;
let incoming = new Uint8Array(0);
const sockBuf = new Uint8Array(8);
net.connect({
port: this.address().port,
onread: {
buffer: sockBuf,
callback: function(nread, buf) {
assert.strictEqual(buf, sockBuf);
received += nread;
const newIncoming = new Uint8Array(incoming.length + nread);
newIncoming.set(incoming);
newIncoming.set(buf.slice(0, nread), incoming.length);
incoming = newIncoming;
}
}
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
assert.strictEqual(received, message.length);
assert.deepStrictEqual(incoming, new Uint8Array(message));
}));
});
// Test Buffer callback usage
net.createServer(common.mustCall(function(socket) {
this.close();
socket.end(message);
})).listen(0, function() {
let received = 0;
const incoming = [];
const bufPool = [ Buffer.alloc(2), Buffer.alloc(2), Buffer.alloc(2) ];
let bufPoolIdx = -1;
let bufPoolUsage = 0;
net.connect({
port: this.address().port,
onread: {
buffer: () => {
++bufPoolUsage;
bufPoolIdx = (bufPoolIdx + 1) % bufPool.length;
return bufPool[bufPoolIdx];
},
callback: function(nread, buf) {
assert.strictEqual(buf, bufPool[bufPoolIdx]);
received += nread;
incoming.push(Buffer.from(buf.slice(0, nread)));
}
}
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
assert.strictEqual(received, message.length);
assert.deepStrictEqual(Buffer.concat(incoming), message);
assert.strictEqual(bufPoolUsage, 7);
}));
});
// Test Uint8Array callback support
net.createServer(common.mustCall(function(socket) {
this.close();
socket.end(message);
})).listen(0, function() {
let received = 0;
let incoming = new Uint8Array(0);
const bufPool = [ new Uint8Array(2), new Uint8Array(2), new Uint8Array(2) ];
let bufPoolIdx = -1;
let bufPoolUsage = 0;
net.connect({
port: this.address().port,
onread: {
buffer: () => {
++bufPoolUsage;
bufPoolIdx = (bufPoolIdx + 1) % bufPool.length;
return bufPool[bufPoolIdx];
},
callback: function(nread, buf) {
assert.strictEqual(buf, bufPool[bufPoolIdx]);
received += nread;
const newIncoming = new Uint8Array(incoming.length + nread);
newIncoming.set(incoming);
newIncoming.set(buf.slice(0, nread), incoming.length);
incoming = newIncoming;
}
}
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
assert.strictEqual(received, message.length);
assert.deepStrictEqual(incoming, new Uint8Array(message));
assert.strictEqual(bufPoolUsage, 7);
}));
});
// Test explicit socket pause
net.createServer(common.mustCall(function(socket) {
this.close();
socket.end(message);
})).listen(0, function() {
let received = 0;
const buffers = [];
const sockBuf = Buffer.alloc(8);
let paused = false;
net.connect({
port: this.address().port,
onread: {
buffer: sockBuf,
callback: function(nread, buf) {
assert.strictEqual(paused, false);
assert.strictEqual(buf, sockBuf);
received += nread;
buffers.push(Buffer.from(buf.slice(0, nread)));
paused = true;
this.pause();
setTimeout(() => {
paused = false;
this.resume();
}, 100);
}
}
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
assert.strictEqual(received, message.length);
assert.deepStrictEqual(Buffer.concat(buffers), message);
}));
});
// Test implicit socket pause
net.createServer(common.mustCall(function(socket) {
this.close();
socket.end(message);
})).listen(0, function() {
let received = 0;
const buffers = [];
const sockBuf = Buffer.alloc(8);
let paused = false;
net.connect({
port: this.address().port,
onread: {
buffer: sockBuf,
callback: function(nread, buf) {
assert.strictEqual(paused, false);
assert.strictEqual(buf, sockBuf);
received += nread;
buffers.push(Buffer.from(buf.slice(0, nread)));
paused = true;
setTimeout(() => {
paused = false;
this.resume();
}, 100);
return false;
}
}
}).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
assert.strictEqual(received, message.length);
assert.deepStrictEqual(Buffer.concat(buffers), message);
}));
});