stream: support Uint8Array input to methods

PR-URL: https://github.com/nodejs/node/pull/11608
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
Anna Henningsen 2017-01-09 19:05:06 +01:00
parent 0ecdf29340
commit 9f610b5e26
No known key found for this signature in database
GPG Key ID: D8B9F5AEAE84E4CF
6 changed files with 205 additions and 23 deletions

View File

@ -47,10 +47,10 @@ There are four fundamental stream types within Node.js:
### Object Mode ### Object Mode
All streams created by Node.js APIs operate exclusively on strings and `Buffer` All streams created by Node.js APIs operate exclusively on strings and `Buffer`
objects. It is possible, however, for stream implementations to work with other (or `Uint8Array`) objects. It is possible, however, for stream implementations
types of JavaScript values (with the exception of `null`, which serves a special to work with other types of JavaScript values (with the exception of `null`,
purpose within streams). Such streams are considered to operate in "object which serves a special purpose within streams). Such streams are considered to
mode". operate in "object mode".
Stream instances are switched into object mode using the `objectMode` option Stream instances are switched into object mode using the `objectMode` option
when the stream is created. Attempting to switch an existing stream into when the stream is created. Attempting to switch an existing stream into
@ -352,12 +352,17 @@ See also: [`writable.uncork()`][].
##### writable.end([chunk][, encoding][, callback]) ##### writable.end([chunk][, encoding][, callback])
<!-- YAML <!-- YAML
added: v0.9.4 added: v0.9.4
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
--> -->
* `chunk` {string|Buffer|any} Optional data to write. For streams not operating * `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
in object mode, `chunk` must be a string or a `Buffer`. For object mode not operating in object mode, `chunk` must be a string, `Buffer` or
streams, `chunk` may be any JavaScript value other than `null`. `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
* `encoding` {string} The encoding, if `chunk` is a String other than `null`.
* `encoding` {string} The encoding, if `chunk` is a string
* `callback` {Function} Optional callback for when the stream is finished * `callback` {Function} Optional callback for when the stream is finished
Calling the `writable.end()` method signals that no more data will be written Calling the `writable.end()` method signals that no more data will be written
@ -434,14 +439,20 @@ See also: [`writable.cork()`][].
<!-- YAML <!-- YAML
added: v0.9.4 added: v0.9.4
changes: changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
- version: v6.0.0 - version: v6.0.0
pr-url: https://github.com/nodejs/node/pull/6170 pr-url: https://github.com/nodejs/node/pull/6170
description: Passing `null` as the `chunk` parameter will always be description: Passing `null` as the `chunk` parameter will always be
considered invalid now, even in object mode. considered invalid now, even in object mode.
--> -->
* `chunk` {string|Buffer} The data to write * `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
* `encoding` {string} The encoding, if `chunk` is a String not operating in object mode, `chunk` must be a string, `Buffer` or
`Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
other than `null`.
* `encoding` {string} The encoding, if `chunk` is a string
* `callback` {Function} Callback for when this chunk of data is flushed * `callback` {Function} Callback for when this chunk of data is flushed
* Returns: {boolean} `false` if the stream wishes for the calling code to * Returns: {boolean} `false` if the stream wishes for the calling code to
wait for the `'drain'` event to be emitted before continuing to write wait for the `'drain'` event to be emitted before continuing to write
@ -985,9 +996,16 @@ setTimeout(() => {
##### readable.unshift(chunk) ##### readable.unshift(chunk)
<!-- YAML <!-- YAML
added: v0.9.11 added: v0.9.11
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
--> -->
* `chunk` {Buffer|string|any} Chunk of data to unshift onto the read queue * `chunk` {Buffer|Uint8Array|string|any} Chunk of data to unshift onto the
read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
any JavaScript value other than `null`.
The `readable.unshift()` method pushes a chunk of data back into the internal The `readable.unshift()` method pushes a chunk of data back into the internal
buffer. This is useful in certain situations where a stream is being consumed by buffer. This is useful in certain situations where a stream is being consumed by
@ -1274,8 +1292,9 @@ constructor and implement the `writable._write()` method. The
Defaults to `true` Defaults to `true`
* `objectMode` {boolean} Whether or not the * `objectMode` {boolean} Whether or not the
[`stream.write(anyObj)`][stream-write] is a valid operation. When set, [`stream.write(anyObj)`][stream-write] is a valid operation. When set,
it becomes possible to write JavaScript values other than string or it becomes possible to write JavaScript values other than string,
`Buffer` if supported by the stream implementation. Defaults to `false` `Buffer` or `Uint8Array` if supported by the stream implementation.
Defaults to `false`
* `write` {Function} Implementation for the * `write` {Function} Implementation for the
[`stream._write()`][stream-_write] method. [`stream._write()`][stream-_write] method.
* `writev` {Function} Implementation for the * `writev` {Function} Implementation for the
@ -1564,16 +1583,26 @@ internal to the class that defines it, and should never be called directly by
user programs. user programs.
#### readable.push(chunk[, encoding]) #### readable.push(chunk[, encoding])
<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/11608
description: The `chunk` argument can now be a `Uint8Array` instance.
-->
* `chunk` {Buffer|null|string|any} Chunk of data to push into the read queue * `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the
* `encoding` {string} Encoding of String chunks. Must be a valid read queue. For streams not operating in object mode, `chunk` must be a
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
any JavaScript value.
* `encoding` {string} Encoding of string chunks. Must be a valid
Buffer encoding, such as `'utf8'` or `'ascii'` Buffer encoding, such as `'utf8'` or `'ascii'`
* Returns {boolean} `true` if additional chunks of data may continued to be * Returns {boolean} `true` if additional chunks of data may continued to be
pushed; `false` otherwise. pushed; `false` otherwise.
When `chunk` is not `null`, the `chunk` of data will be added to the When `chunk` is a `Buffer`, `Uint8Array` or `string`, the `chunk` of data will
internal queue for users of the stream to consume. Passing `chunk` as `null` be added to the internal queue for users of the stream to consume.
signals the end of the stream (EOF), after which no more data can be written. Passing `chunk` as `null` signals the end of the stream (EOF), after which no
more data can be written.
When the Readable is operating in paused mode, the data added with When the Readable is operating in paused mode, the data added with
`readable.push()` can be read out by calling the `readable.push()` can be read out by calling the
@ -2088,8 +2117,8 @@ Readable stream class internals.
Use of `readable.push('')` is not recommended. Use of `readable.push('')` is not recommended.
Pushing a zero-byte string or `Buffer` to a stream that is not in object mode Pushing a zero-byte string, `Buffer` or `Uint8Array` to a stream that is not in
has an interesting side effect. Because it *is* a call to object mode has an interesting side effect. Because it *is* a call to
[`readable.push()`][stream-push], the call will end the reading process. [`readable.push()`][stream-push], the call will end the reading process.
However, because the argument is an empty string, no data is added to the However, because the argument is an empty string, no data is added to the
readable buffer so there is nothing for a user to consume. readable buffer so there is nothing for a user to consume.

View File

@ -212,6 +212,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
if (er) { if (er) {
stream.emit('error', er); stream.emit('error', er);
} else if (state.objectMode || chunk && chunk.length > 0) { } else if (state.objectMode || chunk && chunk.length > 0) {
if (typeof chunk !== 'string' &&
Object.getPrototypeOf(chunk) !== Buffer.prototype &&
!state.objectMode) {
chunk = Stream._uint8ArrayToBuffer(chunk);
}
if (addToFront) { if (addToFront) {
if (state.endEmitted) if (state.endEmitted)
stream.emit('error', new Error('stream.unshift() after end event')); stream.emit('error', new Error('stream.unshift() after end event'));
@ -259,7 +265,7 @@ function addChunk(stream, state, chunk, addToFront) {
function chunkInvalid(state, chunk) { function chunkInvalid(state, chunk) {
var er; var er;
if (!(chunk instanceof Buffer) && if (!Stream._isUint8Array(chunk) &&
typeof chunk !== 'string' && typeof chunk !== 'string' &&
chunk !== undefined && chunk !== undefined &&
!state.objectMode) { !state.objectMode) {

View File

@ -248,7 +248,11 @@ function validChunk(stream, state, chunk, cb) {
Writable.prototype.write = function(chunk, encoding, cb) { Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState; var state = this._writableState;
var ret = false; var ret = false;
var isBuf = (chunk instanceof Buffer); var isBuf = Stream._isUint8Array(chunk) && !state.objectMode;
if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
chunk = Stream._uint8ArrayToBuffer(chunk);
}
if (typeof encoding === 'function') { if (typeof encoding === 'function') {
cb = encoding; cb = encoding;

View File

@ -2,6 +2,10 @@
const Buffer = require('buffer').Buffer; const Buffer = require('buffer').Buffer;
function copyBuffer(src, target, offset) {
Buffer.prototype.copy.call(src, target, offset);
}
module.exports = class BufferList { module.exports = class BufferList {
constructor() { constructor() {
this.head = null; this.head = null;
@ -63,7 +67,7 @@ module.exports = class BufferList {
var p = this.head; var p = this.head;
var i = 0; var i = 0;
while (p) { while (p) {
p.data.copy(ret, i); copyBuffer(p.data, ret, i);
i += p.data.length; i += p.data.length;
p = p.next; p = p.next;
} }

View File

@ -21,6 +21,8 @@
'use strict'; 'use strict';
const Buffer = require('buffer').Buffer;
// Note: export Stream before Readable/Writable/Duplex/... // Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues // to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy'); const Stream = module.exports = require('internal/streams/legacy');
@ -33,3 +35,38 @@ Stream.PassThrough = require('_stream_passthrough');
// Backwards-compat with node 0.4.x // Backwards-compat with node 0.4.x
Stream.Stream = Stream; Stream.Stream = Stream;
// Internal utilities
try {
Stream._isUint8Array = process.binding('util').isUint8Array;
} catch (e) {
// This throws for Node < 4.2.0 because theres no util binding and
// returns undefined for Node < 7.4.0.
}
if (!Stream._isUint8Array) {
Stream._isUint8Array = function _isUint8Array(obj) {
return Object.prototype.toString.call(obj) === '[object Uint8Array]';
};
}
const version = process.version.substr(1).split('.');
if (version[0] === 0 && version[1] < 12) {
Stream._uint8ArrayToBuffer = Buffer;
} else {
try {
const internalBuffer = require('internal/buffer');
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
return new internalBuffer.FastBuffer(chunk.buffer,
chunk.byteOffset,
chunk.byteLength);
};
} catch (e) {
}
if (!Stream._uint8ArrayToBuffer) {
Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
return Buffer.prototype.slice.call(chunk);
};
}
}

View File

@ -0,0 +1,102 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const Buffer = require('buffer').Buffer;
const { Readable, Writable } = require('stream');
const ABC = new Uint8Array([0x41, 0x42, 0x43]);
const DEF = new Uint8Array([0x44, 0x45, 0x46]);
const GHI = new Uint8Array([0x47, 0x48, 0x49]);
{
// Simple Writable test.
let n = 0;
const writable = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert(chunk instanceof Buffer);
if (n++ === 0) {
assert.strictEqual(String(chunk), 'ABC');
} else {
assert.strictEqual(String(chunk), 'DEF');
}
cb();
}, 2)
});
writable.write(ABC);
writable.end(DEF);
}
{
// Writable test, pass in Uint8Array in object mode.
const writable = new Writable({
objectMode: true,
write: common.mustCall((chunk, encoding, cb) => {
assert(!(chunk instanceof Buffer));
assert(chunk instanceof Uint8Array);
assert.strictEqual(chunk, ABC);
assert.strictEqual(encoding, 'utf8');
cb();
})
});
writable.end(ABC);
}
{
// Writable test, multiple writes carried out via writev.
let callback;
const writable = new Writable({
write: common.mustCall((chunk, encoding, cb) => {
assert(chunk instanceof Buffer);
assert.strictEqual(encoding, 'buffer');
assert.strictEqual(String(chunk), 'ABC');
callback = cb;
}),
writev: common.mustCall((chunks, cb) => {
assert.strictEqual(chunks.length, 2);
assert.strictEqual(chunks[0].encoding, 'buffer');
assert.strictEqual(chunks[1].encoding, 'buffer');
assert.strictEqual(chunks[0].chunk + chunks[1].chunk, 'DEFGHI');
})
});
writable.write(ABC);
writable.write(DEF);
writable.end(GHI);
callback();
}
{
// Simple Readable test.
const readable = new Readable({
read() {}
});
readable.push(DEF);
readable.unshift(ABC);
const buf = readable.read();
assert(buf instanceof Buffer);
assert.deepStrictEqual([...buf], [...ABC, ...DEF]);
}
{
// Readable test, setEncoding.
const readable = new Readable({
read() {}
});
readable.setEncoding('utf8');
readable.push(DEF);
readable.unshift(ABC);
const out = readable.read();
assert.strictEqual(out, 'ABCDEF');
}