stream: add destroy and _destroy methods.
Adds destroy() and _destroy() methods to Readable, Writable, Duplex and Transform. It also standardizes the behavior and the implementation of destroy(), which has been inconsistent in userland and core. This PR also updates all the subsystems of core to use the new destroy(). PR-URL: https://github.com/nodejs/node/pull/12925 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Calvin Metcalf <calvin.metcalf@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
This commit is contained in:
parent
d54ec726cc
commit
330c8d743e
@ -499,6 +499,15 @@ write('hello', () => {
|
||||
|
||||
A Writable stream in object mode will always ignore the `encoding` argument.
|
||||
|
||||
##### writable.destroy([error])
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
Destroy the stream, and emit the passed error. After this call, the
|
||||
writible stream has ended. Implementors should not override this method,
|
||||
but instead implement [`writable._destroy`][writable-_destroy].
|
||||
|
||||
### Readable Streams
|
||||
|
||||
Readable streams are an abstraction for a *source* from which data is
|
||||
@ -1070,6 +1079,16 @@ myReader.on('readable', () => {
|
||||
});
|
||||
```
|
||||
|
||||
##### readable.destroy([error])
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
Destroy the stream, and emit `'error'`. After this call, the
|
||||
readable stream will release any internal resources.
|
||||
Implementors should not override this method, but instead implement
|
||||
[`readable._destroy`][readable-_destroy].
|
||||
|
||||
### Duplex and Transform Streams
|
||||
|
||||
#### Class: stream.Duplex
|
||||
@ -1109,6 +1128,16 @@ Examples of Transform streams include:
|
||||
* [zlib streams][zlib]
|
||||
* [crypto streams][crypto]
|
||||
|
||||
##### transform.destroy([error])
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
Destroy the stream, and emit `'error'`. After this call, the
|
||||
transform stream would release any internal resources.
|
||||
implementors should not override this method, but instead implement
|
||||
[`readable._destroy`][readable-_destroy].
|
||||
The default implementation of `_destroy` for `Transform` also emit `'close'`.
|
||||
|
||||
## API for Stream Implementers
|
||||
|
||||
@ -1247,6 +1276,8 @@ constructor and implement the `writable._write()` method. The
|
||||
[`stream._write()`][stream-_write] method.
|
||||
* `writev` {Function} Implementation for the
|
||||
[`stream._writev()`][stream-_writev] method.
|
||||
* `destroy` {Function} Implementation for the
|
||||
[`stream._destroy()`][writable-_destroy] method.
|
||||
|
||||
For example:
|
||||
|
||||
@ -1356,6 +1387,15 @@ The `writable._writev()` method is prefixed with an underscore because it is
|
||||
internal to the class that defines it, and should never be called directly by
|
||||
user programs.
|
||||
|
||||
#### writable.\_destroy(err, callback)
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
* `err` {Error} An error.
|
||||
* `callback` {Function} A callback function that takes an optional error argument
|
||||
which is invoked when the writable is destroyed.
|
||||
|
||||
#### Errors While Writing
|
||||
|
||||
It is recommended that errors occurring during the processing of the
|
||||
@ -1425,6 +1465,8 @@ constructor and implement the `readable._read()` method.
|
||||
a single value instead of a Buffer of size n. Defaults to `false`
|
||||
* `read` {Function} Implementation for the [`stream._read()`][stream-_read]
|
||||
method.
|
||||
* `destroy` {Function} Implementation for the [`stream._destroy()`][readable-_destroy]
|
||||
method.
|
||||
|
||||
For example:
|
||||
|
||||
@ -2073,4 +2115,8 @@ readable buffer so there is nothing for a user to consume.
|
||||
[stream-read]: #stream_readable_read_size
|
||||
[stream-resume]: #stream_readable_resume
|
||||
[stream-write]: #stream_writable_write_chunk_encoding_callback
|
||||
[zlib]: zlib.html
|
||||
[readable-_destroy]: #stream_readable_destroy_err_callback
|
||||
[writable-_destroy]: #stream_writable_destroy_err_callback
|
||||
[TCP sockets]: net.html#net_class_net_socket
|
||||
[Transform]: #stream_class_stream_transform
|
||||
[Writable]: #stream_class_stream_writable
|
||||
|
@ -76,3 +76,33 @@ function onend() {
|
||||
function onEndNT(self) {
|
||||
self.end();
|
||||
}
|
||||
|
||||
Object.defineProperty(Duplex.prototype, 'destroyed', {
|
||||
get() {
|
||||
if (this._readableState === undefined ||
|
||||
this._writableState === undefined) {
|
||||
return false;
|
||||
}
|
||||
return this._readableState.destroyed && this._writableState.destroyed;
|
||||
},
|
||||
set(value) {
|
||||
// we ignore the value if the stream
|
||||
// has not been initialized yet
|
||||
if (this._readableState === undefined ||
|
||||
this._writableState === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
// backward compatibility, the user is explicitly
|
||||
// managing destroyed
|
||||
this._readableState.destroyed = value;
|
||||
this._writableState.destroyed = value;
|
||||
}
|
||||
});
|
||||
|
||||
Duplex.prototype._destroy = function(err, cb) {
|
||||
this.push(null);
|
||||
this.end();
|
||||
|
||||
process.nextTick(cb, err);
|
||||
};
|
||||
|
@ -30,6 +30,7 @@ const Buffer = require('buffer').Buffer;
|
||||
const util = require('util');
|
||||
const debug = util.debuglog('stream');
|
||||
const BufferList = require('internal/streams/BufferList');
|
||||
const destroyImpl = require('internal/streams/destroy');
|
||||
var StringDecoder;
|
||||
|
||||
util.inherits(Readable, Stream);
|
||||
@ -99,6 +100,9 @@ function ReadableState(options, stream) {
|
||||
this.readableListening = false;
|
||||
this.resumeScheduled = false;
|
||||
|
||||
// has it been destroyed
|
||||
this.destroyed = false;
|
||||
|
||||
// Crypto is kind of old and crusty. Historically, its default string
|
||||
// encoding is 'binary' so we have to make this configurable.
|
||||
// Everything else in the universe uses 'utf8', though.
|
||||
@ -129,12 +133,44 @@ function Readable(options) {
|
||||
// legacy
|
||||
this.readable = true;
|
||||
|
||||
if (options && typeof options.read === 'function')
|
||||
this._read = options.read;
|
||||
if (options) {
|
||||
if (typeof options.read === 'function')
|
||||
this._read = options.read;
|
||||
|
||||
if (typeof options.destroy === 'function')
|
||||
this._destroy = options.destroy;
|
||||
}
|
||||
|
||||
Stream.call(this);
|
||||
}
|
||||
|
||||
Object.defineProperty(Readable.prototype, 'destroyed', {
|
||||
get() {
|
||||
if (this._readableState === undefined) {
|
||||
return false;
|
||||
}
|
||||
return this._readableState.destroyed;
|
||||
},
|
||||
set(value) {
|
||||
// we ignore the value if the stream
|
||||
// has not been initialized yet
|
||||
if (!this._readableState) {
|
||||
return;
|
||||
}
|
||||
|
||||
// backward compatibility, the user is explicitly
|
||||
// managing destroyed
|
||||
this._readableState.destroyed = value;
|
||||
}
|
||||
});
|
||||
|
||||
Readable.prototype.destroy = destroyImpl.destroy;
|
||||
Readable.prototype._undestroy = destroyImpl.undestroy;
|
||||
Readable.prototype._destroy = function(err, cb) {
|
||||
this.push(null);
|
||||
cb(err);
|
||||
};
|
||||
|
||||
// Manually shove something into the read() buffer.
|
||||
// This returns true if the highWaterMark has not been hit yet,
|
||||
// similar to how Writable.write() returns true if you should
|
||||
|
@ -194,6 +194,14 @@ Transform.prototype._read = function(n) {
|
||||
};
|
||||
|
||||
|
||||
Transform.prototype._destroy = function(err, cb) {
|
||||
Duplex.prototype._destroy.call(this, err, (err2) => {
|
||||
cb(err2);
|
||||
this.emit('close');
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
function done(stream, er, data) {
|
||||
if (er)
|
||||
return stream.emit('error', er);
|
||||
|
@ -32,6 +32,7 @@ const util = require('util');
|
||||
const internalUtil = require('internal/util');
|
||||
const Stream = require('stream');
|
||||
const Buffer = require('buffer').Buffer;
|
||||
const destroyImpl = require('internal/streams/destroy');
|
||||
|
||||
util.inherits(Writable, Stream);
|
||||
|
||||
@ -66,6 +67,9 @@ function WritableState(options, stream) {
|
||||
// when 'finish' is emitted
|
||||
this.finished = false;
|
||||
|
||||
// has it been destroyed
|
||||
this.destroyed = false;
|
||||
|
||||
// should we decode strings into buffers before passing to _write?
|
||||
// this is here so that some node-core streams can optimize string
|
||||
// handling at a lower level.
|
||||
@ -192,6 +196,9 @@ function Writable(options) {
|
||||
|
||||
if (typeof options.writev === 'function')
|
||||
this._writev = options.writev;
|
||||
|
||||
if (typeof options.destroy === 'function')
|
||||
this._destroy = options.destroy;
|
||||
}
|
||||
|
||||
Stream.call(this);
|
||||
@ -563,3 +570,30 @@ function onCorkedFinish(corkReq, state, err) {
|
||||
state.corkedRequestsFree = corkReq;
|
||||
}
|
||||
}
|
||||
|
||||
Object.defineProperty(Writable.prototype, 'destroyed', {
|
||||
get() {
|
||||
if (this._writableState === undefined) {
|
||||
return false;
|
||||
}
|
||||
return this._writableState.destroyed;
|
||||
},
|
||||
set(value) {
|
||||
// we ignore the value if the stream
|
||||
// has not been initialized yet
|
||||
if (!this._writableState) {
|
||||
return;
|
||||
}
|
||||
|
||||
// backward compatibility, the user is explicitly
|
||||
// managing destroyed
|
||||
this._writableState.destroyed = value;
|
||||
}
|
||||
});
|
||||
|
||||
Writable.prototype.destroy = destroyImpl.destroy;
|
||||
Writable.prototype._undestroy = destroyImpl.undestroy;
|
||||
Writable.prototype._destroy = function(err, cb) {
|
||||
this.end();
|
||||
cb(err);
|
||||
};
|
||||
|
11
lib/fs.js
11
lib/fs.js
@ -1986,11 +1986,10 @@ ReadStream.prototype._read = function(n) {
|
||||
};
|
||||
|
||||
|
||||
ReadStream.prototype.destroy = function() {
|
||||
if (this.destroyed)
|
||||
return;
|
||||
this.destroyed = true;
|
||||
this.close();
|
||||
ReadStream.prototype._destroy = function(err, cb) {
|
||||
this.close(function(err2) {
|
||||
cb(err || err2);
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
@ -2157,7 +2156,7 @@ WriteStream.prototype._writev = function(data, cb) {
|
||||
};
|
||||
|
||||
|
||||
WriteStream.prototype.destroy = ReadStream.prototype.destroy;
|
||||
WriteStream.prototype._destroy = ReadStream.prototype._destroy;
|
||||
WriteStream.prototype.close = ReadStream.prototype.close;
|
||||
|
||||
// There is no shutdown() for files.
|
||||
|
@ -18,10 +18,12 @@ function setupStdio() {
|
||||
function getStdout() {
|
||||
if (stdout) return stdout;
|
||||
stdout = createWritableStdioStream(1);
|
||||
stdout.destroy = stdout.destroySoon = function(er) {
|
||||
stdout.destroySoon = stdout.destroy;
|
||||
stdout._destroy = function(er, cb) {
|
||||
// avoid errors if we already emitted
|
||||
const errors = lazyErrors();
|
||||
er = er || new errors.Error('ERR_STDOUT_CLOSE');
|
||||
stdout.emit('error', er);
|
||||
cb(er);
|
||||
};
|
||||
if (stdout.isTTY) {
|
||||
process.on('SIGWINCH', () => stdout._refreshSize());
|
||||
@ -32,10 +34,12 @@ function setupStdio() {
|
||||
function getStderr() {
|
||||
if (stderr) return stderr;
|
||||
stderr = createWritableStdioStream(2);
|
||||
stderr.destroy = stderr.destroySoon = function(er) {
|
||||
stderr.destroySoon = stderr.destroy;
|
||||
stderr._destroy = function(er, cb) {
|
||||
// avoid errors if we already emitted
|
||||
const errors = lazyErrors();
|
||||
er = er || new errors.Error('ERR_STDERR_CLOSE');
|
||||
stderr.emit('error', er);
|
||||
cb(er);
|
||||
};
|
||||
if (stderr.isTTY) {
|
||||
process.on('SIGWINCH', () => stderr._refreshSize());
|
||||
|
65
lib/internal/streams/destroy.js
Normal file
65
lib/internal/streams/destroy.js
Normal file
@ -0,0 +1,65 @@
|
||||
'use strict';
|
||||
|
||||
// undocumented cb() API, needed for core, not for public API
|
||||
function destroy(err, cb) {
|
||||
const readableDestroyed = this._readableState &&
|
||||
this._readableState.destroyed;
|
||||
const writableDestroyed = this._writableState &&
|
||||
this._writableState.destroyed;
|
||||
|
||||
if (readableDestroyed || writableDestroyed) {
|
||||
if (err && (!this._writableState || !this._writableState.errorEmitted)) {
|
||||
process.nextTick(emitErrorNT, this, err);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// we set destroyed to true before firing error callbacks in order
|
||||
// to make it re-entrance safe in case destroy() is called within callbacks
|
||||
|
||||
if (this._readableState) {
|
||||
this._readableState.destroyed = true;
|
||||
}
|
||||
|
||||
// if this is a duplex stream mark the writable part as destroyed as well
|
||||
if (this._writableState) {
|
||||
this._writableState.destroyed = true;
|
||||
}
|
||||
|
||||
this._destroy(err || null, (err) => {
|
||||
if (!cb && err) {
|
||||
process.nextTick(emitErrorNT, this, err);
|
||||
if (this._writableState) {
|
||||
this._writableState.errorEmitted = true;
|
||||
}
|
||||
} else if (cb) {
|
||||
cb(err);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function undestroy() {
|
||||
if (this._readableState) {
|
||||
this._readableState.destroyed = false;
|
||||
this._readableState.reading = false;
|
||||
this._readableState.ended = false;
|
||||
this._readableState.endEmitted = false;
|
||||
}
|
||||
|
||||
if (this._writableState) {
|
||||
this._writableState.destroyed = false;
|
||||
this._writableState.ended = false;
|
||||
this._writableState.ending = false;
|
||||
this._writableState.finished = false;
|
||||
this._writableState.errorEmitted = false;
|
||||
}
|
||||
}
|
||||
|
||||
function emitErrorNT(self, err) {
|
||||
self.emit('error', err);
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
destroy,
|
||||
undestroy
|
||||
};
|
62
lib/net.js
62
lib/net.js
@ -156,7 +156,7 @@ function normalizeArgs(args) {
|
||||
|
||||
// called when creating new Socket, or when re-using a closed Socket
|
||||
function initSocketHandle(self) {
|
||||
self.destroyed = false;
|
||||
self._undestroy();
|
||||
self._bytesDispatched = 0;
|
||||
self._sockname = null;
|
||||
|
||||
@ -295,7 +295,7 @@ function onSocketFinish() {
|
||||
var err = this._handle.shutdown(req);
|
||||
|
||||
if (err)
|
||||
return this._destroy(errnoException(err, 'shutdown'));
|
||||
return this.destroy(errnoException(err, 'shutdown'));
|
||||
}
|
||||
|
||||
|
||||
@ -481,7 +481,7 @@ Socket.prototype._read = function(n) {
|
||||
this._handle.reading = true;
|
||||
var err = this._handle.readStart();
|
||||
if (err)
|
||||
this._destroy(errnoException(err, 'read'));
|
||||
this.destroy(errnoException(err, 'read'));
|
||||
}
|
||||
};
|
||||
|
||||
@ -526,20 +526,6 @@ Socket.prototype.destroySoon = function() {
|
||||
Socket.prototype._destroy = function(exception, cb) {
|
||||
debug('destroy');
|
||||
|
||||
function fireErrorCallbacks(self, exception, cb) {
|
||||
if (cb) cb(exception);
|
||||
if (exception && !self._writableState.errorEmitted) {
|
||||
process.nextTick(emitErrorNT, self, exception);
|
||||
self._writableState.errorEmitted = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (this.destroyed) {
|
||||
debug('already destroyed, fire error callbacks');
|
||||
fireErrorCallbacks(this, exception, cb);
|
||||
return;
|
||||
}
|
||||
|
||||
this.connecting = false;
|
||||
|
||||
this.readable = this.writable = false;
|
||||
@ -564,11 +550,7 @@ Socket.prototype._destroy = function(exception, cb) {
|
||||
this._sockname = null;
|
||||
}
|
||||
|
||||
// we set destroyed to true before firing error callbacks in order
|
||||
// to make it re-entrance safe in case Socket.prototype.destroy()
|
||||
// is called within callbacks
|
||||
this.destroyed = true;
|
||||
fireErrorCallbacks(this, exception, cb);
|
||||
cb(exception);
|
||||
|
||||
if (this._server) {
|
||||
COUNTER_NET_SERVER_CONNECTION_CLOSE(this);
|
||||
@ -581,12 +563,6 @@ Socket.prototype._destroy = function(exception, cb) {
|
||||
};
|
||||
|
||||
|
||||
Socket.prototype.destroy = function(exception) {
|
||||
debug('destroy', exception);
|
||||
this._destroy(exception);
|
||||
};
|
||||
|
||||
|
||||
// This function is called whenever the handle gets a
|
||||
// buffer, or when there's an error reading.
|
||||
function onread(nread, buffer) {
|
||||
@ -614,7 +590,7 @@ function onread(nread, buffer) {
|
||||
debug('readStop');
|
||||
var err = handle.readStop();
|
||||
if (err)
|
||||
self._destroy(errnoException(err, 'read'));
|
||||
self.destroy(errnoException(err, 'read'));
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -628,7 +604,7 @@ function onread(nread, buffer) {
|
||||
|
||||
// Error, possibly EOF.
|
||||
if (nread !== uv.UV_EOF) {
|
||||
return self._destroy(errnoException(nread, 'read'));
|
||||
return self.destroy(errnoException(nread, 'read'));
|
||||
}
|
||||
|
||||
debug('EOF');
|
||||
@ -739,7 +715,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
|
||||
this._unrefTimer();
|
||||
|
||||
if (!this._handle) {
|
||||
this._destroy(new Error('This socket is closed'), cb);
|
||||
this.destroy(new Error('This socket is closed'), cb);
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -771,7 +747,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
|
||||
}
|
||||
|
||||
if (err)
|
||||
return this._destroy(errnoException(err, 'write', req.error), cb);
|
||||
return this.destroy(errnoException(err, 'write', req.error), cb);
|
||||
|
||||
this._bytesDispatched += req.bytes;
|
||||
|
||||
@ -862,7 +838,7 @@ function afterWrite(status, handle, req, err) {
|
||||
if (status < 0) {
|
||||
var ex = errnoException(status, 'write', req.error);
|
||||
debug('write failure', ex);
|
||||
self._destroy(ex, req.cb);
|
||||
self.destroy(ex, req.cb);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -896,13 +872,13 @@ function internalConnect(
|
||||
localAddress = localAddress || '::';
|
||||
err = self._handle.bind6(localAddress, localPort);
|
||||
} else {
|
||||
self._destroy(new TypeError('Invalid addressType: ' + addressType));
|
||||
self.destroy(new TypeError('Invalid addressType: ' + addressType));
|
||||
return;
|
||||
}
|
||||
|
||||
if (err) {
|
||||
const ex = exceptionWithHostPort(err, 'bind', localAddress, localPort);
|
||||
self._destroy(ex);
|
||||
self.destroy(ex);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -944,7 +920,7 @@ function internalConnect(
|
||||
}
|
||||
|
||||
const ex = exceptionWithHostPort(err, 'connect', address, port, details);
|
||||
self._destroy(ex);
|
||||
self.destroy(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@ -971,14 +947,7 @@ Socket.prototype.connect = function() {
|
||||
this.write = Socket.prototype.write;
|
||||
|
||||
if (this.destroyed) {
|
||||
this._readableState.reading = false;
|
||||
this._readableState.ended = false;
|
||||
this._readableState.endEmitted = false;
|
||||
this._writableState.ended = false;
|
||||
this._writableState.ending = false;
|
||||
this._writableState.finished = false;
|
||||
this._writableState.errorEmitted = false;
|
||||
this.destroyed = false;
|
||||
this._undestroy();
|
||||
this._handle = null;
|
||||
this._peername = null;
|
||||
this._sockname = null;
|
||||
@ -1088,8 +1057,7 @@ function lookupAndConnect(self, options) {
|
||||
|
||||
|
||||
function connectErrorNT(self, err) {
|
||||
self.emit('error', err);
|
||||
self._destroy();
|
||||
self.destroy(err);
|
||||
}
|
||||
|
||||
|
||||
@ -1162,7 +1130,7 @@ function afterConnect(status, handle, req, readable, writable) {
|
||||
ex.localAddress = req.localAddress;
|
||||
ex.localPort = req.localPort;
|
||||
}
|
||||
self._destroy(ex);
|
||||
self.destroy(ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
1
node.gyp
1
node.gyp
@ -105,6 +105,7 @@
|
||||
'lib/internal/streams/lazy_transform.js',
|
||||
'lib/internal/streams/BufferList.js',
|
||||
'lib/internal/streams/legacy.js',
|
||||
'lib/internal/streams/destroy.js',
|
||||
'deps/v8/tools/splaytree.js',
|
||||
'deps/v8/tools/codemap.js',
|
||||
'deps/v8/tools/consarray.js',
|
||||
|
31
test/parallel/test-process-external-stdio-close-spawn.js
Normal file
31
test/parallel/test-process-external-stdio-close-spawn.js
Normal file
@ -0,0 +1,31 @@
|
||||
'use strict';
|
||||
// Refs: https://github.com/nodejs/node/issues/947
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const cp = require('child_process');
|
||||
|
||||
if (process.argv[2] === 'child') {
|
||||
process.on('message', common.mustCall((msg) => {
|
||||
assert.strictEqual(msg, 'go');
|
||||
// the following console.log is an integral part
|
||||
// of the test. If this regress, this call will
|
||||
// cause the process to exit with 1
|
||||
console.log('logging should not cause a crash');
|
||||
process.disconnect();
|
||||
}));
|
||||
} else {
|
||||
// Passing '--inspect', '--inspect-brk' to child.spawn enables
|
||||
// the debugger. This test was added to help debug the fork-based
|
||||
// test with the same name.
|
||||
const child = cp.spawn(process.execPath, [__filename, 'child'], {
|
||||
stdio: ['pipe', 'pipe', 'pipe', 'ipc']
|
||||
});
|
||||
|
||||
child.on('close', common.mustCall((exitCode, signal) => {
|
||||
assert.strictEqual(exitCode, 0, 'exit successfully');
|
||||
assert.strictEqual(signal, null);
|
||||
}));
|
||||
|
||||
child.stdout.destroy();
|
||||
child.send('go');
|
||||
}
|
@ -7,6 +7,9 @@ const cp = require('child_process');
|
||||
if (process.argv[2] === 'child') {
|
||||
process.on('message', common.mustCall((msg) => {
|
||||
assert.strictEqual(msg, 'go');
|
||||
// the following console.log is an integral part
|
||||
// of the test. If this regress, this call will
|
||||
// cause the process to exit with 1
|
||||
console.log('logging should not cause a crash');
|
||||
process.disconnect();
|
||||
}));
|
||||
|
194
test/parallel/test-stream-duplex-destroy.js
Normal file
194
test/parallel/test-stream-duplex-destroy.js
Normal file
@ -0,0 +1,194 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const { Duplex } = require('stream');
|
||||
const assert = require('assert');
|
||||
const { inherits } = require('util');
|
||||
|
||||
{
|
||||
const duplex = new Duplex({
|
||||
write(chunk, enc, cb) { cb(); },
|
||||
read() {}
|
||||
});
|
||||
|
||||
duplex.resume();
|
||||
|
||||
duplex.on('end', common.mustCall());
|
||||
duplex.on('finish', common.mustCall());
|
||||
|
||||
duplex.destroy();
|
||||
assert.strictEqual(duplex.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new Duplex({
|
||||
write(chunk, enc, cb) { cb(); },
|
||||
read() {}
|
||||
});
|
||||
duplex.resume();
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
duplex.on('end', common.mustCall());
|
||||
duplex.on('finish', common.mustCall());
|
||||
duplex.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
duplex.destroy(expected);
|
||||
assert.strictEqual(duplex.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new Duplex({
|
||||
write(chunk, enc, cb) { cb(); },
|
||||
read() {}
|
||||
});
|
||||
|
||||
duplex._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, expected);
|
||||
cb(err);
|
||||
});
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
duplex.on('finish', common.mustNotCall('no finish event'));
|
||||
duplex.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
duplex.destroy(expected);
|
||||
assert.strictEqual(duplex.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const expected = new Error('kaboom');
|
||||
const duplex = new Duplex({
|
||||
write(chunk, enc, cb) { cb(); },
|
||||
read() {},
|
||||
destroy: common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, expected);
|
||||
cb();
|
||||
})
|
||||
});
|
||||
duplex.resume();
|
||||
|
||||
duplex.on('end', common.mustNotCall('no end event'));
|
||||
duplex.on('finish', common.mustNotCall('no finish event'));
|
||||
|
||||
// error is swallowed by the custom _destroy
|
||||
duplex.on('error', common.mustNotCall('no error event'));
|
||||
|
||||
duplex.destroy(expected);
|
||||
assert.strictEqual(duplex.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new Duplex({
|
||||
write(chunk, enc, cb) { cb(); },
|
||||
read() {}
|
||||
});
|
||||
|
||||
duplex._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
cb();
|
||||
});
|
||||
|
||||
duplex.destroy();
|
||||
assert.strictEqual(duplex.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new Duplex({
|
||||
write(chunk, enc, cb) { cb(); },
|
||||
read() {}
|
||||
});
|
||||
duplex.resume();
|
||||
|
||||
duplex._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
process.nextTick(() => {
|
||||
this.push(null);
|
||||
this.end();
|
||||
cb();
|
||||
});
|
||||
});
|
||||
|
||||
const fail = common.mustNotCall('no finish or end event');
|
||||
|
||||
duplex.on('finish', fail);
|
||||
duplex.on('end', fail);
|
||||
|
||||
duplex.destroy();
|
||||
|
||||
duplex.removeListener('end', fail);
|
||||
duplex.removeListener('finish', fail);
|
||||
duplex.on('end', common.mustCall());
|
||||
duplex.on('finish', common.mustCall());
|
||||
assert.strictEqual(duplex.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new Duplex({
|
||||
write(chunk, enc, cb) { cb(); },
|
||||
read() {}
|
||||
});
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
duplex._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
cb(expected);
|
||||
});
|
||||
|
||||
duplex.on('finish', common.mustNotCall('no finish event'));
|
||||
duplex.on('end', common.mustNotCall('no end event'));
|
||||
duplex.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
duplex.destroy();
|
||||
assert.strictEqual(duplex.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new Duplex({
|
||||
write(chunk, enc, cb) { cb(); },
|
||||
read() {},
|
||||
allowHalfOpen: true
|
||||
});
|
||||
duplex.resume();
|
||||
|
||||
duplex.on('finish', common.mustCall());
|
||||
duplex.on('end', common.mustCall());
|
||||
|
||||
duplex.destroy();
|
||||
assert.strictEqual(duplex.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const duplex = new Duplex({
|
||||
write(chunk, enc, cb) { cb(); },
|
||||
read() {},
|
||||
});
|
||||
|
||||
duplex.destroyed = true;
|
||||
assert.strictEqual(duplex.destroyed, true);
|
||||
|
||||
// the internal destroy() mechanism should not be triggered
|
||||
duplex.on('finish', common.mustNotCall());
|
||||
duplex.on('end', common.mustNotCall());
|
||||
duplex.destroy();
|
||||
}
|
||||
|
||||
{
|
||||
function MyDuplex() {
|
||||
assert.strictEqual(this.destroyed, false);
|
||||
this.destroyed = false;
|
||||
Duplex.call(this);
|
||||
}
|
||||
|
||||
inherits(MyDuplex, Duplex);
|
||||
|
||||
new MyDuplex();
|
||||
}
|
162
test/parallel/test-stream-readable-destroy.js
Normal file
162
test/parallel/test-stream-readable-destroy.js
Normal file
@ -0,0 +1,162 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const { Readable } = require('stream');
|
||||
const assert = require('assert');
|
||||
const { inherits } = require('util');
|
||||
|
||||
{
|
||||
const read = new Readable({
|
||||
read() {}
|
||||
});
|
||||
read.resume();
|
||||
|
||||
read.on('end', common.mustCall());
|
||||
|
||||
read.destroy();
|
||||
assert.strictEqual(read.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const read = new Readable({
|
||||
read() {}
|
||||
});
|
||||
read.resume();
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
read.on('end', common.mustCall());
|
||||
read.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
read.destroy(expected);
|
||||
assert.strictEqual(read.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const read = new Readable({
|
||||
read() {}
|
||||
});
|
||||
|
||||
read._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, expected);
|
||||
cb(err);
|
||||
});
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
read.on('end', common.mustNotCall('no end event'));
|
||||
read.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
read.destroy(expected);
|
||||
assert.strictEqual(read.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const read = new Readable({
|
||||
read() {},
|
||||
destroy: common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, expected);
|
||||
cb();
|
||||
})
|
||||
});
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
read.on('end', common.mustNotCall('no end event'));
|
||||
|
||||
// error is swallowed by the custom _destroy
|
||||
read.on('error', common.mustNotCall('no error event'));
|
||||
|
||||
read.destroy(expected);
|
||||
assert.strictEqual(read.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const read = new Readable({
|
||||
read() {}
|
||||
});
|
||||
|
||||
read._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
cb();
|
||||
});
|
||||
|
||||
read.destroy();
|
||||
assert.strictEqual(read.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const read = new Readable({
|
||||
read() {}
|
||||
});
|
||||
read.resume();
|
||||
|
||||
read._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
process.nextTick(() => {
|
||||
this.push(null);
|
||||
cb();
|
||||
});
|
||||
});
|
||||
|
||||
const fail = common.mustNotCall('no end event');
|
||||
|
||||
read.on('end', fail);
|
||||
|
||||
read.destroy();
|
||||
|
||||
read.removeListener('end', fail);
|
||||
read.on('end', common.mustCall());
|
||||
assert.strictEqual(read.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const read = new Readable({
|
||||
read() {}
|
||||
});
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
read._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
cb(expected);
|
||||
});
|
||||
|
||||
read.on('end', common.mustNotCall('no end event'));
|
||||
read.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
read.destroy();
|
||||
assert.strictEqual(read.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const read = new Readable({
|
||||
read() {}
|
||||
});
|
||||
read.resume();
|
||||
|
||||
read.destroyed = true;
|
||||
assert.strictEqual(read.destroyed, true);
|
||||
|
||||
// the internal destroy() mechanism should not be triggered
|
||||
read.on('end', common.mustNotCall());
|
||||
read.destroy();
|
||||
}
|
||||
|
||||
{
|
||||
function MyReadable() {
|
||||
assert.strictEqual(this.destroyed, false);
|
||||
this.destroyed = false;
|
||||
Readable.call(this);
|
||||
}
|
||||
|
||||
inherits(MyReadable, Readable);
|
||||
|
||||
new MyReadable();
|
||||
}
|
143
test/parallel/test-stream-transform-destroy.js
Normal file
143
test/parallel/test-stream-transform-destroy.js
Normal file
@ -0,0 +1,143 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const { Transform } = require('stream');
|
||||
const assert = require('assert');
|
||||
|
||||
{
|
||||
const transform = new Transform({
|
||||
transform(chunk, enc, cb) {}
|
||||
});
|
||||
|
||||
transform.resume();
|
||||
|
||||
transform.on('end', common.mustCall());
|
||||
transform.on('close', common.mustCall());
|
||||
transform.on('finish', common.mustCall());
|
||||
|
||||
transform.destroy();
|
||||
}
|
||||
|
||||
{
|
||||
const transform = new Transform({
|
||||
transform(chunk, enc, cb) {}
|
||||
});
|
||||
transform.resume();
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
transform.on('end', common.mustCall());
|
||||
transform.on('finish', common.mustCall());
|
||||
transform.on('close', common.mustCall());
|
||||
transform.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
transform.destroy(expected);
|
||||
}
|
||||
|
||||
{
|
||||
const transform = new Transform({
|
||||
transform(chunk, enc, cb) {}
|
||||
});
|
||||
|
||||
transform._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, expected);
|
||||
cb(err);
|
||||
}, 1);
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
transform.on('finish', common.mustNotCall('no finish event'));
|
||||
transform.on('close', common.mustNotCall('no close event'));
|
||||
transform.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
transform.destroy(expected);
|
||||
}
|
||||
|
||||
{
|
||||
const expected = new Error('kaboom');
|
||||
const transform = new Transform({
|
||||
transform(chunk, enc, cb) {},
|
||||
destroy: common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, expected);
|
||||
cb();
|
||||
}, 1)
|
||||
});
|
||||
transform.resume();
|
||||
|
||||
transform.on('end', common.mustNotCall('no end event'));
|
||||
transform.on('close', common.mustNotCall('no close event'));
|
||||
transform.on('finish', common.mustNotCall('no finish event'));
|
||||
|
||||
// error is swallowed by the custom _destroy
|
||||
transform.on('error', common.mustNotCall('no error event'));
|
||||
|
||||
transform.destroy(expected);
|
||||
}
|
||||
|
||||
{
|
||||
const transform = new Transform({
|
||||
transform(chunk, enc, cb) {}
|
||||
});
|
||||
|
||||
transform._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
cb();
|
||||
}, 1);
|
||||
|
||||
transform.destroy();
|
||||
}
|
||||
|
||||
{
|
||||
const transform = new Transform({
|
||||
transform(chunk, enc, cb) {}
|
||||
});
|
||||
transform.resume();
|
||||
|
||||
transform._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
process.nextTick(() => {
|
||||
this.push(null);
|
||||
this.end();
|
||||
cb();
|
||||
});
|
||||
}, 1);
|
||||
|
||||
const fail = common.mustNotCall('no event');
|
||||
|
||||
transform.on('finish', fail);
|
||||
transform.on('end', fail);
|
||||
transform.on('close', fail);
|
||||
|
||||
transform.destroy();
|
||||
|
||||
transform.removeListener('end', fail);
|
||||
transform.removeListener('finish', fail);
|
||||
transform.on('end', common.mustCall());
|
||||
transform.on('finish', common.mustCall());
|
||||
}
|
||||
|
||||
{
|
||||
const transform = new Transform({
|
||||
transform(chunk, enc, cb) {}
|
||||
});
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
transform._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
cb(expected);
|
||||
}, 1);
|
||||
|
||||
transform.on('close', common.mustNotCall('no close event'));
|
||||
transform.on('finish', common.mustNotCall('no finish event'));
|
||||
transform.on('end', common.mustNotCall('no end event'));
|
||||
transform.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
transform.destroy();
|
||||
}
|
172
test/parallel/test-stream-writable-destroy.js
Normal file
172
test/parallel/test-stream-writable-destroy.js
Normal file
@ -0,0 +1,172 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const { Writable } = require('stream');
|
||||
const assert = require('assert');
|
||||
const { inherits } = require('util');
|
||||
|
||||
{
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { cb(); }
|
||||
});
|
||||
|
||||
write.on('finish', common.mustCall());
|
||||
|
||||
write.destroy();
|
||||
assert.strictEqual(write.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { cb(); }
|
||||
});
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
write.on('finish', common.mustCall());
|
||||
write.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
write.destroy(expected);
|
||||
assert.strictEqual(write.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { cb(); }
|
||||
});
|
||||
|
||||
write._destroy = function(err, cb) {
|
||||
assert.strictEqual(err, expected);
|
||||
cb(err);
|
||||
};
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
write.on('finish', common.mustNotCall('no finish event'));
|
||||
write.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
write.destroy(expected);
|
||||
assert.strictEqual(write.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { cb(); },
|
||||
destroy: common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, expected);
|
||||
cb();
|
||||
})
|
||||
});
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
write.on('finish', common.mustNotCall('no finish event'));
|
||||
|
||||
// error is swallowed by the custom _destroy
|
||||
write.on('error', common.mustNotCall('no error event'));
|
||||
|
||||
write.destroy(expected);
|
||||
assert.strictEqual(write.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { cb(); }
|
||||
});
|
||||
|
||||
write._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
cb();
|
||||
});
|
||||
|
||||
write.destroy();
|
||||
assert.strictEqual(write.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { cb(); }
|
||||
});
|
||||
|
||||
write._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
process.nextTick(() => {
|
||||
this.end();
|
||||
cb();
|
||||
});
|
||||
});
|
||||
|
||||
const fail = common.mustNotCall('no finish event');
|
||||
|
||||
write.on('finish', fail);
|
||||
|
||||
write.destroy();
|
||||
|
||||
write.removeListener('finish', fail);
|
||||
write.on('finish', common.mustCall());
|
||||
assert.strictEqual(write.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { cb(); }
|
||||
});
|
||||
|
||||
const expected = new Error('kaboom');
|
||||
|
||||
write._destroy = common.mustCall(function(err, cb) {
|
||||
assert.strictEqual(err, null);
|
||||
cb(expected);
|
||||
});
|
||||
|
||||
write.on('finish', common.mustNotCall('no finish event'));
|
||||
write.on('error', common.mustCall((err) => {
|
||||
assert.strictEqual(err, expected);
|
||||
}));
|
||||
|
||||
write.destroy();
|
||||
assert.strictEqual(write.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
// double error case
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { cb(); }
|
||||
});
|
||||
|
||||
write.on('error', common.mustCall());
|
||||
|
||||
write.destroy(new Error('kaboom 1'));
|
||||
write.destroy(new Error('kaboom 2'));
|
||||
assert.strictEqual(write._writableState.errorEmitted, true);
|
||||
assert.strictEqual(write.destroyed, true);
|
||||
}
|
||||
|
||||
{
|
||||
const write = new Writable({
|
||||
write(chunk, enc, cb) { cb(); }
|
||||
});
|
||||
|
||||
write.destroyed = true;
|
||||
assert.strictEqual(write.destroyed, true);
|
||||
|
||||
// the internal destroy() mechanism should not be triggered
|
||||
write.on('finish', common.mustNotCall());
|
||||
write.destroy();
|
||||
}
|
||||
|
||||
{
|
||||
function MyWritable() {
|
||||
assert.strictEqual(this.destroyed, false);
|
||||
this.destroyed = false;
|
||||
Writable.call(this);
|
||||
}
|
||||
|
||||
inherits(MyWritable, Writable);
|
||||
|
||||
new MyWritable();
|
||||
}
|
@ -16,7 +16,7 @@ const server = net.createServer(common.mustCall((c) => {
|
||||
const c = tls.connect({ port: server.address().port });
|
||||
c.on('error', () => {
|
||||
// Otherwise `.write()` callback won't be invoked.
|
||||
c.destroyed = false;
|
||||
c._undestroy();
|
||||
});
|
||||
|
||||
c.write('hello', common.mustCall((err) => {
|
||||
|
@ -1,10 +1,10 @@
|
||||
'use strict';
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
|
||||
assert.throws(() => process.stdout.end(),
|
||||
common.expectsError({
|
||||
code: 'ERR_STDOUT_CLOSE',
|
||||
type: Error,
|
||||
message: 'process.stdout cannot be closed'
|
||||
}));
|
||||
process.on('uncaughtException', common.expectsError({
|
||||
code: 'ERR_STDOUT_CLOSE',
|
||||
type: Error,
|
||||
message: 'process.stdout cannot be closed'
|
||||
}));
|
||||
|
||||
process.stdout.end();
|
||||
|
Loading…
x
Reference in New Issue
Block a user