stream: support abortsignal in constructor
PR-URL: https://github.com/nodejs/node/pull/36431 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
806588361a
commit
040a27ae5f
@ -1938,6 +1938,9 @@ method.
|
||||
#### `new stream.Writable([options])`
|
||||
<!-- YAML
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/36431
|
||||
description: support passing in an AbortSignal.
|
||||
- version: v14.0.0
|
||||
pr-url: https://github.com/nodejs/node/pull/30623
|
||||
description: Change `autoDestroy` option default to `true`.
|
||||
@ -1985,6 +1988,7 @@ changes:
|
||||
[`stream._construct()`][writable-_construct] method.
|
||||
* `autoDestroy` {boolean} Whether this stream should automatically call
|
||||
`.destroy()` on itself after ending. **Default:** `true`.
|
||||
* `signal` {AbortSignal} A signal representing possible cancellation.
|
||||
|
||||
<!-- eslint-disable no-useless-constructor -->
|
||||
```js
|
||||
@ -2028,6 +2032,27 @@ const myWritable = new Writable({
|
||||
});
|
||||
```
|
||||
|
||||
Calling `abort` on the `AbortController` corresponding to the passed
|
||||
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
|
||||
on the writeable stream.
|
||||
|
||||
```js
|
||||
const { Writable } = require('stream');
|
||||
|
||||
const controller = new AbortController();
|
||||
const myWritable = new Writable({
|
||||
write(chunk, encoding, callback) {
|
||||
// ...
|
||||
},
|
||||
writev(chunks, callback) {
|
||||
// ...
|
||||
},
|
||||
signal: controller.signal
|
||||
});
|
||||
// Later, abort the operation closing the stream
|
||||
controller.abort();
|
||||
|
||||
```
|
||||
#### `writable._construct(callback)`
|
||||
<!-- YAML
|
||||
added: v15.0.0
|
||||
@ -2276,6 +2301,9 @@ constructor and implement the [`readable._read()`][] method.
|
||||
#### `new stream.Readable([options])`
|
||||
<!-- YAML
|
||||
changes:
|
||||
- version: REPLACEME
|
||||
pr-url: https://github.com/nodejs/node/pull/36431
|
||||
description: support passing in an AbortSignal.
|
||||
- version: v14.0.0
|
||||
pr-url: https://github.com/nodejs/node/pull/30623
|
||||
description: Change `autoDestroy` option default to `true`.
|
||||
@ -2306,6 +2334,7 @@ changes:
|
||||
[`stream._construct()`][readable-_construct] method.
|
||||
* `autoDestroy` {boolean} Whether this stream should automatically call
|
||||
`.destroy()` on itself after ending. **Default:** `true`.
|
||||
* `signal` {AbortSignal} A signal representing possible cancellation.
|
||||
|
||||
<!-- eslint-disable no-useless-constructor -->
|
||||
```js
|
||||
@ -2346,6 +2375,23 @@ const myReadable = new Readable({
|
||||
});
|
||||
```
|
||||
|
||||
Calling `abort` on the `AbortController` corresponding to the passed
|
||||
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
|
||||
on the readable created.
|
||||
|
||||
```js
|
||||
const fs = require('fs');
|
||||
const controller = new AbortController();
|
||||
const read = new Readable({
|
||||
read(size) {
|
||||
// ...
|
||||
},
|
||||
signal: controller.signal
|
||||
});
|
||||
// Later, abort the operation closing the stream
|
||||
controller.abort();
|
||||
```
|
||||
|
||||
#### `readable._construct(callback)`
|
||||
<!-- YAML
|
||||
added: v15.0.0
|
||||
|
@ -9,12 +9,11 @@ const eos = require('internal/streams/end-of-stream');
|
||||
const { ERR_INVALID_ARG_TYPE } = codes;
|
||||
|
||||
// This method is inlined here for readable-stream
|
||||
// It also does not allow for signal to not exist on the steam
|
||||
// https://github.com/nodejs/node/pull/36061#discussion_r533718029
|
||||
const validateAbortSignal = (signal, name) => {
|
||||
if (signal !== undefined &&
|
||||
(signal === null ||
|
||||
typeof signal !== 'object' ||
|
||||
!('aborted' in signal))) {
|
||||
if (typeof signal !== 'object' ||
|
||||
!('aborted' in signal)) {
|
||||
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
|
||||
}
|
||||
};
|
||||
@ -23,11 +22,17 @@ function isStream(obj) {
|
||||
return !!(obj && typeof obj.pipe === 'function');
|
||||
}
|
||||
|
||||
module.exports = function addAbortSignal(signal, stream) {
|
||||
module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
|
||||
validateAbortSignal(signal, 'signal');
|
||||
if (!isStream(stream)) {
|
||||
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
|
||||
}
|
||||
return module.exports.addAbortSignalNoValidate(signal, stream);
|
||||
};
|
||||
module.exports.addAbortSignalNoValidate = function(signal, stream) {
|
||||
if (typeof signal !== 'object' || !('aborted' in signal)) {
|
||||
return stream;
|
||||
}
|
||||
const onAbort = () => {
|
||||
stream.destroy(new AbortError());
|
||||
};
|
||||
|
@ -41,6 +41,10 @@ const EE = require('events');
|
||||
const { Stream, prependListener } = require('internal/streams/legacy');
|
||||
const { Buffer } = require('buffer');
|
||||
|
||||
const {
|
||||
addAbortSignalNoValidate,
|
||||
} = require('internal/streams/add-abort-signal');
|
||||
|
||||
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
|
||||
debug = fn;
|
||||
});
|
||||
@ -192,6 +196,8 @@ function Readable(options) {
|
||||
|
||||
if (typeof options.construct === 'function')
|
||||
this._construct = options.construct;
|
||||
if (options.signal && !isDuplex)
|
||||
addAbortSignalNoValidate(options.signal, this);
|
||||
}
|
||||
|
||||
Stream.call(this, options);
|
||||
|
@ -41,6 +41,11 @@ const EE = require('events');
|
||||
const Stream = require('internal/streams/legacy').Stream;
|
||||
const { Buffer } = require('buffer');
|
||||
const destroyImpl = require('internal/streams/destroy');
|
||||
|
||||
const {
|
||||
addAbortSignalNoValidate,
|
||||
} = require('internal/streams/add-abort-signal');
|
||||
|
||||
const {
|
||||
getHighWaterMark,
|
||||
getDefaultHighWaterMark
|
||||
@ -263,6 +268,8 @@ function Writable(options) {
|
||||
|
||||
if (typeof options.construct === 'function')
|
||||
this._construct = options.construct;
|
||||
if (options.signal)
|
||||
addAbortSignalNoValidate(options.signal, this);
|
||||
}
|
||||
|
||||
Stream.call(this, options);
|
||||
|
@ -43,7 +43,8 @@ Stream.Duplex = require('internal/streams/duplex');
|
||||
Stream.Transform = require('internal/streams/transform');
|
||||
Stream.PassThrough = require('internal/streams/passthrough');
|
||||
Stream.pipeline = pipeline;
|
||||
Stream.addAbortSignal = require('internal/streams/add-abort-signal');
|
||||
const { addAbortSignal } = require('internal/streams/add-abort-signal');
|
||||
Stream.addAbortSignal = addAbortSignal;
|
||||
Stream.finished = eos;
|
||||
|
||||
function lazyLoadPromises() {
|
||||
|
@ -238,3 +238,20 @@ const assert = require('assert');
|
||||
});
|
||||
duplex.on('close', common.mustCall());
|
||||
}
|
||||
{
|
||||
// Check abort signal
|
||||
const controller = new AbortController();
|
||||
const { signal } = controller;
|
||||
const duplex = new Duplex({
|
||||
write(chunk, enc, cb) { cb(); },
|
||||
read() {},
|
||||
signal,
|
||||
});
|
||||
let count = 0;
|
||||
duplex.on('error', common.mustCall((e) => {
|
||||
assert.strictEqual(count++, 0); // Ensure not called twice
|
||||
assert.strictEqual(e.name, 'AbortError');
|
||||
}));
|
||||
duplex.on('close', common.mustCall());
|
||||
controller.abort();
|
||||
}
|
||||
|
@ -284,6 +284,22 @@ const assert = require('assert');
|
||||
read.on('data', common.mustNotCall());
|
||||
}
|
||||
|
||||
{
|
||||
const controller = new AbortController();
|
||||
const read = new Readable({
|
||||
signal: controller.signal,
|
||||
read() {
|
||||
this.push('asd');
|
||||
},
|
||||
});
|
||||
|
||||
read.on('error', common.mustCall((e) => {
|
||||
assert.strictEqual(e.name, 'AbortError');
|
||||
}));
|
||||
controller.abort();
|
||||
read.on('data', common.mustNotCall());
|
||||
}
|
||||
|
||||
{
|
||||
const controller = new AbortController();
|
||||
const read = addAbortSignal(controller.signal, new Readable({
|
||||
|
@ -431,3 +431,18 @@ const assert = require('assert');
|
||||
write.write('asd');
|
||||
ac.abort();
|
||||
}
|
||||
|
||||
{
|
||||
const ac = new AbortController();
|
||||
const write = new Writable({
|
||||
signal: ac.signal,
|
||||
write(chunk, enc, cb) { cb(); }
|
||||
});
|
||||
|
||||
write.on('error', common.mustCall((e) => {
|
||||
assert.strictEqual(e.name, 'AbortError');
|
||||
assert.strictEqual(write.destroyed, true);
|
||||
}));
|
||||
write.write('asd');
|
||||
ac.abort();
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user