stream: finish must always follow error

When _write completes with an Error, 'finish' was emitted before
'error' if the callback was asynchronous. This commit restore the
previous behavior.
The logic is still less then ideal, because we call the write()
callback before emitting error if asynchronous, but after if
synchronous. This commit do not try to change the behavior.
This commit fixes a regression introduced by:
https://github.com/nodejs/node/pull/13195.

Fixes: https://github.com/nodejs/node/issues/13812
PR-URL: https://github.com/nodejs/node/pull/13850
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Calvin Metcalf <calvin.metcalf@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
This commit is contained in:
Matteo Collina 2017-06-21 20:04:12 +02:00
parent c02dcc7b59
commit b4432888f5
3 changed files with 175 additions and 64 deletions

View File

@ -374,18 +374,26 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
function onwriteError(stream, state, sync, er, cb) {
--state.pendingcb;
if (sync)
process.nextTick(afterError, stream, state, cb, er);
else
afterError(stream, state, cb, er);
stream._writableState.errorEmitted = true;
stream.emit('error', er);
}
function afterError(stream, state, cb, err) {
cb(err);
finishMaybe(stream, state);
if (sync) {
// defer the callback if we are being called synchronously
// to avoid piling up things on the stack
process.nextTick(cb, er);
// this can emit finish, and it will always happen
// after error
process.nextTick(finishMaybe, stream, state);
stream._writableState.errorEmitted = true;
stream.emit('error', er);
} else {
// the caller expect this to happen before if
// it is async
cb(er);
stream._writableState.errorEmitted = true;
stream.emit('error', er);
// this can emit finish, but finish must
// always follow error
finishMaybe(stream, state);
}
}
function onwriteStateUpdate(state) {

View File

@ -0,0 +1,156 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const stream = require('stream');
// ensure consistency between the finish event when using cork()
// and writev and when not using them
{
const writable = new stream.Writable();
writable._write = (chunks, encoding, cb) => {
cb(new Error('write test error'));
};
let firstError = false;
writable.on('finish', common.mustCall(function() {
assert.strictEqual(firstError, true);
}));
writable.on('prefinish', common.mustCall());
writable.on('error', common.mustCall((er) => {
assert.strictEqual(er.message, 'write test error');
firstError = true;
}));
writable.end('test');
}
{
const writable = new stream.Writable();
writable._write = (chunks, encoding, cb) => {
setImmediate(cb, new Error('write test error'));
};
let firstError = false;
writable.on('finish', common.mustCall(function() {
assert.strictEqual(firstError, true);
}));
writable.on('prefinish', common.mustCall());
writable.on('error', common.mustCall((er) => {
assert.strictEqual(er.message, 'write test error');
firstError = true;
}));
writable.end('test');
}
{
const writable = new stream.Writable();
writable._write = (chunks, encoding, cb) => {
cb(new Error('write test error'));
};
writable._writev = (chunks, cb) => {
cb(new Error('writev test error'));
};
let firstError = false;
writable.on('finish', common.mustCall(function() {
assert.strictEqual(firstError, true);
}));
writable.on('prefinish', common.mustCall());
writable.on('error', common.mustCall((er) => {
assert.strictEqual(er.message, 'writev test error');
firstError = true;
}));
writable.cork();
writable.write('test');
setImmediate(function() {
writable.end('test');
});
}
{
const writable = new stream.Writable();
writable._write = (chunks, encoding, cb) => {
setImmediate(cb, new Error('write test error'));
};
writable._writev = (chunks, cb) => {
setImmediate(cb, new Error('writev test error'));
};
let firstError = false;
writable.on('finish', common.mustCall(function() {
assert.strictEqual(firstError, true);
}));
writable.on('prefinish', common.mustCall());
writable.on('error', common.mustCall((er) => {
assert.strictEqual(er.message, 'writev test error');
firstError = true;
}));
writable.cork();
writable.write('test');
setImmediate(function() {
writable.end('test');
});
}
// Regression test for
// https://github.com/nodejs/node/issues/13812
{
const rs = new stream.Readable();
rs.push('ok');
rs.push(null);
rs._read = () => {};
const ws = new stream.Writable();
let firstError = false;
ws.on('finish', common.mustCall(function() {
assert.strictEqual(firstError, true);
}));
ws.on('error', common.mustCall(function() {
firstError = true;
}));
ws._write = (chunk, encoding, done) => {
setImmediate(done, new Error());
};
rs.pipe(ws);
}
{
const rs = new stream.Readable();
rs.push('ok');
rs.push(null);
rs._read = () => {};
const ws = new stream.Writable();
ws.on('finish', common.mustNotCall());
ws.on('error', common.mustCall());
ws._write = (chunk, encoding, done) => {
done(new Error());
};
rs.pipe(ws);
}

View File

@ -1,53 +0,0 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const stream = require('stream');
// ensure consistency between the finish event when using cork()
// and writev and when not using them
{
const writable = new stream.Writable();
writable._write = (chunks, encoding, cb) => {
cb(new Error('write test error'));
};
writable.on('finish', common.mustCall());
writable.on('prefinish', common.mustCall());
writable.on('error', common.mustCall((er) => {
assert.strictEqual(er.message, 'write test error');
}));
writable.end('test');
}
{
const writable = new stream.Writable();
writable._write = (chunks, encoding, cb) => {
cb(new Error('write test error'));
};
writable._writev = (chunks, cb) => {
cb(new Error('writev test error'));
};
writable.on('finish', common.mustCall());
writable.on('prefinish', common.mustCall());
writable.on('error', common.mustCall((er) => {
assert.strictEqual(er.message, 'writev test error');
}));
writable.cork();
writable.write('test');
setImmediate(function() {
writable.end('test');
});
}