stream: invoke callback before emitting error always

Ensure the callback is always invoked before emitting
the error in both sync and async case.

PR-URL: https://github.com/nodejs/node/pull/29293
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Robert Nagy 2019-08-24 16:33:46 +02:00 committed by Rich Trott
parent c48467408d
commit 3de5eae6db
8 changed files with 122 additions and 28 deletions

View File

@ -571,7 +571,8 @@ The `writable.write()` method writes some data to the stream, and calls the
supplied `callback` once the data has been fully handled. If an error supplied `callback` once the data has been fully handled. If an error
occurs, the `callback` *may or may not* be called with the error as its occurs, the `callback` *may or may not* be called with the error as its
first argument. To reliably detect write errors, add a listener for the first argument. To reliably detect write errors, add a listener for the
`'error'` event. `'error'` event. If `callback` is called with an error, it will be called
before the `'error'` event is emitted.
The return value is `true` if the internal buffer is less than the The return value is `true` if the internal buffer is less than the
`highWaterMark` configured when the stream was created after admitting `chunk`. `highWaterMark` configured when the stream was created after admitting `chunk`.

View File

@ -158,6 +158,11 @@ function WritableState(options, stream, isDuplex) {
// Should .destroy() be called after 'finish' (and potentially 'end') // Should .destroy() be called after 'finish' (and potentially 'end')
this.autoDestroy = !!(options && options.autoDestroy); this.autoDestroy = !!(options && options.autoDestroy);
// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;
// Count buffered requests // Count buffered requests
this.bufferedRequestCount = 0; this.bufferedRequestCount = 0;
@ -401,7 +406,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!ret) if (!ret)
state.needDrain = true; state.needDrain = true;
if (state.writing || state.corked) { if (state.writing || state.corked || state.errored) {
var last = state.lastBufferedRequest; var last = state.lastBufferedRequest;
state.lastBufferedRequest = { state.lastBufferedRequest = {
chunk, chunk,
@ -420,7 +425,9 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
doWrite(stream, state, false, len, chunk, encoding, cb); doWrite(stream, state, false, len, chunk, encoding, cb);
} }
return ret; // Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && !state.destroyed;
} }
function doWrite(stream, state, writev, len, chunk, encoding, cb) { function doWrite(stream, state, writev, len, chunk, encoding, cb) {
@ -437,18 +444,11 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.sync = false; state.sync = false;
} }
function onwriteError(stream, state, sync, er, cb) { function onwriteError(stream, state, er, cb) {
--state.pendingcb; --state.pendingcb;
if (sync) { cb(er);
// Defer the callback if we are being called synchronously // This can emit error, but error must always follow cb.
// to avoid piling up things on the stack
process.nextTick(cb, er);
} else {
// The caller expect this to happen before if
// it is async
cb(er);
}
errorOrDestroy(stream, er); errorOrDestroy(stream, er);
} }
@ -465,9 +465,14 @@ function onwrite(stream, er) {
state.length -= state.writelen; state.length -= state.writelen;
state.writelen = 0; state.writelen = 0;
if (er) if (er) {
onwriteError(stream, state, sync, er, cb); state.errored = true;
else { if (sync) {
process.nextTick(onwriteError, stream, state, er, cb);
} else {
onwriteError(stream, state, er, cb);
}
} else {
// Check if we're actually ready to finish, but don't emit yet // Check if we're actually ready to finish, but don't emit yet
var finished = needFinish(state) || stream.destroyed; var finished = needFinish(state) || stream.destroyed;
@ -622,7 +627,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', {
function needFinish(state) { function needFinish(state) {
return (state.ending && return (state.ending &&
state.length === 0 && state.length === 0 &&
!state.errorEmitted && !state.errored &&
state.bufferedRequest === null && state.bufferedRequest === null &&
!state.finished && !state.finished &&
!state.writing); !state.writing);

View File

@ -27,6 +27,10 @@ function destroy(err, cb) {
const r = this._readableState; const r = this._readableState;
const w = this._writableState; const w = this._writableState;
if (w && err) {
w.errored = true;
}
if ((w && w.destroyed) || (r && r.destroyed)) { if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) { if (cb) {
cb(err); cb(err);
@ -50,10 +54,12 @@ function destroy(err, cb) {
this._destroy(err || null, (err) => { this._destroy(err || null, (err) => {
const emitClose = (w && w.emitClose) || (r && r.emitClose); const emitClose = (w && w.emitClose) || (r && r.emitClose);
if (cb) { if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
cb(err);
if (emitClose) { if (emitClose) {
process.nextTick(emitCloseNT, this); process.nextTick(emitCloseNT, this);
} }
cb(err);
} else if (needError(this, err)) { } else if (needError(this, err)) {
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err); process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
} else if (emitClose) { } else if (emitClose) {
@ -91,6 +97,7 @@ function undestroy() {
if (w) { if (w) {
w.destroyed = false; w.destroyed = false;
w.errored = false;
w.ended = false; w.ended = false;
w.ending = false; w.ending = false;
w.finalCalled = false; w.finalCalled = false;
@ -110,6 +117,10 @@ function errorOrDestroy(stream, err) {
const r = stream._readableState; const r = stream._readableState;
const w = stream._writableState; const w = stream._writableState;
if (w & err) {
w.errored = true;
}
if ((r && r.autoDestroy) || (w && w.autoDestroy)) if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err); stream.destroy(err);
else if (needError(stream, err)) else if (needError(stream, err))

View File

@ -67,7 +67,10 @@ const worker = new Worker(__filename).on('message', common.mustCall((port) => {
h2header.writeIntBE(1, 0, 3); // Length: 1 h2header.writeIntBE(1, 0, 3); // Length: 1
h2header.writeIntBE(i, 5, 4); // Stream ID h2header.writeIntBE(i, 5, 4); // Stream ID
// 0x88 = :status: 200 // 0x88 = :status: 200
conn.write(Buffer.concat([h2header, Buffer.from([0x88])])); if (!conn.write(Buffer.concat([h2header, Buffer.from([0x88])]))) {
process.nextTick(writeRequests);
break;
}
} }
} }

View File

@ -16,6 +16,20 @@ const assert = require('assert');
assert.strictEqual(write.destroyed, true); assert.strictEqual(write.destroyed, true);
} }
{
const write = new Writable({
write(chunk, enc, cb) {
this.destroy(new Error('asd'));
cb();
}
});
write.on('error', common.mustCall());
write.on('finish', common.mustNotCall());
write.end('asd');
assert.strictEqual(write.destroyed, true);
}
{ {
const write = new Writable({ const write = new Writable({
write(chunk, enc, cb) { cb(); } write(chunk, enc, cb) { cb(); }

View File

@ -0,0 +1,58 @@
'use strict';
const common = require('../common');
const { Writable } = require('stream');
const assert = require('assert');
// Ensure callback is always invoked before
// error is emitted. Regardless if error was
// sync or async.
{
let callbackCalled = false;
// Sync Error
const writable = new Writable({
write: common.mustCall((buf, enc, cb) => {
cb(new Error());
})
});
writable.on('error', common.mustCall(() => {
assert.strictEqual(callbackCalled, true);
}));
writable.write('hi', common.mustCall(() => {
callbackCalled = true;
}));
}
{
let callbackCalled = false;
// Async Error
const writable = new Writable({
write: common.mustCall((buf, enc, cb) => {
process.nextTick(cb, new Error());
})
});
writable.on('error', common.mustCall(() => {
assert.strictEqual(callbackCalled, true);
}));
writable.write('hi', common.mustCall(() => {
callbackCalled = true;
}));
}
{
// Sync Error
const writable = new Writable({
write: common.mustCall((buf, enc, cb) => {
cb(new Error());
})
});
writable.on('error', common.mustCall());
let cnt = 0;
// Ensure we don't live lock on sync error
while (writable.write('a'))
cnt++;
assert.strictEqual(cnt, 0);
}

View File

@ -16,4 +16,8 @@ const socket = new JSStreamWrap(new Duplex({
}) })
})); }));
assert.throws(() => socket.end('foo'), /Error: write EPROTO/); socket.end('foo');
socket.on('error', common.expectsError({
type: Error,
message: 'write EPROTO'
}));

View File

@ -26,12 +26,10 @@ const zlib = require('zlib');
zlib.gzip('hello', common.mustCall(function(err, out) { zlib.gzip('hello', common.mustCall(function(err, out) {
const unzip = zlib.createGunzip(); const unzip = zlib.createGunzip();
unzip.close(common.mustCall()); unzip.close(common.mustCall());
common.expectsError(
() => unzip.write(out), unzip.write(out);
{ unzip.on('error', common.expectsError({
code: 'ERR_STREAM_DESTROYED', code: 'ERR_STREAM_DESTROYED',
type: Error, type: Error
message: 'Cannot call write after a stream was destroyed' }));
}
);
})); }));