stream: convert string to Buffer when calling unshift(<string>)
`readable.unshift()` can take a string as an argument, but that string was not being converted to a Buffer, which caused a `TypeError: Argument must be a buffer` in some cases. Also, if a string was passed, it was always coerced to utf8 encoding regardless of the stream's encoding. A second optional argument `encoding` was added to `unshift()` to fix the encoding issue. Fixes: https://github.com/nodejs/node/issues/27192 PR-URL: https://github.com/nodejs/node/pull/27194 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
parent
e0c4a9c8ed
commit
df339bccf2
@ -2349,7 +2349,7 @@ such as `process.stdout.on('data')`.
|
||||
[`sign.sign()`]: crypto.html#crypto_sign_sign_privatekey_outputencoding
|
||||
[`stream.pipe()`]: stream.html#stream_readable_pipe_destination_options
|
||||
[`stream.push()`]: stream.html#stream_readable_push_chunk_encoding
|
||||
[`stream.unshift()`]: stream.html#stream_readable_unshift_chunk
|
||||
[`stream.unshift()`]: stream.html#stream_readable_unshift_chunk_encoding
|
||||
[`stream.write()`]: stream.html#stream_writable_write_chunk_encoding_callback
|
||||
[`subprocess.kill()`]: child_process.html#child_process_subprocess_kill_signal
|
||||
[`subprocess.send()`]: child_process.html#child_process_subprocess_send_message_sendhandle_options_callback
|
||||
|
@ -1195,7 +1195,7 @@ setTimeout(() => {
|
||||
}, 1000);
|
||||
```
|
||||
|
||||
##### readable.unshift(chunk)
|
||||
##### readable.unshift(chunk[, encoding])
|
||||
<!-- YAML
|
||||
added: v0.9.11
|
||||
changes:
|
||||
@ -1208,6 +1208,8 @@ changes:
|
||||
read queue. For streams not operating in object mode, `chunk` must be a
|
||||
string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
|
||||
any JavaScript value other than `null`.
|
||||
* `encoding` {string} Encoding of string chunks. Must be a valid
|
||||
`Buffer` encoding, such as `'utf8'` or `'ascii'`.
|
||||
|
||||
The `readable.unshift()` method pushes a chunk of data back into the internal
|
||||
buffer. This is useful in certain situations where a stream is being consumed by
|
||||
|
@ -207,13 +207,28 @@ Readable.prototype._destroy = function(err, cb) {
|
||||
// similar to how Writable.write() returns true if you should
|
||||
// write() some more.
|
||||
// Append a chunk to the end of the internal read queue. Returns true if
// more data may be pushed right away, similar to how Writable.write()
// returns true if you should write() some more.
//
// chunk:    string | Buffer | Uint8Array (any value in object mode);
//           null signals end-of-stream.
// encoding: encoding of `chunk` when it is a string.
//
// Fix: the scraped span carried dead locals (`state`, `skipChunkCheck`)
// that were never read; all chunk validation now happens inside
// readableAddChunk().
Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};
|
||||
|
||||
// Unshift should *always* be something directly out of read().
// Puts `chunk` back at the FRONT of the read queue; the optional
// `encoding` describes string chunks so they are not coerced to utf8.
Readable.prototype.unshift = function(chunk, encoding) {
  const addToFront = true;
  return readableAddChunk(this, chunk, encoding, addToFront);
};
|
||||
|
||||
function readableAddChunk(stream, chunk, encoding, addToFront) {
|
||||
debug('readableAddChunk', chunk);
|
||||
const state = stream._readableState;
|
||||
|
||||
let skipChunkCheck;
|
||||
|
||||
if (!state.objectMode) {
|
||||
if (typeof chunk === 'string') {
|
||||
encoding = encoding || state.defaultEncoding;
|
||||
if (encoding !== state.encoding) {
|
||||
if (addToFront && state.encoding && state.encoding !== encoding) {
|
||||
// When unshifting, if state.encoding is set, we have to save
|
||||
// the string in the BufferList with the state encoding
|
||||
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
|
||||
} else if (encoding !== state.encoding) {
|
||||
chunk = Buffer.from(chunk, encoding);
|
||||
encoding = '';
|
||||
}
|
||||
@ -223,17 +238,6 @@ Readable.prototype.push = function(chunk, encoding) {
|
||||
skipChunkCheck = true;
|
||||
}
|
||||
|
||||
return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
|
||||
};
|
||||
|
||||
// Unshift should *always* be something directly out of read()
// Fix: forward an optional `encoding` argument for string chunks
// instead of discarding it (passing null), so an unshifted string is
// not blindly coerced to utf8 — which could later raise
// "TypeError: Argument must be a buffer" (nodejs/node#27192).
Readable.prototype.unshift = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, true, false);
};
|
||||
|
||||
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
|
||||
debug('readableAddChunk', chunk);
|
||||
const state = stream._readableState;
|
||||
if (chunk === null) {
|
||||
state.reading = false;
|
||||
onEofChunk(stream, state);
|
||||
|
187
test/parallel/test-stream-readable-unshift.js
Normal file
187
test/parallel/test-stream-readable-unshift.js
Normal file
@ -0,0 +1,187 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const { Readable } = require('stream');
|
||||
|
||||
{
  // A string unshifted without an encoding must surface as a Buffer
  // holding its utf8 bytes.
  const r = new Readable({ read() {} });
  const input = 'abc';

  r.on('data', common.mustCall((chunk) => {
    assert(Buffer.isBuffer(chunk));
    assert.strictEqual(chunk.toString('utf8'), input);
  }, 1));

  r.unshift(input);
}
|
||||
|
||||
{
  // unshift() must place its data ahead of previously push()ed chunks.
  const r = new Readable({ read() {} });
  const front = 'front';
  const back = 'back';

  // Expected emission order: the unshifted chunk, then the pushed one.
  const order = [front, back];
  r.on('data', common.mustCall((chunk) => {
    assert.strictEqual(chunk.toString('utf8'), order.shift());
  }, 2));

  r.push(back);
  r.unshift(front);
}
|
||||
|
||||
{
  // When an encoding is supplied, the string's bytes must round-trip
  // through the internal buffer in that encoding.
  const r = new Readable({ read() {} });

  const enc = 'base64';
  const payload = Buffer.from('abc').toString(enc);

  r.on('data', common.mustCall((chunk) => {
    assert.strictEqual(chunk.toString(enc), payload);
  }, 1));

  r.unshift(payload, enc);
}
|
||||
|
||||
{
  // With a stream encoding set, unshifted strings in other encodings
  // must be transcoded into the stream encoding; strings already in
  // the stream encoding are stored untouched.
  const streamEncoding = 'base64';

  function checkEncoding(readable) {
    // Per-chunk encodings to exercise.
    const chunkEncodings = ['utf8', 'binary', 'hex', 'base64'];
    const pending = [];

    // Each unshift lands at the FRONT of the queue, so chunks are
    // emitted in reverse unshift order — hence pop(), not shift().
    readable.on('data', common.mustCall((chunk) => {
      const { encoding, string } = pending.pop();
      assert.strictEqual(chunk.toString(encoding), string);
    }, chunkEncodings.length));

    for (const encoding of chunkEncodings) {
      const raw = 'abc';

      // If the chunk encoding equals state.encoding the string is
      // saved as-is; otherwise it is transcoded to the stream encoding.
      const stored = encoding === streamEncoding ?
        raw : Buffer.from(raw, encoding).toString(streamEncoding);

      pending.push({ encoding, string: stored });

      readable.unshift(raw, encoding);
    }
  }

  const viaSetter = new Readable({ read() {} });
  viaSetter.setEncoding(streamEncoding);
  checkEncoding(viaSetter);

  const viaOption = new Readable({ read() {}, encoding: streamEncoding });
  checkEncoding(viaOption);
}
|
||||
|
||||
{
  // Both .push & .unshift should have the same behaviour:
  // with a stream encoding set, every chunk is emitted as a string in
  // that encoding.
  const enc = 'base64';

  function checkEncoding(readable) {
    const raw = 'abc';
    readable.on('data', common.mustCall((chunk) => {
      assert.strictEqual(chunk, Buffer.from(raw).toString(enc));
    }, 2));

    readable.push(raw);
    readable.unshift(raw);
  }

  const viaSetter = new Readable({ read() {} });
  viaSetter.setEncoding(enc);
  checkEncoding(viaSetter);

  const viaOption = new Readable({ read() {}, encoding: enc });
  checkEncoding(viaOption);
}
|
||||
|
||||
{
  // Outside object mode, unshift() must reject chunk types other than
  // string/Buffer/Uint8Array with ERR_INVALID_ARG_TYPE.
  const r = new Readable({ read() {} });

  for (const bad of [[], {}, 0]) {
    common.expectsError(() => r.unshift(bad), {
      code: 'ERR_INVALID_ARG_TYPE',
      type: TypeError
    });
  }
}
|
||||
|
||||
{
  // In object mode unshift() accepts arbitrary values and preserves
  // identity (strictEqual, not deep equality).
  const r = new Readable({ objectMode: true, read() {} });

  const values = ['a', 1, {}, []];

  // Each unshift lands at the front, so emission order is the reverse
  // of unshift order — pop() walks `values` from the end.
  r.on('data', common.mustCall((item) => {
    assert.strictEqual(item, values.pop());
  }, values.length));

  for (const item of values) {
    r.unshift(item);
  }
}
|
||||
|
||||
{
  // Regression test: unshifting while the internal buffer is above the
  // highWaterMark must not throw.
  // Should not throw: https://github.com/nodejs/node/issues/27192
  //
  // Fixes vs. original: dropped the unused `opt` constructor parameter
  // (it was never forwarded to super()), and `_read` no longer returns
  // `true` — its return value is never consumed by the stream machinery.
  const highWaterMark = 50;

  class ArrayReader extends Readable {
    constructor() {
      super({ highWaterMark });
      // The error happened only when pushing above hwm, so provide
      // twice that much data.
      this.buffer = new Array(highWaterMark * 2).fill(0).map(String);
    }
    _read(size) {
      while (this.buffer.length) {
        const chunk = this.buffer.shift();
        if (!this.buffer.length) {
          // Last chunk: push it and signal EOF.
          this.push(chunk);
          this.push(null);
          return;
        }
        // Stop when the stream asks for back-pressure.
        if (!this.push(chunk))
          return;
      }
    }
  }

  function onRead() {
    while (null !== (stream.read())) {
      // Remove the 'readable' listener before unshifting
      stream.removeListener('readable', onRead);
      stream.unshift('a');
      stream.on('data', (chunk) => {
        console.log(chunk.length);
      });
      break;
    }
  }

  const stream = new ArrayReader();
  stream.once('readable', common.mustCall(onRead));
  stream.on('end', common.mustCall(() => {}));
}
|
Loading…
x
Reference in New Issue
Block a user