stream: add auto-destroy mode
PR-URL: https://github.com/nodejs/node/pull/22795 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
This commit is contained in:
parent
cd1193d9ed
commit
f24b070cb7
@ -1493,6 +1493,11 @@ changes:
|
||||
pr-url: https://github.com/nodejs/node/pull/18438
|
||||
description: >
|
||||
Add `emitClose` option to specify if `'close'` is emitted on destroy
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/22795
|
||||
description: >
|
||||
Add `autoDestroy` option to automatically `destroy()` the stream
|
||||
when it emits `'finish'` or errors
|
||||
-->
|
||||
|
||||
* `options` {Object}
|
||||
@ -1521,6 +1526,8 @@ changes:
|
||||
[`stream._destroy()`][writable-_destroy] method.
|
||||
* `final` {Function} Implementation for the
|
||||
[`stream._final()`][stream-_final] method.
|
||||
* `autoDestroy` {boolean} Whether this stream should automatically call
|
||||
`.destroy()` on itself after ending. **Default:** `false`.
|
||||
|
||||
```js
|
||||
const { Writable } = require('stream');
|
||||
@ -1756,6 +1763,14 @@ Custom `Readable` streams *must* call the `new stream.Readable([options])`
|
||||
constructor and implement the `readable._read()` method.
|
||||
|
||||
#### new stream.Readable([options])
|
||||
<!-- YAML
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/22795
|
||||
description: >
|
||||
Add `autoDestroy` option to automatically `destroy()` the stream
|
||||
when it emits `'end'` or errors
|
||||
-->
|
||||
|
||||
* `options` {Object}
|
||||
* `highWaterMark` {number} The maximum [number of bytes][hwm-gotcha] to store
|
||||
@ -1770,6 +1785,8 @@ constructor and implement the `readable._read()` method.
|
||||
method.
|
||||
* `destroy` {Function} Implementation for the
|
||||
[`stream._destroy()`][readable-_destroy] method.
|
||||
* `autoDestroy` {boolean} Whether this stream should automatically call
|
||||
`.destroy()` on itself after ending. **Default:** `false`.
|
||||
|
||||
```js
|
||||
const { Readable } = require('stream');
|
||||
|
@ -46,6 +46,7 @@ let createReadableStreamAsyncIterator;
|
||||
|
||||
util.inherits(Readable, Stream);
|
||||
|
||||
const { errorOrDestroy } = destroyImpl;
|
||||
const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
|
||||
|
||||
function prependListener(emitter, event, fn) {
|
||||
@ -117,6 +118,9 @@ function ReadableState(options, stream, isDuplex) {
|
||||
// Should close be emitted on destroy. Defaults to true.
|
||||
this.emitClose = options.emitClose !== false;
|
||||
|
||||
// Should .destroy() be called after 'end' (and potentially 'finish')
|
||||
this.autoDestroy = !!options.autoDestroy;
|
||||
|
||||
// has it been destroyed
|
||||
this.destroyed = false;
|
||||
|
||||
@ -235,7 +239,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
|
||||
if (!skipChunkCheck)
|
||||
er = chunkInvalid(state, chunk);
|
||||
if (er) {
|
||||
stream.emit('error', er);
|
||||
errorOrDestroy(stream, er);
|
||||
} else if (state.objectMode || chunk && chunk.length > 0) {
|
||||
if (typeof chunk !== 'string' &&
|
||||
!state.objectMode &&
|
||||
@ -245,11 +249,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
|
||||
|
||||
if (addToFront) {
|
||||
if (state.endEmitted)
|
||||
stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
|
||||
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
|
||||
else
|
||||
addChunk(stream, state, chunk, true);
|
||||
} else if (state.ended) {
|
||||
stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
|
||||
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
|
||||
} else if (state.destroyed) {
|
||||
return false;
|
||||
} else {
|
||||
@ -581,7 +585,7 @@ function maybeReadMore_(stream, state) {
|
||||
// for virtual (non-string, non-buffer) streams, "length" is somewhat
|
||||
// arbitrary, and perhaps not very meaningful.
|
||||
Readable.prototype._read = function(n) {
|
||||
this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
|
||||
errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
|
||||
};
|
||||
|
||||
Readable.prototype.pipe = function(dest, pipeOpts) {
|
||||
@ -687,7 +691,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
|
||||
unpipe();
|
||||
dest.removeListener('error', onerror);
|
||||
if (EE.listenerCount(dest, 'error') === 0)
|
||||
dest.emit('error', er);
|
||||
errorOrDestroy(dest, er);
|
||||
}
|
||||
|
||||
// Make sure our error handler is attached before userland ones.
|
||||
@ -1092,5 +1096,14 @@ function endReadableNT(state, stream) {
|
||||
state.endEmitted = true;
|
||||
stream.readable = false;
|
||||
stream.emit('end');
|
||||
|
||||
if (state.autoDestroy) {
|
||||
// In case of duplex streams we need a way to detect
|
||||
// if the writable side is ready for autoDestroy as well
|
||||
const wState = stream._writableState;
|
||||
if (!wState || (wState.autoDestroy && wState.finished)) {
|
||||
stream.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -45,6 +45,8 @@ const {
|
||||
ERR_UNKNOWN_ENCODING
|
||||
} = require('internal/errors').codes;
|
||||
|
||||
const { errorOrDestroy } = destroyImpl;
|
||||
|
||||
util.inherits(Writable, Stream);
|
||||
|
||||
function nop() {}
|
||||
@ -147,6 +149,9 @@ function WritableState(options, stream, isDuplex) {
|
||||
// Should close be emitted on destroy. Defaults to true.
|
||||
this.emitClose = options.emitClose !== false;
|
||||
|
||||
// Should .destroy() be called after 'finish' (and potentially 'end')
|
||||
this.autoDestroy = !!options.autoDestroy;
|
||||
|
||||
// count buffered requests
|
||||
this.bufferedRequestCount = 0;
|
||||
|
||||
@ -235,14 +240,14 @@ function Writable(options) {
|
||||
|
||||
// Otherwise people can pipe Writable streams, which is just wrong.
|
||||
Writable.prototype.pipe = function() {
|
||||
this.emit('error', new ERR_STREAM_CANNOT_PIPE());
|
||||
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
|
||||
};
|
||||
|
||||
|
||||
function writeAfterEnd(stream, cb) {
|
||||
var er = new ERR_STREAM_WRITE_AFTER_END();
|
||||
// TODO: defer error events consistently everywhere, not just the cb
|
||||
stream.emit('error', er);
|
||||
errorOrDestroy(stream, er);
|
||||
process.nextTick(cb, er);
|
||||
}
|
||||
|
||||
@ -258,7 +263,7 @@ function validChunk(stream, state, chunk, cb) {
|
||||
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
|
||||
}
|
||||
if (er) {
|
||||
stream.emit('error', er);
|
||||
errorOrDestroy(stream, er);
|
||||
process.nextTick(cb, er);
|
||||
return false;
|
||||
}
|
||||
@ -422,13 +427,13 @@ function onwriteError(stream, state, sync, er, cb) {
|
||||
// after error
|
||||
process.nextTick(finishMaybe, stream, state);
|
||||
stream._writableState.errorEmitted = true;
|
||||
stream.emit('error', er);
|
||||
errorOrDestroy(stream, er);
|
||||
} else {
|
||||
// the caller expect this to happen before if
|
||||
// it is async
|
||||
cb(er);
|
||||
stream._writableState.errorEmitted = true;
|
||||
stream.emit('error', er);
|
||||
errorOrDestroy(stream, er);
|
||||
// this can emit finish, but finish must
|
||||
// always follow error
|
||||
finishMaybe(stream, state);
|
||||
@ -612,7 +617,7 @@ function callFinal(stream, state) {
|
||||
stream._final((err) => {
|
||||
state.pendingcb--;
|
||||
if (err) {
|
||||
stream.emit('error', err);
|
||||
errorOrDestroy(stream, err);
|
||||
}
|
||||
state.prefinished = true;
|
||||
stream.emit('prefinish');
|
||||
@ -639,6 +644,15 @@ function finishMaybe(stream, state) {
|
||||
if (state.pendingcb === 0) {
|
||||
state.finished = true;
|
||||
stream.emit('finish');
|
||||
|
||||
if (state.autoDestroy) {
|
||||
// In case of duplex streams we need a way to detect
|
||||
// if the readable side is ready for autoDestroy as well
|
||||
const rState = stream._readableState;
|
||||
if (!rState || (rState.autoDestroy && rState.endEmitted)) {
|
||||
stream.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return need;
|
||||
|
@ -82,7 +82,25 @@ function emitErrorNT(self, err) {
|
||||
self.emit('error', err);
|
||||
}
|
||||
|
||||
function errorOrDestroy(stream, err) {
|
||||
// We have tests that rely on errors being emitted
|
||||
// in the same tick, so changing this is semver major.
|
||||
// For now when you opt-in to autoDestroy we allow
|
||||
// the error to be emitted nextTick. In a future
|
||||
// semver major update we should change the default to this.
|
||||
|
||||
const rState = stream._readableState;
|
||||
const wState = stream._writableState;
|
||||
|
||||
if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy))
|
||||
stream.destroy(err);
|
||||
else
|
||||
stream.emit('error', err);
|
||||
}
|
||||
|
||||
|
||||
module.exports = {
|
||||
destroy,
|
||||
undestroy
|
||||
undestroy,
|
||||
errorOrDestroy
|
||||
};
|
||||
|
84
test/parallel/test-stream-auto-destroy.js
Normal file
84
test/parallel/test-stream-auto-destroy.js
Normal file
@ -0,0 +1,84 @@
|
||||
'use strict';
|
||||
const common = require('../common');
|
||||
const stream = require('stream');
|
||||
const assert = require('assert');
|
||||
|
||||
{
|
||||
const r = new stream.Readable({
|
||||
autoDestroy: true,
|
||||
read() {
|
||||
this.push('hello');
|
||||
this.push('world');
|
||||
this.push(null);
|
||||
},
|
||||
destroy: common.mustCall((err, cb) => cb())
|
||||
});
|
||||
|
||||
let ended = false;
|
||||
|
||||
r.resume();
|
||||
|
||||
r.on('end', common.mustCall(() => {
|
||||
ended = true;
|
||||
}));
|
||||
|
||||
r.on('close', common.mustCall(() => {
|
||||
assert(ended);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const w = new stream.Writable({
|
||||
autoDestroy: true,
|
||||
write(data, enc, cb) {
|
||||
cb(null);
|
||||
},
|
||||
destroy: common.mustCall((err, cb) => cb())
|
||||
});
|
||||
|
||||
let finished = false;
|
||||
|
||||
w.write('hello');
|
||||
w.write('world');
|
||||
w.end();
|
||||
|
||||
w.on('finish', common.mustCall(() => {
|
||||
finished = true;
|
||||
}));
|
||||
|
||||
w.on('close', common.mustCall(() => {
|
||||
assert(finished);
|
||||
}));
|
||||
}
|
||||
|
||||
{
|
||||
const t = new stream.Transform({
|
||||
autoDestroy: true,
|
||||
transform(data, enc, cb) {
|
||||
cb(null, data);
|
||||
},
|
||||
destroy: common.mustCall((err, cb) => cb())
|
||||
});
|
||||
|
||||
let ended = false;
|
||||
let finished = false;
|
||||
|
||||
t.write('hello');
|
||||
t.write('world');
|
||||
t.end();
|
||||
|
||||
t.resume();
|
||||
|
||||
t.on('end', common.mustCall(() => {
|
||||
ended = true;
|
||||
}));
|
||||
|
||||
t.on('finish', common.mustCall(() => {
|
||||
finished = true;
|
||||
}));
|
||||
|
||||
t.on('close', common.mustCall(() => {
|
||||
assert(ended);
|
||||
assert(finished);
|
||||
}));
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user