Revert "stream: invoke callback before emitting error always"
This reverts commit 3de5eae6dbe503485b95bdeb8bddbd67e4613d59. PR-URL: https://github.com/nodejs/node/pull/29741 Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: Jiawen Geng <technicalcute@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
This commit is contained in:
parent
35bfe0e414
commit
95792a7989
@ -571,8 +571,7 @@ The `writable.write()` method writes some data to the stream, and calls the
|
|||||||
supplied `callback` once the data has been fully handled. If an error
|
supplied `callback` once the data has been fully handled. If an error
|
||||||
occurs, the `callback` *may or may not* be called with the error as its
|
occurs, the `callback` *may or may not* be called with the error as its
|
||||||
first argument. To reliably detect write errors, add a listener for the
|
first argument. To reliably detect write errors, add a listener for the
|
||||||
`'error'` event. If `callback` is called with an error, it will be called
|
`'error'` event.
|
||||||
before the `'error'` event is emitted.
|
|
||||||
|
|
||||||
The return value is `true` if the internal buffer is less than the
|
The return value is `true` if the internal buffer is less than the
|
||||||
`highWaterMark` configured when the stream was created after admitting `chunk`.
|
`highWaterMark` configured when the stream was created after admitting `chunk`.
|
||||||
|
@ -158,11 +158,6 @@ function WritableState(options, stream, isDuplex) {
|
|||||||
// Should .destroy() be called after 'finish' (and potentially 'end')
|
// Should .destroy() be called after 'finish' (and potentially 'end')
|
||||||
this.autoDestroy = !!(options && options.autoDestroy);
|
this.autoDestroy = !!(options && options.autoDestroy);
|
||||||
|
|
||||||
// Indicates whether the stream has errored. When true all write() calls
|
|
||||||
// should return false. This is needed since when autoDestroy
|
|
||||||
// is disabled we need a way to tell whether the stream has failed.
|
|
||||||
this.errored = false;
|
|
||||||
|
|
||||||
// Count buffered requests
|
// Count buffered requests
|
||||||
this.bufferedRequestCount = 0;
|
this.bufferedRequestCount = 0;
|
||||||
|
|
||||||
@ -406,7 +401,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
|
|||||||
if (!ret)
|
if (!ret)
|
||||||
state.needDrain = true;
|
state.needDrain = true;
|
||||||
|
|
||||||
if (state.writing || state.corked || state.errored) {
|
if (state.writing || state.corked) {
|
||||||
var last = state.lastBufferedRequest;
|
var last = state.lastBufferedRequest;
|
||||||
state.lastBufferedRequest = {
|
state.lastBufferedRequest = {
|
||||||
chunk,
|
chunk,
|
||||||
@ -425,9 +420,7 @@ function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
|
|||||||
doWrite(stream, state, false, len, chunk, encoding, cb);
|
doWrite(stream, state, false, len, chunk, encoding, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return false if errored or destroyed in order to break
|
return ret;
|
||||||
// any synchronous while(stream.write(data)) loops.
|
|
||||||
return ret && !state.errored && !state.destroyed;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
|
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
|
||||||
@ -444,11 +437,18 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
|
|||||||
state.sync = false;
|
state.sync = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
function onwriteError(stream, state, er, cb) {
|
function onwriteError(stream, state, sync, er, cb) {
|
||||||
--state.pendingcb;
|
--state.pendingcb;
|
||||||
|
|
||||||
|
if (sync) {
|
||||||
|
// Defer the callback if we are being called synchronously
|
||||||
|
// to avoid piling up things on the stack
|
||||||
|
process.nextTick(cb, er);
|
||||||
|
} else {
|
||||||
|
// The caller expect this to happen before if
|
||||||
|
// it is async
|
||||||
cb(er);
|
cb(er);
|
||||||
// This can emit error, but error must always follow cb.
|
}
|
||||||
errorOrDestroy(stream, er);
|
errorOrDestroy(stream, er);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -465,14 +465,9 @@ function onwrite(stream, er) {
|
|||||||
state.length -= state.writelen;
|
state.length -= state.writelen;
|
||||||
state.writelen = 0;
|
state.writelen = 0;
|
||||||
|
|
||||||
if (er) {
|
if (er)
|
||||||
state.errored = true;
|
onwriteError(stream, state, sync, er, cb);
|
||||||
if (sync) {
|
else {
|
||||||
process.nextTick(onwriteError, stream, state, er, cb);
|
|
||||||
} else {
|
|
||||||
onwriteError(stream, state, er, cb);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Check if we're actually ready to finish, but don't emit yet
|
// Check if we're actually ready to finish, but don't emit yet
|
||||||
var finished = needFinish(state) || stream.destroyed;
|
var finished = needFinish(state) || stream.destroyed;
|
||||||
|
|
||||||
@ -627,7 +622,7 @@ Object.defineProperty(Writable.prototype, 'writableLength', {
|
|||||||
function needFinish(state) {
|
function needFinish(state) {
|
||||||
return (state.ending &&
|
return (state.ending &&
|
||||||
state.length === 0 &&
|
state.length === 0 &&
|
||||||
!state.errored &&
|
!state.errorEmitted &&
|
||||||
state.bufferedRequest === null &&
|
state.bufferedRequest === null &&
|
||||||
!state.finished &&
|
!state.finished &&
|
||||||
!state.writing);
|
!state.writing);
|
||||||
|
@ -27,10 +27,6 @@ function destroy(err, cb) {
|
|||||||
const r = this._readableState;
|
const r = this._readableState;
|
||||||
const w = this._writableState;
|
const w = this._writableState;
|
||||||
|
|
||||||
if (w && err) {
|
|
||||||
w.errored = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((w && w.destroyed) || (r && r.destroyed)) {
|
if ((w && w.destroyed) || (r && r.destroyed)) {
|
||||||
if (cb) {
|
if (cb) {
|
||||||
cb(err);
|
cb(err);
|
||||||
@ -54,12 +50,10 @@ function destroy(err, cb) {
|
|||||||
this._destroy(err || null, (err) => {
|
this._destroy(err || null, (err) => {
|
||||||
const emitClose = (w && w.emitClose) || (r && r.emitClose);
|
const emitClose = (w && w.emitClose) || (r && r.emitClose);
|
||||||
if (cb) {
|
if (cb) {
|
||||||
// Invoke callback before scheduling emitClose so that callback
|
|
||||||
// can schedule before.
|
|
||||||
cb(err);
|
|
||||||
if (emitClose) {
|
if (emitClose) {
|
||||||
process.nextTick(emitCloseNT, this);
|
process.nextTick(emitCloseNT, this);
|
||||||
}
|
}
|
||||||
|
cb(err);
|
||||||
} else if (needError(this, err)) {
|
} else if (needError(this, err)) {
|
||||||
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
|
process.nextTick(emitClose ? emitErrorCloseNT : emitErrorNT, this, err);
|
||||||
} else if (emitClose) {
|
} else if (emitClose) {
|
||||||
@ -97,7 +91,6 @@ function undestroy() {
|
|||||||
|
|
||||||
if (w) {
|
if (w) {
|
||||||
w.destroyed = false;
|
w.destroyed = false;
|
||||||
w.errored = false;
|
|
||||||
w.ended = false;
|
w.ended = false;
|
||||||
w.ending = false;
|
w.ending = false;
|
||||||
w.finalCalled = false;
|
w.finalCalled = false;
|
||||||
@ -117,10 +110,6 @@ function errorOrDestroy(stream, err) {
|
|||||||
const r = stream._readableState;
|
const r = stream._readableState;
|
||||||
const w = stream._writableState;
|
const w = stream._writableState;
|
||||||
|
|
||||||
if (w & err) {
|
|
||||||
w.errored = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((r && r.autoDestroy) || (w && w.autoDestroy))
|
if ((r && r.autoDestroy) || (w && w.autoDestroy))
|
||||||
stream.destroy(err);
|
stream.destroy(err);
|
||||||
else if (needError(stream, err))
|
else if (needError(stream, err))
|
||||||
|
@ -67,10 +67,7 @@ const worker = new Worker(__filename).on('message', common.mustCall((port) => {
|
|||||||
h2header.writeIntBE(1, 0, 3); // Length: 1
|
h2header.writeIntBE(1, 0, 3); // Length: 1
|
||||||
h2header.writeIntBE(i, 5, 4); // Stream ID
|
h2header.writeIntBE(i, 5, 4); // Stream ID
|
||||||
// 0x88 = :status: 200
|
// 0x88 = :status: 200
|
||||||
if (!conn.write(Buffer.concat([h2header, Buffer.from([0x88])]))) {
|
conn.write(Buffer.concat([h2header, Buffer.from([0x88])]));
|
||||||
process.nextTick(writeRequests);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,20 +16,6 @@ const assert = require('assert');
|
|||||||
assert.strictEqual(write.destroyed, true);
|
assert.strictEqual(write.destroyed, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
|
||||||
const write = new Writable({
|
|
||||||
write(chunk, enc, cb) {
|
|
||||||
this.destroy(new Error('asd'));
|
|
||||||
cb();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
write.on('error', common.mustCall());
|
|
||||||
write.on('finish', common.mustNotCall());
|
|
||||||
write.end('asd');
|
|
||||||
assert.strictEqual(write.destroyed, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
{
|
||||||
const write = new Writable({
|
const write = new Writable({
|
||||||
write(chunk, enc, cb) { cb(); }
|
write(chunk, enc, cb) { cb(); }
|
||||||
|
@ -1,58 +0,0 @@
|
|||||||
'use strict';
|
|
||||||
const common = require('../common');
|
|
||||||
const { Writable } = require('stream');
|
|
||||||
const assert = require('assert');
|
|
||||||
|
|
||||||
// Ensure callback is always invoked before
|
|
||||||
// error is emitted. Regardless if error was
|
|
||||||
// sync or async.
|
|
||||||
|
|
||||||
{
|
|
||||||
let callbackCalled = false;
|
|
||||||
// Sync Error
|
|
||||||
const writable = new Writable({
|
|
||||||
write: common.mustCall((buf, enc, cb) => {
|
|
||||||
cb(new Error());
|
|
||||||
})
|
|
||||||
});
|
|
||||||
writable.on('error', common.mustCall(() => {
|
|
||||||
assert.strictEqual(callbackCalled, true);
|
|
||||||
}));
|
|
||||||
writable.write('hi', common.mustCall(() => {
|
|
||||||
callbackCalled = true;
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
let callbackCalled = false;
|
|
||||||
// Async Error
|
|
||||||
const writable = new Writable({
|
|
||||||
write: common.mustCall((buf, enc, cb) => {
|
|
||||||
process.nextTick(cb, new Error());
|
|
||||||
})
|
|
||||||
});
|
|
||||||
writable.on('error', common.mustCall(() => {
|
|
||||||
assert.strictEqual(callbackCalled, true);
|
|
||||||
}));
|
|
||||||
writable.write('hi', common.mustCall(() => {
|
|
||||||
callbackCalled = true;
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
// Sync Error
|
|
||||||
const writable = new Writable({
|
|
||||||
write: common.mustCall((buf, enc, cb) => {
|
|
||||||
cb(new Error());
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
writable.on('error', common.mustCall());
|
|
||||||
|
|
||||||
let cnt = 0;
|
|
||||||
// Ensure we don't live lock on sync error
|
|
||||||
while (writable.write('a'))
|
|
||||||
cnt++;
|
|
||||||
|
|
||||||
assert.strictEqual(cnt, 0);
|
|
||||||
}
|
|
@ -16,8 +16,4 @@ const socket = new JSStreamWrap(new Duplex({
|
|||||||
})
|
})
|
||||||
}));
|
}));
|
||||||
|
|
||||||
socket.end('foo');
|
assert.throws(() => socket.end('foo'), /Error: write EPROTO/);
|
||||||
socket.on('error', common.expectsError({
|
|
||||||
type: Error,
|
|
||||||
message: 'write EPROTO'
|
|
||||||
}));
|
|
||||||
|
@ -26,10 +26,12 @@ const zlib = require('zlib');
|
|||||||
zlib.gzip('hello', common.mustCall(function(err, out) {
|
zlib.gzip('hello', common.mustCall(function(err, out) {
|
||||||
const unzip = zlib.createGunzip();
|
const unzip = zlib.createGunzip();
|
||||||
unzip.close(common.mustCall());
|
unzip.close(common.mustCall());
|
||||||
|
common.expectsError(
|
||||||
unzip.write(out);
|
() => unzip.write(out),
|
||||||
unzip.on('error', common.expectsError({
|
{
|
||||||
code: 'ERR_STREAM_DESTROYED',
|
code: 'ERR_STREAM_DESTROYED',
|
||||||
type: Error
|
type: Error,
|
||||||
}));
|
message: 'Cannot call write after a stream was destroyed'
|
||||||
|
}
|
||||||
|
);
|
||||||
}));
|
}));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user