stream: fix destroy() behavior

Ensure errorEmitted is always set. Only emit 'error' once.

PR-URL: https://github.com/nodejs/node/pull/29058
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
Robert Nagy 2019-07-16 00:03:23 +02:00 committed by Rich Trott
parent a890771cd0
commit 4a2bd69db9
10 changed files with 154 additions and 85 deletions

View File

@ -281,6 +281,9 @@ The stream is not closed when the `'error'` event is emitted unless the
[`autoDestroy`][writable-new] option was set to `true` when creating the [`autoDestroy`][writable-new] option was set to `true` when creating the
stream. stream.
After `'error'`, no further events other than `'close'` *should* be emitted
(including `'error'` events).
##### Event: 'finish' ##### Event: 'finish'
<!-- YAML <!-- YAML
added: v0.9.4 added: v0.9.4

View File

@ -117,6 +117,9 @@ function ReadableState(options, stream, isDuplex) {
this.resumeScheduled = false; this.resumeScheduled = false;
this.paused = true; this.paused = true;
// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;
// Should close be emitted on destroy. Defaults to true. // Should close be emitted on destroy. Defaults to true.
this.emitClose = options.emitClose !== false; this.emitClose = options.emitClose !== false;

View File

@ -429,13 +429,11 @@ function onwriteError(stream, state, sync, er, cb) {
// This can emit finish, and it will always happen // This can emit finish, and it will always happen
// after error // after error
process.nextTick(finishMaybe, stream, state); process.nextTick(finishMaybe, stream, state);
stream._writableState.errorEmitted = true;
errorOrDestroy(stream, er); errorOrDestroy(stream, er);
} else { } else {
// The caller expect this to happen before if // The caller expect this to happen before if
// it is async // it is async
cb(er); cb(er);
stream._writableState.errorEmitted = true;
errorOrDestroy(stream, er); errorOrDestroy(stream, er);
// This can emit finish, but finish must // This can emit finish, but finish must
// always follow error // always follow error

View File

@ -1,22 +1,37 @@
'use strict'; 'use strict';
function needError(stream, err) {
if (!err) {
return false;
}
const r = stream._readableState;
const w = stream._writableState;
if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
return false;
}
if (w) {
w.errorEmitted = true;
}
if (r) {
r.errorEmitted = true;
}
return true;
}
// Undocumented cb() API, needed for core, not for public API // Undocumented cb() API, needed for core, not for public API
function destroy(err, cb) { function destroy(err, cb) {
const readableDestroyed = this._readableState && const r = this._readableState;
this._readableState.destroyed; const w = this._writableState;
const writableDestroyed = this._writableState &&
this._writableState.destroyed;
if (readableDestroyed || writableDestroyed) { if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) { if (cb) {
cb(err); cb(err);
} else if (err) { } else if (needError(this, err)) {
if (!this._writableState) {
process.nextTick(emitErrorNT, this, err); process.nextTick(emitErrorNT, this, err);
} else if (!this._writableState.errorEmitted) {
this._writableState.errorEmitted = true;
process.nextTick(emitErrorNT, this, err);
}
} }
return this; return this;
@ -25,28 +40,19 @@ function destroy(err, cb) {
// We set destroyed to true before firing error callbacks in order // We set destroyed to true before firing error callbacks in order
// to make it re-entrance safe in case destroy() is called within callbacks // to make it re-entrance safe in case destroy() is called within callbacks
if (this._readableState) { if (w) {
this._readableState.destroyed = true; w.destroyed = true;
} }
if (r) {
// If this is a duplex stream mark the writable part as destroyed as well r.destroyed = true;
if (this._writableState) {
this._writableState.destroyed = true;
} }
this._destroy(err || null, (err) => { this._destroy(err || null, (err) => {
if (!cb && err) { if (cb) {
if (!this._writableState) {
process.nextTick(emitErrorAndCloseNT, this, err);
} else if (!this._writableState.errorEmitted) {
this._writableState.errorEmitted = true;
process.nextTick(emitErrorAndCloseNT, this, err);
} else {
process.nextTick(emitCloseNT, this);
}
} else if (cb) {
process.nextTick(emitCloseNT, this); process.nextTick(emitCloseNT, this);
cb(err); cb(err);
} else if (needError(this, err)) {
process.nextTick(emitErrorAndCloseNT, this, err);
} else { } else {
process.nextTick(emitCloseNT, this); process.nextTick(emitCloseNT, this);
} }
@ -61,29 +67,36 @@ function emitErrorAndCloseNT(self, err) {
} }
function emitCloseNT(self) { function emitCloseNT(self) {
if (self._writableState && !self._writableState.emitClose) const r = self._readableState;
const w = self._writableState;
if (w && !w.emitClose)
return; return;
if (self._readableState && !self._readableState.emitClose) if (r && !r.emitClose)
return; return;
self.emit('close'); self.emit('close');
} }
function undestroy() { function undestroy() {
if (this._readableState) { const r = this._readableState;
this._readableState.destroyed = false; const w = this._writableState;
this._readableState.reading = false;
this._readableState.ended = false; if (r) {
this._readableState.endEmitted = false; r.destroyed = false;
r.reading = false;
r.ended = false;
r.endEmitted = false;
r.errorEmitted = false;
} }
if (this._writableState) { if (w) {
this._writableState.destroyed = false; w.destroyed = false;
this._writableState.ended = false; w.ended = false;
this._writableState.ending = false; w.ending = false;
this._writableState.finalCalled = false; w.finalCalled = false;
this._writableState.prefinished = false; w.prefinished = false;
this._writableState.finished = false; w.finished = false;
this._writableState.errorEmitted = false; w.errorEmitted = false;
} }
} }
@ -98,12 +111,12 @@ function errorOrDestroy(stream, err) {
// the error to be emitted nextTick. In a future // the error to be emitted nextTick. In a future
// semver major update we should change the default to this. // semver major update we should change the default to this.
const rState = stream._readableState; const r = stream._readableState;
const wState = stream._writableState; const w = stream._writableState;
if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy)) if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err); stream.destroy(err);
else else if (needError(stream, err))
stream.emit('error', err); stream.emit('error', err);
} }

View File

@ -69,12 +69,14 @@ tcp.listen(0, common.mustCall(function() {
[], [],
{} {}
].forEach((value) => { ].forEach((value) => {
common.expectsError(() => socket.write(value), { // We need to check the callback since 'error' will only
// be emitted once per instance.
socket.write(value, common.expectsError({
code: 'ERR_INVALID_ARG_TYPE', code: 'ERR_INVALID_ARG_TYPE',
type: TypeError, type: TypeError,
message: 'The "chunk" argument must be one of type string or Buffer. ' + message: 'The "chunk" argument must be one of type string or Buffer. ' +
`Received type ${typeof value}` `Received type ${typeof value}`
}); }));
}); });
// Write a string that contains a multi-byte character sequence to test that // Write a string that contains a multi-byte character sequence to test that

View File

@ -0,0 +1,19 @@
'use strict';
const common = require('../common');
const { Writable, Readable } = require('stream');
{
const writable = new Writable();
writable.on('error', common.mustCall());
writable.end();
writable.write('h');
writable.write('h');
}
{
const readable = new Readable();
readable.on('error', common.mustCall());
readable.push(null);
readable.push('h');
readable.push('h');
}

View File

@ -3,17 +3,32 @@
const common = require('../common'); const common = require('../common');
const stream = require('stream'); const stream = require('stream');
function testPushArg(val) {
const readable = new stream.Readable({ const readable = new stream.Readable({
read: () => {} read: () => {}
}); });
readable.on('error', common.expectsError({
function checkError(fn) {
common.expectsError(fn, {
code: 'ERR_INVALID_ARG_TYPE', code: 'ERR_INVALID_ARG_TYPE',
type: TypeError type: TypeError
}); }));
readable.push(val);
} }
checkError(() => readable.push([])); testPushArg([]);
checkError(() => readable.push({})); testPushArg({});
checkError(() => readable.push(0)); testPushArg(0);
function testUnshiftArg(val) {
const readable = new stream.Readable({
read: () => {}
});
readable.on('error', common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
}));
readable.unshift(val);
}
testUnshiftArg([]);
testUnshiftArg({});
testUnshiftArg(0);

View File

@ -112,23 +112,6 @@ const { Readable } = require('stream');
} }
{
// Check that error is thrown for invalid chunks
const readable = new Readable({ read() {} });
function checkError(fn) {
common.expectsError(fn, {
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}
checkError(() => readable.unshift([]));
checkError(() => readable.unshift({}));
checkError(() => readable.unshift(0));
}
{ {
// Check that ObjectMode works // Check that ObjectMode works
const readable = new Readable({ objectMode: true, read() {} }); const readable = new Readable({ objectMode: true, read() {} });

View File

@ -86,13 +86,7 @@ w._write = function(chunk, encoding, cb) {
}; };
r.on('end', common.mustCall(function() { r.on('end', common.mustCall(function() {
common.expectsError(function() {
r.unshift(Buffer.allocUnsafe(1)); r.unshift(Buffer.allocUnsafe(1));
}, {
code: 'ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
type: Error,
message: 'stream.unshift() after end event'
});
w.end(); w.end();
})); }));

View File

@ -402,3 +402,42 @@ const helloWorldBuffer = Buffer.from('hello world');
w.write(Buffer.allocUnsafe(1)); w.write(Buffer.allocUnsafe(1));
w.end(Buffer.allocUnsafe(0)); w.end(Buffer.allocUnsafe(0));
} }
{
// Verify that error is only emitted once when failing in _finish.
const w = new W();
w._final = common.mustCall(function(cb) {
cb(new Error('test'));
});
w.on('error', common.mustCall((err) => {
assert.strictEqual(w._writableState.errorEmitted, true);
assert.strictEqual(err.message, 'test');
w.on('error', common.mustNotCall());
w.destroy(new Error());
}));
w.end();
}
{
// Verify that error is only emitted once when failing in write.
const w = new W();
w.on('error', common.mustCall((err) => {
assert.strictEqual(w._writableState.errorEmitted, true);
assert.strictEqual(err.code, 'ERR_STREAM_NULL_VALUES');
}));
w.write(null);
w.destroy(new Error());
}
{
// Verify that error is only emitted once when failing in write after end.
const w = new W();
w.on('error', common.mustCall((err) => {
assert.strictEqual(w._writableState.errorEmitted, true);
assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
}));
w.end();
w.write('hello');
w.destroy(new Error());
}