stream: add pipeline and finished
PR-URL: https://github.com/nodejs/node/pull/19828 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
parent
5cc948b77a
commit
f64bebf205
@ -1431,6 +1431,12 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
|
|||||||
|
|
||||||
An attempt was made to call [`stream.write()`][] with a `null` chunk.
|
An attempt was made to call [`stream.write()`][] with a `null` chunk.
|
||||||
|
|
||||||
|
<a id="ERR_STREAM_PREMATURE_CLOSE"></a>
|
||||||
|
### ERR_STREAM_PREMATURE_CLOSE
|
||||||
|
|
||||||
|
An error returned by `stream.finished()` and `stream.pipeline()`, when a stream
|
||||||
|
or a pipeline ends non gracefully with no explicit error.
|
||||||
|
|
||||||
<a id="ERR_STREAM_PUSH_AFTER_EOF"></a>
|
<a id="ERR_STREAM_PUSH_AFTER_EOF"></a>
|
||||||
### ERR_STREAM_PUSH_AFTER_EOF
|
### ERR_STREAM_PUSH_AFTER_EOF
|
||||||
|
|
||||||
|
@ -46,6 +46,9 @@ There are four fundamental stream types within Node.js:
|
|||||||
* [Transform][] - Duplex streams that can modify or transform the data as it
|
* [Transform][] - Duplex streams that can modify or transform the data as it
|
||||||
is written and read (for example [`zlib.createDeflate()`][]).
|
is written and read (for example [`zlib.createDeflate()`][]).
|
||||||
|
|
||||||
|
Additionally this module includes the utility functions [pipeline][] and
|
||||||
|
[finished][].
|
||||||
|
|
||||||
### Object Mode
|
### Object Mode
|
||||||
|
|
||||||
All streams created by Node.js APIs operate exclusively on strings and `Buffer`
|
All streams created by Node.js APIs operate exclusively on strings and `Buffer`
|
||||||
@ -1287,6 +1290,107 @@ implementors should not override this method, but instead implement
|
|||||||
[`readable._destroy()`][readable-_destroy].
|
[`readable._destroy()`][readable-_destroy].
|
||||||
The default implementation of `_destroy()` for `Transform` also emit `'close'`.
|
The default implementation of `_destroy()` for `Transform` also emit `'close'`.
|
||||||
|
|
||||||
|
### stream.finished(stream, callback)
|
||||||
|
<!-- YAML
|
||||||
|
added: REPLACEME
|
||||||
|
-->
|
||||||
|
|
||||||
|
* `stream` {Stream} A readable and/or writable stream.
|
||||||
|
* `callback` {Function} A callback function that takes an optional error
|
||||||
|
argument.
|
||||||
|
|
||||||
|
A function to get notified when a stream is no longer readable, writable
|
||||||
|
or has experienced an error or a premature close event.
|
||||||
|
|
||||||
|
```js
|
||||||
|
const { finished } = require('stream');
|
||||||
|
|
||||||
|
const rs = fs.createReadStream('archive.tar');
|
||||||
|
|
||||||
|
finished(rs, (err) => {
|
||||||
|
if (err) {
|
||||||
|
console.error('Stream failed', err);
|
||||||
|
} else {
|
||||||
|
console.log('Stream is done reading');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
rs.resume(); // drain the stream
|
||||||
|
```
|
||||||
|
|
||||||
|
Especially useful in error handling scenarios where a stream is destroyed
|
||||||
|
prematurely (like an aborted HTTP request), and will not emit `'end'`
|
||||||
|
or `'finish'`.
|
||||||
|
|
||||||
|
The `finished` API is promisify'able as well;
|
||||||
|
|
||||||
|
```js
|
||||||
|
const finished = util.promisify(stream.finished);
|
||||||
|
|
||||||
|
const rs = fs.createReadStream('archive.tar');
|
||||||
|
|
||||||
|
async function run() {
|
||||||
|
await finished(rs);
|
||||||
|
console.log('Stream is done reading');
|
||||||
|
}
|
||||||
|
|
||||||
|
run().catch(console.error);
|
||||||
|
rs.resume(); // drain the stream
|
||||||
|
```
|
||||||
|
|
||||||
|
### stream.pipeline(...streams[, callback])
|
||||||
|
<!-- YAML
|
||||||
|
added: REPLACEME
|
||||||
|
-->
|
||||||
|
|
||||||
|
* `...streams` {Stream} Two or more streams to pipe between.
|
||||||
|
* `callback` {Function} A callback function that takes an optional error
|
||||||
|
argument.
|
||||||
|
|
||||||
|
A module method to pipe between streams forwarding errors and properly cleaning
|
||||||
|
up and provide a callback when the pipeline is complete.
|
||||||
|
|
||||||
|
```js
|
||||||
|
const { pipeline } = require('stream');
|
||||||
|
const fs = require('fs');
|
||||||
|
const zlib = require('zlib');
|
||||||
|
|
||||||
|
// Use the pipeline API to easily pipe a series of streams
|
||||||
|
// together and get notified when the pipeline is fully done.
|
||||||
|
|
||||||
|
// A pipeline to gzip a potentially huge tar file efficiently:
|
||||||
|
|
||||||
|
pipeline(
|
||||||
|
fs.createReadStream('archive.tar'),
|
||||||
|
zlib.createGzip(),
|
||||||
|
fs.createWriteStream('archive.tar.gz'),
|
||||||
|
(err) => {
|
||||||
|
if (err) {
|
||||||
|
console.error('Pipeline failed', err);
|
||||||
|
} else {
|
||||||
|
console.log('Pipeline succeeded');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
```
|
||||||
|
|
||||||
|
The `pipeline` API is promisify'able as well:
|
||||||
|
|
||||||
|
```js
|
||||||
|
const pipeline = util.promisify(stream.pipeline);
|
||||||
|
|
||||||
|
async function run() {
|
||||||
|
await pipeline(
|
||||||
|
fs.createReadStream('archive.tar'),
|
||||||
|
zlib.createGzip(),
|
||||||
|
fs.createWriteStream('archive.tar.gz')
|
||||||
|
);
|
||||||
|
console.log('Pipeline succeeded');
|
||||||
|
}
|
||||||
|
|
||||||
|
run().catch(console.error);
|
||||||
|
```
|
||||||
|
|
||||||
## API for Stream Implementers
|
## API for Stream Implementers
|
||||||
|
|
||||||
<!--type=misc-->
|
<!--type=misc-->
|
||||||
@ -2397,6 +2501,8 @@ contain multi-byte characters.
|
|||||||
[http-incoming-message]: http.html#http_class_http_incomingmessage
|
[http-incoming-message]: http.html#http_class_http_incomingmessage
|
||||||
[zlib]: zlib.html
|
[zlib]: zlib.html
|
||||||
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
|
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
|
||||||
|
[pipeline]: #stream_stream_pipeline_streams_callback
|
||||||
|
[finished]: #stream_stream_finished_stream_callback
|
||||||
[stream-_flush]: #stream_transform_flush_callback
|
[stream-_flush]: #stream_transform_flush_callback
|
||||||
[stream-_read]: #stream_readable_read_size_1
|
[stream-_read]: #stream_readable_read_size_1
|
||||||
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
|
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
|
||||||
|
@ -961,6 +961,7 @@ E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
|
|||||||
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
|
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
|
||||||
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
|
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
|
||||||
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
|
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
|
||||||
|
E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
|
||||||
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
|
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
|
||||||
E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
|
E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
|
||||||
'stream.unshift() after end event', Error);
|
'stream.unshift() after end event', Error);
|
||||||
|
96
lib/internal/streams/end-of-stream.js
Normal file
96
lib/internal/streams/end-of-stream.js
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
// Ported from https://github.com/mafintosh/end-of-stream with
|
||||||
|
// permission from the author, Mathias Buus (@mafintosh).
|
||||||
|
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
const {
|
||||||
|
ERR_STREAM_PREMATURE_CLOSE
|
||||||
|
} = require('internal/errors').codes;
|
||||||
|
|
||||||
|
function noop() {}
|
||||||
|
|
||||||
|
function isRequest(stream) {
|
||||||
|
return stream.setHeader && typeof stream.abort === 'function';
|
||||||
|
}
|
||||||
|
|
||||||
|
function once(callback) {
|
||||||
|
let called = false;
|
||||||
|
return function(err) {
|
||||||
|
if (called) return;
|
||||||
|
called = true;
|
||||||
|
callback.call(this, err);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function eos(stream, opts, callback) {
|
||||||
|
if (typeof opts === 'function') return eos(stream, null, opts);
|
||||||
|
if (!opts) opts = {};
|
||||||
|
|
||||||
|
callback = once(callback || noop);
|
||||||
|
|
||||||
|
const ws = stream._writableState;
|
||||||
|
const rs = stream._readableState;
|
||||||
|
let readable = opts.readable || (opts.readable !== false && stream.readable);
|
||||||
|
let writable = opts.writable || (opts.writable !== false && stream.writable);
|
||||||
|
|
||||||
|
const onlegacyfinish = () => {
|
||||||
|
if (!stream.writable) onfinish();
|
||||||
|
};
|
||||||
|
|
||||||
|
const onfinish = () => {
|
||||||
|
writable = false;
|
||||||
|
if (!readable) callback.call(stream);
|
||||||
|
};
|
||||||
|
|
||||||
|
const onend = () => {
|
||||||
|
readable = false;
|
||||||
|
if (!writable) callback.call(stream);
|
||||||
|
};
|
||||||
|
|
||||||
|
const onerror = (err) => {
|
||||||
|
callback.call(stream, err);
|
||||||
|
};
|
||||||
|
|
||||||
|
const onclose = () => {
|
||||||
|
if (readable && !(rs && rs.ended)) {
|
||||||
|
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
|
||||||
|
}
|
||||||
|
if (writable && !(ws && ws.ended)) {
|
||||||
|
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const onrequest = () => {
|
||||||
|
stream.req.on('finish', onfinish);
|
||||||
|
};
|
||||||
|
|
||||||
|
if (isRequest(stream)) {
|
||||||
|
stream.on('complete', onfinish);
|
||||||
|
stream.on('abort', onclose);
|
||||||
|
if (stream.req) onrequest();
|
||||||
|
else stream.on('request', onrequest);
|
||||||
|
} else if (writable && !ws) { // legacy streams
|
||||||
|
stream.on('end', onlegacyfinish);
|
||||||
|
stream.on('close', onlegacyfinish);
|
||||||
|
}
|
||||||
|
|
||||||
|
stream.on('end', onend);
|
||||||
|
stream.on('finish', onfinish);
|
||||||
|
if (opts.error !== false) stream.on('error', onerror);
|
||||||
|
stream.on('close', onclose);
|
||||||
|
|
||||||
|
return function() {
|
||||||
|
stream.removeListener('complete', onfinish);
|
||||||
|
stream.removeListener('abort', onclose);
|
||||||
|
stream.removeListener('request', onrequest);
|
||||||
|
if (stream.req) stream.req.removeListener('finish', onfinish);
|
||||||
|
stream.removeListener('end', onlegacyfinish);
|
||||||
|
stream.removeListener('close', onlegacyfinish);
|
||||||
|
stream.removeListener('finish', onfinish);
|
||||||
|
stream.removeListener('end', onend);
|
||||||
|
stream.removeListener('error', onerror);
|
||||||
|
stream.removeListener('close', onclose);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = eos;
|
95
lib/internal/streams/pipeline.js
Normal file
95
lib/internal/streams/pipeline.js
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
// Ported from https://github.com/mafintosh/pump with
|
||||||
|
// permission from the author, Mathias Buus (@mafintosh).
|
||||||
|
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
const eos = require('internal/streams/end-of-stream');
|
||||||
|
|
||||||
|
const {
|
||||||
|
ERR_MISSING_ARGS,
|
||||||
|
ERR_STREAM_DESTROYED
|
||||||
|
} = require('internal/errors').codes;
|
||||||
|
|
||||||
|
function once(callback) {
|
||||||
|
let called = false;
|
||||||
|
return function(err) {
|
||||||
|
if (called) return;
|
||||||
|
called = true;
|
||||||
|
callback(err);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function noop() {}
|
||||||
|
|
||||||
|
function isRequest(stream) {
|
||||||
|
return stream.setHeader && typeof stream.abort === 'function';
|
||||||
|
}
|
||||||
|
|
||||||
|
function destroyer(stream, reading, writing, callback) {
|
||||||
|
callback = once(callback);
|
||||||
|
|
||||||
|
let closed = false;
|
||||||
|
stream.on('close', () => {
|
||||||
|
closed = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
eos(stream, { readable: reading, writable: writing }, (err) => {
|
||||||
|
if (err) return callback(err);
|
||||||
|
closed = true;
|
||||||
|
callback();
|
||||||
|
});
|
||||||
|
|
||||||
|
let destroyed = false;
|
||||||
|
return (err) => {
|
||||||
|
if (closed) return;
|
||||||
|
if (destroyed) return;
|
||||||
|
destroyed = true;
|
||||||
|
|
||||||
|
// request.destroy just do .end - .abort is what we want
|
||||||
|
if (isRequest(stream)) return stream.abort();
|
||||||
|
if (typeof stream.destroy === 'function') return stream.destroy();
|
||||||
|
|
||||||
|
callback(err || new ERR_STREAM_DESTROYED('pipe'));
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function call(fn) {
|
||||||
|
fn();
|
||||||
|
}
|
||||||
|
|
||||||
|
function pipe(from, to) {
|
||||||
|
return from.pipe(to);
|
||||||
|
}
|
||||||
|
|
||||||
|
function popCallback(streams) {
|
||||||
|
if (!streams.length) return noop;
|
||||||
|
if (typeof streams[streams.length - 1] !== 'function') return noop;
|
||||||
|
return streams.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
function pipeline(...streams) {
|
||||||
|
const callback = popCallback(streams);
|
||||||
|
|
||||||
|
if (Array.isArray(streams[0])) streams = streams[0];
|
||||||
|
|
||||||
|
if (streams.length < 2) {
|
||||||
|
throw new ERR_MISSING_ARGS('streams');
|
||||||
|
}
|
||||||
|
|
||||||
|
let error;
|
||||||
|
const destroys = streams.map(function(stream, i) {
|
||||||
|
const reading = i < streams.length - 1;
|
||||||
|
const writing = i > 0;
|
||||||
|
return destroyer(stream, reading, writing, function(err) {
|
||||||
|
if (!error) error = err;
|
||||||
|
if (err) destroys.forEach(call);
|
||||||
|
if (reading) return;
|
||||||
|
destroys.forEach(call);
|
||||||
|
callback(error);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
return streams.reduce(pipe);
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = pipeline;
|
@ -22,6 +22,8 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
const { Buffer } = require('buffer');
|
const { Buffer } = require('buffer');
|
||||||
|
const pipeline = require('internal/streams/pipeline');
|
||||||
|
const eos = require('internal/streams/end-of-stream');
|
||||||
|
|
||||||
// Note: export Stream before Readable/Writable/Duplex/...
|
// Note: export Stream before Readable/Writable/Duplex/...
|
||||||
// to avoid a cross-reference(require) issues
|
// to avoid a cross-reference(require) issues
|
||||||
@ -33,6 +35,9 @@ Stream.Duplex = require('_stream_duplex');
|
|||||||
Stream.Transform = require('_stream_transform');
|
Stream.Transform = require('_stream_transform');
|
||||||
Stream.PassThrough = require('_stream_passthrough');
|
Stream.PassThrough = require('_stream_passthrough');
|
||||||
|
|
||||||
|
Stream.pipeline = pipeline;
|
||||||
|
Stream.finished = eos;
|
||||||
|
|
||||||
// Backwards-compat with node 0.4.x
|
// Backwards-compat with node 0.4.x
|
||||||
Stream.Stream = Stream;
|
Stream.Stream = Stream;
|
||||||
|
|
||||||
|
2
node.gyp
2
node.gyp
@ -154,6 +154,8 @@
|
|||||||
'lib/internal/streams/legacy.js',
|
'lib/internal/streams/legacy.js',
|
||||||
'lib/internal/streams/destroy.js',
|
'lib/internal/streams/destroy.js',
|
||||||
'lib/internal/streams/state.js',
|
'lib/internal/streams/state.js',
|
||||||
|
'lib/internal/streams/pipeline.js',
|
||||||
|
'lib/internal/streams/end-of-stream.js',
|
||||||
'lib/internal/wrap_js_stream.js',
|
'lib/internal/wrap_js_stream.js',
|
||||||
'deps/v8/tools/splaytree.js',
|
'deps/v8/tools/splaytree.js',
|
||||||
'deps/v8/tools/codemap.js',
|
'deps/v8/tools/codemap.js',
|
||||||
|
123
test/parallel/test-stream-finished.js
Normal file
123
test/parallel/test-stream-finished.js
Normal file
@ -0,0 +1,123 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
const { Writable, Readable, Transform, finished } = require('stream');
|
||||||
|
const assert = require('assert');
|
||||||
|
const fs = require('fs');
|
||||||
|
const { promisify } = require('util');
|
||||||
|
|
||||||
|
common.crashOnUnhandledRejection();
|
||||||
|
|
||||||
|
{
|
||||||
|
const rs = new Readable({
|
||||||
|
read() {}
|
||||||
|
});
|
||||||
|
|
||||||
|
finished(rs, common.mustCall((err) => {
|
||||||
|
assert(!err, 'no error');
|
||||||
|
}));
|
||||||
|
|
||||||
|
rs.push(null);
|
||||||
|
rs.resume();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const ws = new Writable({
|
||||||
|
write(data, enc, cb) {
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
finished(ws, common.mustCall((err) => {
|
||||||
|
assert(!err, 'no error');
|
||||||
|
}));
|
||||||
|
|
||||||
|
ws.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const tr = new Transform({
|
||||||
|
transform(data, enc, cb) {
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let finish = false;
|
||||||
|
let ended = false;
|
||||||
|
|
||||||
|
tr.on('end', () => {
|
||||||
|
ended = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
tr.on('finish', () => {
|
||||||
|
finish = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
finished(tr, common.mustCall((err) => {
|
||||||
|
assert(!err, 'no error');
|
||||||
|
assert(finish);
|
||||||
|
assert(ended);
|
||||||
|
}));
|
||||||
|
|
||||||
|
tr.end();
|
||||||
|
tr.resume();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const rs = fs.createReadStream(__filename);
|
||||||
|
|
||||||
|
rs.resume();
|
||||||
|
finished(rs, common.mustCall());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const finishedPromise = promisify(finished);
|
||||||
|
|
||||||
|
async function run() {
|
||||||
|
const rs = fs.createReadStream(__filename);
|
||||||
|
const done = common.mustCall();
|
||||||
|
|
||||||
|
let ended = false;
|
||||||
|
rs.resume();
|
||||||
|
rs.on('end', () => {
|
||||||
|
ended = true;
|
||||||
|
});
|
||||||
|
await finishedPromise(rs);
|
||||||
|
assert(ended);
|
||||||
|
done();
|
||||||
|
}
|
||||||
|
|
||||||
|
run();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const rs = fs.createReadStream('file-does-not-exist');
|
||||||
|
|
||||||
|
finished(rs, common.mustCall((err) => {
|
||||||
|
assert.strictEqual(err.code, 'ENOENT');
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const rs = new Readable();
|
||||||
|
|
||||||
|
finished(rs, common.mustCall((err) => {
|
||||||
|
assert(!err, 'no error');
|
||||||
|
}));
|
||||||
|
|
||||||
|
rs.push(null);
|
||||||
|
rs.emit('close'); // should not trigger an error
|
||||||
|
rs.resume();
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const rs = new Readable();
|
||||||
|
|
||||||
|
finished(rs, common.mustCall((err) => {
|
||||||
|
assert(err, 'premature close error');
|
||||||
|
}));
|
||||||
|
|
||||||
|
rs.emit('close'); // should trigger error
|
||||||
|
rs.push(null);
|
||||||
|
rs.resume();
|
||||||
|
}
|
483
test/parallel/test-stream-pipeline.js
Normal file
483
test/parallel/test-stream-pipeline.js
Normal file
@ -0,0 +1,483 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
if (!common.hasCrypto)
|
||||||
|
common.skip('missing crypto');
|
||||||
|
const { Stream, Writable, Readable, Transform, pipeline } = require('stream');
|
||||||
|
const assert = require('assert');
|
||||||
|
const http = require('http');
|
||||||
|
const http2 = require('http2');
|
||||||
|
const { promisify } = require('util');
|
||||||
|
|
||||||
|
common.crashOnUnhandledRejection();
|
||||||
|
|
||||||
|
{
|
||||||
|
let finished = false;
|
||||||
|
const processed = [];
|
||||||
|
const expected = [
|
||||||
|
Buffer.from('a'),
|
||||||
|
Buffer.from('b'),
|
||||||
|
Buffer.from('c')
|
||||||
|
];
|
||||||
|
|
||||||
|
const read = new Readable({
|
||||||
|
read() {}
|
||||||
|
});
|
||||||
|
|
||||||
|
const write = new Writable({
|
||||||
|
write(data, enc, cb) {
|
||||||
|
processed.push(data);
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
write.on('finish', () => {
|
||||||
|
finished = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
for (let i = 0; i < expected.length; i++) {
|
||||||
|
read.push(expected[i]);
|
||||||
|
}
|
||||||
|
read.push(null);
|
||||||
|
|
||||||
|
pipeline(read, write, common.mustCall((err) => {
|
||||||
|
assert.ok(!err, 'no error');
|
||||||
|
assert.ok(finished);
|
||||||
|
assert.deepStrictEqual(processed, expected);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const read = new Readable({
|
||||||
|
read() {}
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.throws(() => {
|
||||||
|
pipeline(read, () => {});
|
||||||
|
}, /ERR_MISSING_ARGS/);
|
||||||
|
assert.throws(() => {
|
||||||
|
pipeline(() => {});
|
||||||
|
}, /ERR_MISSING_ARGS/);
|
||||||
|
assert.throws(() => {
|
||||||
|
pipeline();
|
||||||
|
}, /ERR_MISSING_ARGS/);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const read = new Readable({
|
||||||
|
read() {}
|
||||||
|
});
|
||||||
|
|
||||||
|
const write = new Writable({
|
||||||
|
write(data, enc, cb) {
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
read.push('data');
|
||||||
|
setImmediate(() => read.destroy());
|
||||||
|
|
||||||
|
pipeline(read, write, common.mustCall((err) => {
|
||||||
|
assert.ok(err, 'should have an error');
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const read = new Readable({
|
||||||
|
read() {}
|
||||||
|
});
|
||||||
|
|
||||||
|
const write = new Writable({
|
||||||
|
write(data, enc, cb) {
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
read.push('data');
|
||||||
|
setImmediate(() => read.destroy(new Error('kaboom')));
|
||||||
|
|
||||||
|
const dst = pipeline(read, write, common.mustCall((err) => {
|
||||||
|
assert.deepStrictEqual(err, new Error('kaboom'));
|
||||||
|
}));
|
||||||
|
|
||||||
|
assert.strictEqual(dst, write);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const read = new Readable({
|
||||||
|
read() {}
|
||||||
|
});
|
||||||
|
|
||||||
|
const transform = new Transform({
|
||||||
|
transform(data, enc, cb) {
|
||||||
|
cb(new Error('kaboom'));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const write = new Writable({
|
||||||
|
write(data, enc, cb) {
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
read.on('close', common.mustCall());
|
||||||
|
transform.on('close', common.mustCall());
|
||||||
|
write.on('close', common.mustCall());
|
||||||
|
|
||||||
|
const dst = pipeline(read, transform, write, common.mustCall((err) => {
|
||||||
|
assert.deepStrictEqual(err, new Error('kaboom'));
|
||||||
|
}));
|
||||||
|
|
||||||
|
assert.strictEqual(dst, write);
|
||||||
|
|
||||||
|
read.push('hello');
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const server = http.createServer((req, res) => {
|
||||||
|
const rs = new Readable({
|
||||||
|
read() {
|
||||||
|
rs.push('hello');
|
||||||
|
rs.push(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
pipeline(rs, res);
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(0, () => {
|
||||||
|
const req = http.request({
|
||||||
|
port: server.address().port
|
||||||
|
});
|
||||||
|
|
||||||
|
req.end();
|
||||||
|
req.on('response', (res) => {
|
||||||
|
const buf = [];
|
||||||
|
res.on('data', (data) => buf.push(data));
|
||||||
|
res.on('end', common.mustCall(() => {
|
||||||
|
assert.deepStrictEqual(
|
||||||
|
Buffer.concat(buf),
|
||||||
|
Buffer.from('hello')
|
||||||
|
);
|
||||||
|
server.close();
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const server = http.createServer((req, res) => {
|
||||||
|
const rs = new Readable({
|
||||||
|
read() {
|
||||||
|
rs.push('hello');
|
||||||
|
},
|
||||||
|
destroy: common.mustCall((err, cb) => {
|
||||||
|
// prevents fd leaks by destroying http pipelines
|
||||||
|
cb();
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
pipeline(rs, res);
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(0, () => {
|
||||||
|
const req = http.request({
|
||||||
|
port: server.address().port
|
||||||
|
});
|
||||||
|
|
||||||
|
req.end();
|
||||||
|
req.on('response', (res) => {
|
||||||
|
setImmediate(() => {
|
||||||
|
res.destroy();
|
||||||
|
server.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const server = http.createServer((req, res) => {
|
||||||
|
const rs = new Readable({
|
||||||
|
read() {
|
||||||
|
rs.push('hello');
|
||||||
|
},
|
||||||
|
destroy: common.mustCall((err, cb) => {
|
||||||
|
cb();
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
pipeline(rs, res);
|
||||||
|
});
|
||||||
|
|
||||||
|
let cnt = 10;
|
||||||
|
|
||||||
|
const badSink = new Writable({
|
||||||
|
write(data, enc, cb) {
|
||||||
|
cnt--;
|
||||||
|
if (cnt === 0) cb(new Error('kaboom'));
|
||||||
|
else cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(0, () => {
|
||||||
|
const req = http.request({
|
||||||
|
port: server.address().port
|
||||||
|
});
|
||||||
|
|
||||||
|
req.end();
|
||||||
|
req.on('response', (res) => {
|
||||||
|
pipeline(res, badSink, common.mustCall((err) => {
|
||||||
|
assert.deepStrictEqual(err, new Error('kaboom'));
|
||||||
|
server.close();
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const server = http.createServer((req, res) => {
|
||||||
|
pipeline(req, res, common.mustCall());
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(0, () => {
|
||||||
|
const req = http.request({
|
||||||
|
port: server.address().port
|
||||||
|
});
|
||||||
|
|
||||||
|
const rs = new Readable({
|
||||||
|
read() {
|
||||||
|
rs.push('hello');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
pipeline(rs, req, common.mustCall(() => {
|
||||||
|
server.close();
|
||||||
|
}));
|
||||||
|
|
||||||
|
req.on('response', (res) => {
|
||||||
|
let cnt = 10;
|
||||||
|
res.on('data', () => {
|
||||||
|
cnt--;
|
||||||
|
if (cnt === 0) rs.destroy();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const server = http2.createServer((req, res) => {
|
||||||
|
pipeline(req, res, common.mustCall());
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(0, () => {
|
||||||
|
const url = `http://localhost:${server.address().port}`;
|
||||||
|
const client = http2.connect(url);
|
||||||
|
const req = client.request({ ':method': 'POST' });
|
||||||
|
|
||||||
|
const rs = new Readable({
|
||||||
|
read() {
|
||||||
|
rs.push('hello');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
pipeline(rs, req, common.mustCall((err) => {
|
||||||
|
// TODO: this is working around an http2 bug
|
||||||
|
// where the client keeps the event loop going
|
||||||
|
// (replacing the rs.destroy() with req.end()
|
||||||
|
// exits it so seems to be a destroy bug there
|
||||||
|
client.unref();
|
||||||
|
|
||||||
|
server.close();
|
||||||
|
client.close();
|
||||||
|
}));
|
||||||
|
|
||||||
|
let cnt = 10;
|
||||||
|
req.on('data', (data) => {
|
||||||
|
cnt--;
|
||||||
|
if (cnt === 0) rs.destroy();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const makeTransform = () => {
|
||||||
|
const tr = new Transform({
|
||||||
|
transform(data, enc, cb) {
|
||||||
|
cb(null, data);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tr.on('close', common.mustCall());
|
||||||
|
return tr;
|
||||||
|
};
|
||||||
|
|
||||||
|
const rs = new Readable({
|
||||||
|
read() {
|
||||||
|
rs.push('hello');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let cnt = 10;
|
||||||
|
|
||||||
|
const ws = new Writable({
|
||||||
|
write(data, enc, cb) {
|
||||||
|
cnt--;
|
||||||
|
if (cnt === 0) return cb(new Error('kaboom'));
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
rs.on('close', common.mustCall());
|
||||||
|
ws.on('close', common.mustCall());
|
||||||
|
|
||||||
|
pipeline(
|
||||||
|
rs,
|
||||||
|
makeTransform(),
|
||||||
|
makeTransform(),
|
||||||
|
makeTransform(),
|
||||||
|
makeTransform(),
|
||||||
|
makeTransform(),
|
||||||
|
makeTransform(),
|
||||||
|
ws,
|
||||||
|
common.mustCall((err) => {
|
||||||
|
assert.deepStrictEqual(err, new Error('kaboom'));
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const oldStream = new Stream();
|
||||||
|
|
||||||
|
oldStream.pause = oldStream.resume = () => {};
|
||||||
|
oldStream.write = (data) => {
|
||||||
|
oldStream.emit('data', data);
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
oldStream.end = () => {
|
||||||
|
oldStream.emit('end');
|
||||||
|
};
|
||||||
|
|
||||||
|
const expected = [
|
||||||
|
Buffer.from('hello'),
|
||||||
|
Buffer.from('world')
|
||||||
|
];
|
||||||
|
|
||||||
|
const rs = new Readable({
|
||||||
|
read() {
|
||||||
|
for (let i = 0; i < expected.length; i++) {
|
||||||
|
rs.push(expected[i]);
|
||||||
|
}
|
||||||
|
rs.push(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const ws = new Writable({
|
||||||
|
write(data, enc, cb) {
|
||||||
|
assert.deepStrictEqual(data, expected.shift());
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let finished = false;
|
||||||
|
|
||||||
|
ws.on('finish', () => {
|
||||||
|
finished = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
pipeline(
|
||||||
|
rs,
|
||||||
|
oldStream,
|
||||||
|
ws,
|
||||||
|
common.mustCall((err) => {
|
||||||
|
assert(!err, 'no error');
|
||||||
|
assert(finished, 'last stream finished');
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const oldStream = new Stream();
|
||||||
|
|
||||||
|
oldStream.pause = oldStream.resume = () => {};
|
||||||
|
oldStream.write = (data) => {
|
||||||
|
oldStream.emit('data', data);
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
oldStream.end = () => {
|
||||||
|
oldStream.emit('end');
|
||||||
|
};
|
||||||
|
|
||||||
|
const destroyableOldStream = new Stream();
|
||||||
|
|
||||||
|
destroyableOldStream.pause = destroyableOldStream.resume = () => {};
|
||||||
|
destroyableOldStream.destroy = common.mustCall(() => {
|
||||||
|
destroyableOldStream.emit('close');
|
||||||
|
});
|
||||||
|
destroyableOldStream.write = (data) => {
|
||||||
|
destroyableOldStream.emit('data', data);
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
destroyableOldStream.end = () => {
|
||||||
|
destroyableOldStream.emit('end');
|
||||||
|
};
|
||||||
|
|
||||||
|
const rs = new Readable({
|
||||||
|
read() {
|
||||||
|
rs.destroy(new Error('stop'));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const ws = new Writable({
|
||||||
|
write(data, enc, cb) {
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let finished = false;
|
||||||
|
|
||||||
|
ws.on('finish', () => {
|
||||||
|
finished = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
pipeline(
|
||||||
|
rs,
|
||||||
|
oldStream,
|
||||||
|
destroyableOldStream,
|
||||||
|
ws,
|
||||||
|
common.mustCall((err) => {
|
||||||
|
assert.deepStrictEqual(err, new Error('stop'));
|
||||||
|
assert(!finished, 'should not finish');
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
const pipelinePromise = promisify(pipeline);
|
||||||
|
|
||||||
|
async function run() {
|
||||||
|
const read = new Readable({
|
||||||
|
read() {}
|
||||||
|
});
|
||||||
|
|
||||||
|
const write = new Writable({
|
||||||
|
write(data, enc, cb) {
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
read.push('data');
|
||||||
|
read.push(null);
|
||||||
|
|
||||||
|
let finished = false;
|
||||||
|
|
||||||
|
write.on('finish', () => {
|
||||||
|
finished = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
await pipelinePromise(read, write);
|
||||||
|
|
||||||
|
assert(finished);
|
||||||
|
}
|
||||||
|
|
||||||
|
run();
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user