From 5e3f51648ed5de36b01d53bde13fb6fb7b965667 Mon Sep 17 00:00:00 2001 From: Mathias Buus Date: Mon, 29 Jan 2018 19:32:34 +0100 Subject: [PATCH] stream: updated streams error handling This improves error handling for streams in a few ways. 1. It ensures that no user defined methods (_read, _write, ...) are run after .destroy has been called. 2. It introduces an explicit error to tell the user if they are write to write, etc to the stream after it has been destroyed. 3. It makes streams always emit close as the last thing after they have been destroyed 4. Changes the default _destroy to not gracefully end streams. It also updates net, http2, zlib and fs to the new error handling. PR-URL: https://github.com/nodejs/node/pull/18438 Reviewed-By: Matteo Collina Reviewed-By: James M Snell Reviewed-By: Ruben Bridgewater Reviewed-By: Anna Henningsen --- doc/api/errors.md | 11 ++++++----- doc/api/stream.md | 19 +++++++++++++++---- lib/_stream_duplex.js | 7 ------- lib/_stream_readable.js | 6 +++++- lib/_stream_transform.js | 3 +-- lib/_stream_writable.js | 10 +++++++--- lib/fs.js | 6 ++++++ lib/internal/errors.js | 2 +- lib/internal/http2/core.js | 1 + lib/internal/streams/destroy.js | 9 +++++++++ lib/net.js | 5 +++++ lib/zlib.js | 9 ++------- test/parallel/test-net-socket-destroy-send.js | 8 ++++---- test/parallel/test-stream-duplex-destroy.js | 14 ++++++++------ test/parallel/test-stream-readable-destroy.js | 19 +++++++++++++++++-- .../parallel/test-stream-transform-destroy.js | 16 ++++++++-------- test/parallel/test-stream-writable-destroy.js | 13 ++++++++++--- test/parallel/test-zlib-write-after-close.js | 4 ++-- 18 files changed, 107 insertions(+), 55 deletions(-) diff --git a/doc/api/errors.md b/doc/api/errors.md index b9c52ad0d9a..70ac01de610 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1449,6 +1449,12 @@ An unspecified or non-specific system error has occurred within the Node.js process. The error object will have an `err.info` object property with additional details. + +### ERR_STREAM_DESTROYED + +A stream method was called that cannot complete because the stream was +destroyed using `stream.destroy()`. + ### ERR_TLS_CERT_ALTNAME_INVALID @@ -1615,11 +1621,6 @@ The fulfilled value of a linking promise is not a `vm.Module` object. The current module's status does not allow for this operation. The specific meaning of the error depends on the specific function. - -### ERR_ZLIB_BINDING_CLOSED - -An attempt was made to use a `zlib` object after it has already been closed. - ### ERR_ZLIB_INITIALIZATION_FAILED diff --git a/doc/api/stream.md b/doc/api/stream.md index 5db990d4d2c..32e368f05f1 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -543,8 +543,10 @@ added: v8.0.0 * Returns: {this} -Destroy the stream, and emit the passed error. After this call, the -writable stream has ended. Implementors should not override this method, +Destroy the stream, and emit the passed `error` and a `close` event. +After this call, the writable stream has ended and subsequent calls +to `write` / `end` will give an `ERR_STREAM_DESTROYED` error. +Implementors should not override this method, but instead implement [`writable._destroy`][writable-_destroy]. ### Readable Streams @@ -1167,8 +1169,9 @@ myReader.on('readable', () => { added: v8.0.0 --> -Destroy the stream, and emit `'error'`. After this call, the -readable stream will release any internal resources. +Destroy the stream, and emit `'error'` and `close`. After this call, the +readable stream will release any internal resources and subsequent calls +to `push` will be ignored. Implementors should not override this method, but instead implement [`readable._destroy`][readable-_destroy]. @@ -1382,6 +1385,12 @@ constructor and implement the `writable._write()` method. The `writable._writev()` method *may* also be implemented. #### Constructor: new stream.Writable([options]) + * `options` {Object} * `highWaterMark` {number} Buffer level when @@ -1395,6 +1404,8 @@ constructor and implement the `writable._write()` method. The it becomes possible to write JavaScript values other than string, `Buffer` or `Uint8Array` if supported by the stream implementation. Defaults to `false` + * `emitClose` {boolean} Whether or not the stream should emit `close` + after it has been destroyed. Defaults to `true` * `write` {Function} Implementation for the [`stream._write()`][stream-_write] method. * `writev` {Function} Implementation for the diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index 59ce8329278..1ccb931260d 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -135,10 +135,3 @@ Object.defineProperty(Duplex.prototype, 'destroyed', { this._writableState.destroyed = value; } }); - -Duplex.prototype._destroy = function(err, cb) { - this.push(null); - this.end(); - - process.nextTick(cb, err); -}; diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index ba231ccda90..5781dfd471e 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -106,6 +106,9 @@ function ReadableState(options, stream) { this.readableListening = false; this.resumeScheduled = false; + // Should close be emitted on destroy. Defaults to true. + this.emitClose = options.emitClose !== false; + // has it been destroyed this.destroyed = false; @@ -177,7 +180,6 @@ Object.defineProperty(Readable.prototype, 'destroyed', { Readable.prototype.destroy = destroyImpl.destroy; Readable.prototype._undestroy = destroyImpl.undestroy; Readable.prototype._destroy = function(err, cb) { - this.push(null); cb(err); }; @@ -236,6 +238,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { addChunk(stream, state, chunk, true); } else if (state.ended) { stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF')); + } else if (state.destroyed) { + return false; } else { state.reading = false; if (state.decoder && !encoding) { diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index a9fcddda2d9..b82114ecaec 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -132,7 +132,7 @@ function Transform(options) { } function prefinish() { - if (typeof this._flush === 'function') { + if (typeof this._flush === 'function' && !this._readableState.destroyed) { this._flush((er, data) => { done(this, er, data); }); @@ -194,7 +194,6 @@ Transform.prototype._read = function(n) { Transform.prototype._destroy = function(err, cb) { Duplex.prototype._destroy.call(this, err, (err2) => { cb(err2); - this.emit('close'); }); }; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 2b765881359..d5cfe07f171 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -134,6 +134,9 @@ function WritableState(options, stream) { // True if the error was already emitted and should not be thrown again this.errorEmitted = false; + // Should close be emitted on destroy. Defaults to true. + this.emitClose = options.emitClose !== false; + // count buffered requests this.bufferedRequestCount = 0; @@ -390,7 +393,9 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writecb = cb; state.writing = true; state.sync = true; - if (writev) + if (state.destroyed) + state.onwrite(new errors.Error('ERR_STREAM_DESTROYED', 'write')); + else if (writev) stream._writev(chunk, state.onwrite); else stream._write(chunk, encoding, state.onwrite); @@ -604,7 +609,7 @@ function callFinal(stream, state) { } function prefinish(stream, state) { if (!state.prefinished && !state.finalCalled) { - if (typeof stream._final === 'function') { + if (typeof stream._final === 'function' && !state.destroyed) { state.pendingcb++; state.finalCalled = true; process.nextTick(callFinal, stream, state); @@ -681,6 +686,5 @@ Object.defineProperty(Writable.prototype, 'destroyed', { Writable.prototype.destroy = destroyImpl.destroy; Writable.prototype._undestroy = destroyImpl.undestroy; Writable.prototype._destroy = function(err, cb) { - this.end(); cb(err); }; diff --git a/lib/fs.js b/lib/fs.js index 3771efad10d..917c3eb3a9f 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1929,6 +1929,9 @@ function ReadStream(path, options) { if (options.highWaterMark === undefined) options.highWaterMark = 64 * 1024; + // for backwards compat do not emit close on destroy. + options.emitClose = false; + Readable.call(this, options); // path will be ignored when fd is specified, so it can be falsy @@ -2084,6 +2087,9 @@ function WriteStream(path, options) { options = copyObject(getOptions(options, {})); + // for backwards compat do not emit close on destroy. + options.emitClose = false; + Writable.call(this, options); // path will be ignored when fd is specified, so it can be falsy diff --git a/lib/internal/errors.js b/lib/internal/errors.js index a4a79d671e4..11f32ccdc17 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -843,6 +843,7 @@ E('ERR_SOCKET_DGRAM_NOT_RUNNING', 'Not running', Error); E('ERR_STDERR_CLOSE', 'process.stderr cannot be closed', Error); E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error); E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error); +E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed'); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); E('ERR_STREAM_READ_NOT_IMPLEMENTED', '_read() is not implemented', Error); @@ -908,7 +909,6 @@ E('ERR_VM_MODULE_NOT_LINKED', E('ERR_VM_MODULE_NOT_MODULE', 'Provided module is not an instance of Module', Error); E('ERR_VM_MODULE_STATUS', 'Module status %s', Error); -E('ERR_ZLIB_BINDING_CLOSED', 'zlib binding closed', Error); E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error); function sysError(code, syscall, path, dest, diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 71bb55ee23c..f60c6388af6 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -1475,6 +1475,7 @@ class Http2Stream extends Duplex { constructor(session, options) { options.allowHalfOpen = true; options.decodeStrings = false; + options.emitClose = false; super(options); this[async_id_symbol] = -1; diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 985332ac460..5d29e182041 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -30,6 +30,7 @@ function destroy(err, cb) { } this._destroy(err || null, (err) => { + process.nextTick(emitCloseNT, this); if (!cb && err) { process.nextTick(emitErrorNT, this, err); if (this._writableState) { @@ -43,6 +44,14 @@ function destroy(err, cb) { return this; } +function emitCloseNT(self) { + if (self._writableState && !self._writableState.emitClose) + return; + if (self._readableState && !self._readableState.emitClose) + return; + self.emit('close'); +} + function undestroy() { if (this._readableState) { this._readableState.destroyed = false; diff --git a/lib/net.js b/lib/net.js index 7583fcb27d1..f2cb423f300 100644 --- a/lib/net.js +++ b/lib/net.js @@ -232,6 +232,11 @@ function Socket(options) { options = { fd: options }; // Legacy interface. else if (options === undefined) options = {}; + else + options = util._extend({}, options); + + // For backwards compat do not emit close on destroy. + options.emitClose = false; stream.Duplex.call(this, options); diff --git a/lib/zlib.js b/lib/zlib.js index 93f878712ad..4adfd1ffa28 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -25,7 +25,6 @@ const { ERR_BUFFER_TOO_LARGE, ERR_INVALID_ARG_TYPE, ERR_OUT_OF_RANGE, - ERR_ZLIB_BINDING_CLOSED, ERR_ZLIB_INITIALIZATION_FAILED } = require('internal/errors').codes; const Transform = require('_stream_transform'); @@ -392,7 +391,7 @@ Zlib.prototype.flush = function flush(kind, callback) { Zlib.prototype.close = function close(callback) { _close(this, callback); - process.nextTick(emitCloseNT, this); + this.destroy(); }; Zlib.prototype._transform = function _transform(chunk, encoding, cb) { @@ -510,7 +509,7 @@ function processChunkSync(self, chunk, flushFlag) { function processChunk(self, chunk, flushFlag, cb) { var handle = self._handle; if (!handle) - return cb(new ERR_ZLIB_BINDING_CLOSED()); + assert(false, 'zlib binding closed'); handle.buffer = chunk; handle.cb = cb; @@ -603,10 +602,6 @@ function _close(engine, callback) { engine._handle = null; } -function emitCloseNT(self) { - self.emit('close'); -} - // generic zlib // minimal 2-byte header function Deflate(opts) { diff --git a/test/parallel/test-net-socket-destroy-send.js b/test/parallel/test-net-socket-destroy-send.js index a602b892538..aa587fc2e16 100644 --- a/test/parallel/test-net-socket-destroy-send.js +++ b/test/parallel/test-net-socket-destroy-send.js @@ -13,14 +13,14 @@ server.listen(0, common.mustCall(function() { // Test destroy returns this, even on multiple calls when it short-circuits. assert.strictEqual(conn, conn.destroy().destroy()); conn.on('error', common.expectsError({ - code: 'ERR_SOCKET_CLOSED', - message: 'Socket is closed', + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed', type: Error })); conn.write(Buffer.from('kaboom'), common.expectsError({ - code: 'ERR_SOCKET_CLOSED', - message: 'Socket is closed', + code: 'ERR_STREAM_DESTROYED', + message: 'Cannot call write after a stream was destroyed', type: Error })); server.close(); diff --git a/test/parallel/test-stream-duplex-destroy.js b/test/parallel/test-stream-duplex-destroy.js index 00e334d64b5..854d29ffc13 100644 --- a/test/parallel/test-stream-duplex-destroy.js +++ b/test/parallel/test-stream-duplex-destroy.js @@ -13,8 +13,9 @@ const { inherits } = require('util'); duplex.resume(); - duplex.on('end', common.mustCall()); - duplex.on('finish', common.mustCall()); + duplex.on('end', common.mustNotCall()); + duplex.on('finish', common.mustNotCall()); + duplex.on('close', common.mustCall()); duplex.destroy(); assert.strictEqual(duplex.destroyed, true); @@ -29,8 +30,8 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); - duplex.on('end', common.mustCall()); - duplex.on('finish', common.mustCall()); + duplex.on('end', common.mustNotCall()); + duplex.on('finish', common.mustNotCall()); duplex.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -78,6 +79,7 @@ const { inherits } = require('util'); // error is swallowed by the custom _destroy duplex.on('error', common.mustNotCall('no error event')); + duplex.on('close', common.mustCall()); duplex.destroy(expected); assert.strictEqual(duplex.destroyed, true); @@ -159,8 +161,8 @@ const { inherits } = require('util'); }); duplex.resume(); - duplex.on('finish', common.mustCall()); - duplex.on('end', common.mustCall()); + duplex.on('finish', common.mustNotCall()); + duplex.on('end', common.mustNotCall()); duplex.destroy(); assert.strictEqual(duplex.destroyed, true); diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js index def20d26c34..026aa8ca160 100644 --- a/test/parallel/test-stream-readable-destroy.js +++ b/test/parallel/test-stream-readable-destroy.js @@ -11,7 +11,7 @@ const { inherits } = require('util'); }); read.resume(); - read.on('end', common.mustCall()); + read.on('close', common.mustCall()); read.destroy(); assert.strictEqual(read.destroyed, true); @@ -25,7 +25,8 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); - read.on('end', common.mustCall()); + read.on('end', common.mustNotCall('no end event')); + read.on('close', common.mustCall()); read.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -47,6 +48,7 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); read.on('end', common.mustNotCall('no end event')); + read.on('close', common.mustCall()); read.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -70,6 +72,7 @@ const { inherits } = require('util'); // error is swallowed by the custom _destroy read.on('error', common.mustNotCall('no error event')); + read.on('close', common.mustCall()); read.destroy(expected); assert.strictEqual(read.destroyed, true); @@ -106,6 +109,7 @@ const { inherits } = require('util'); const fail = common.mustNotCall('no end event'); read.on('end', fail); + read.on('close', common.mustCall()); read.destroy(); @@ -170,7 +174,18 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); + read.on('close', common.mustCall()); read.destroy(expected, common.mustCall(function(err) { assert.strictEqual(expected, err); })); } + +{ + const read = new Readable({ + read() {} + }); + + read.destroy(); + read.push('hi'); + read.on('data', common.mustNotCall()); +} diff --git a/test/parallel/test-stream-transform-destroy.js b/test/parallel/test-stream-transform-destroy.js index c42fe1d6f96..47cce87264b 100644 --- a/test/parallel/test-stream-transform-destroy.js +++ b/test/parallel/test-stream-transform-destroy.js @@ -11,9 +11,9 @@ const assert = require('assert'); transform.resume(); - transform.on('end', common.mustCall()); + transform.on('end', common.mustNotCall()); transform.on('close', common.mustCall()); - transform.on('finish', common.mustCall()); + transform.on('finish', common.mustNotCall()); transform.destroy(); } @@ -26,8 +26,8 @@ const assert = require('assert'); const expected = new Error('kaboom'); - transform.on('end', common.mustCall()); - transform.on('finish', common.mustCall()); + transform.on('end', common.mustNotCall()); + transform.on('finish', common.mustNotCall()); transform.on('close', common.mustCall()); transform.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); @@ -49,7 +49,7 @@ const assert = require('assert'); const expected = new Error('kaboom'); transform.on('finish', common.mustNotCall('no finish event')); - transform.on('close', common.mustNotCall('no close event')); + transform.on('close', common.mustCall()); transform.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -69,7 +69,7 @@ const assert = require('assert'); transform.resume(); transform.on('end', common.mustNotCall('no end event')); - transform.on('close', common.mustNotCall('no close event')); + transform.on('close', common.mustCall()); transform.on('finish', common.mustNotCall('no finish event')); // error is swallowed by the custom _destroy @@ -110,7 +110,7 @@ const assert = require('assert'); transform.on('finish', fail); transform.on('end', fail); - transform.on('close', fail); + transform.on('close', common.mustCall()); transform.destroy(); @@ -132,7 +132,7 @@ const assert = require('assert'); cb(expected); }, 1); - transform.on('close', common.mustNotCall('no close event')); + transform.on('close', common.mustCall()); transform.on('finish', common.mustNotCall('no finish event')); transform.on('end', common.mustNotCall('no end event')); transform.on('error', common.mustCall((err) => { diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js index 46c48511177..565a5564e2b 100644 --- a/test/parallel/test-stream-writable-destroy.js +++ b/test/parallel/test-stream-writable-destroy.js @@ -10,7 +10,8 @@ const { inherits } = require('util'); write(chunk, enc, cb) { cb(); } }); - write.on('finish', common.mustCall()); + write.on('finish', common.mustNotCall()); + write.on('close', common.mustCall()); write.destroy(); assert.strictEqual(write.destroyed, true); @@ -23,7 +24,8 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); - write.on('finish', common.mustCall()); + write.on('finish', common.mustNotCall()); + write.on('close', common.mustCall()); write.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -45,6 +47,7 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); write.on('finish', common.mustNotCall('no finish event')); + write.on('close', common.mustCall()); write.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); })); @@ -65,6 +68,7 @@ const { inherits } = require('util'); const expected = new Error('kaboom'); write.on('finish', common.mustNotCall('no finish event')); + write.on('close', common.mustCall()); // error is swallowed by the custom _destroy write.on('error', common.mustNotCall('no error event')); @@ -103,6 +107,7 @@ const { inherits } = require('util'); const fail = common.mustNotCall('no finish event'); write.on('finish', fail); + write.on('close', common.mustCall()); write.destroy(); @@ -123,6 +128,7 @@ const { inherits } = require('util'); cb(expected); }); + write.on('close', common.mustCall()); write.on('finish', common.mustNotCall('no finish event')); write.on('error', common.mustCall((err) => { assert.strictEqual(err, expected); @@ -138,6 +144,7 @@ const { inherits } = require('util'); write(chunk, enc, cb) { cb(); } }); + write.on('close', common.mustCall()); write.on('error', common.mustCall()); write.destroy(new Error('kaboom 1')); @@ -155,7 +162,7 @@ const { inherits } = require('util'); assert.strictEqual(write.destroyed, true); // the internal destroy() mechanism should not be triggered - write.on('finish', common.mustNotCall()); + write.on('close', common.mustNotCall()); write.destroy(); } diff --git a/test/parallel/test-zlib-write-after-close.js b/test/parallel/test-zlib-write-after-close.js index 88d6643da8b..160971b16bc 100644 --- a/test/parallel/test-zlib-write-after-close.js +++ b/test/parallel/test-zlib-write-after-close.js @@ -29,9 +29,9 @@ zlib.gzip('hello', common.mustCall(function(err, out) { common.expectsError( () => unzip.write(out), { - code: 'ERR_ZLIB_BINDING_CLOSED', + code: 'ERR_STREAM_DESTROYED', type: Error, - message: 'zlib binding closed' + message: 'Cannot call write after a stream was destroyed' } ); }));