stream: add final method

Adds the ability to for write streams to have an _final method which acts
similarly to the _flush method that transform streams have but is called before
the finish event is emitted and if asynchronous delays the stream from
finishing.  The `final` option may also be passed in order to set it.

PR-URL: https://github.com/nodejs/node/pull/12828
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Refael Ackermann <refack@gmail.com>
This commit is contained in:
Calvin Metcalf 2017-05-04 15:33:14 +02:00
parent e912c67d24
commit ba513d140c
No known key found for this signature in database
GPG Key ID: F617F2120633E5F2
8 changed files with 317 additions and 32 deletions

View File

@ -1198,7 +1198,8 @@ on the type of stream being created, as detailed in the chart below:
<p>[Writable](#stream_class_stream_writable)</p> <p>[Writable](#stream_class_stream_writable)</p>
</td> </td>
<td> <td>
<p><code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p> <p><code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>,
<code>[_final][stream-_final]</code></p>
</td> </td>
</tr> </tr>
<tr> <tr>
@ -1209,7 +1210,8 @@ on the type of stream being created, as detailed in the chart below:
<p>[Duplex](#stream_class_stream_duplex)</p> <p>[Duplex](#stream_class_stream_duplex)</p>
</td> </td>
<td> <td>
<p><code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code></p> <p><code>[_read][stream-_read]</code>, <code>[_write][stream-_write]</code>, <code>[_writev][stream-_writev]</code>,
<code>[_final][stream-_final]</code></p>
</td> </td>
</tr> </tr>
<tr> <tr>
@ -1220,7 +1222,8 @@ on the type of stream being created, as detailed in the chart below:
<p>[Transform](#stream_class_stream_transform)</p> <p>[Transform](#stream_class_stream_transform)</p>
</td> </td>
<td> <td>
<p><code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code></p> <p><code>[_transform][stream-_transform]</code>, <code>[_flush][stream-_flush]</code>,
<code>[_final][stream-_final]</code></p>
</td> </td>
</tr> </tr>
</table> </table>
@ -1279,6 +1282,8 @@ constructor and implement the `writable._write()` method. The
[`stream._writev()`][stream-_writev] method. [`stream._writev()`][stream-_writev] method.
* `destroy` {Function} Implementation for the * `destroy` {Function} Implementation for the
[`stream._destroy()`][writable-_destroy] method. [`stream._destroy()`][writable-_destroy] method.
* `final` {Function} Implementation for the
[`stream._final()`][stream-_final] method.
For example: For example:
@ -1398,6 +1403,22 @@ added: REPLACEME
* `callback` {Function} A callback function that takes an optional error argument * `callback` {Function} A callback function that takes an optional error argument
which is invoked when the writable is destroyed. which is invoked when the writable is destroyed.
#### writable.\_final(callback)
<!-- YAML
added: REPLACEME
-->
* `callback` {Function} Call this function (optionally with an error
argument) when you are done writing any remaining data.
Note: `_final()` **must not** be called directly. It MAY be implemented
by child classes, and if so, will be called by the internal Writable
class methods only.
This optional function will be called before the stream closes, delaying the
`finish` event until `callback` is called. This is useful to close resources
or write buffered data before a stream ends.
#### Errors While Writing #### Errors While Writing
It is recommended that errors occurring during the processing of the It is recommended that errors occurring during the processing of the
@ -2115,6 +2136,7 @@ readable buffer so there is nothing for a user to consume.
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback [stream-_transform]: #stream_transform_transform_chunk_encoding_callback
[stream-_write]: #stream_writable_write_chunk_encoding_callback_1 [stream-_write]: #stream_writable_write_chunk_encoding_callback_1
[stream-_writev]: #stream_writable_writev_chunks_callback [stream-_writev]: #stream_writable_writev_chunks_callback
[stream-_final]: #stream_writable_final_callback
[stream-end]: #stream_writable_end_chunk_encoding_callback [stream-end]: #stream_writable_end_chunk_encoding_callback
[stream-pause]: #stream_readable_pause [stream-pause]: #stream_readable_pause
[stream-push]: #stream_readable_push_chunk_encoding [stream-push]: #stream_readable_push_chunk_encoding

View File

@ -58,6 +58,12 @@ function WritableState(options, stream) {
// cast to ints. // cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark); this.highWaterMark = Math.floor(this.highWaterMark);
// if _final has been called
this.finalCalled = false;
// if _final has been called
this.finalCalled = false;
// drain event flag. // drain event flag.
this.needDrain = false; this.needDrain = false;
// at the start of calling end() // at the start of calling end()
@ -199,6 +205,9 @@ function Writable(options) {
if (typeof options.destroy === 'function') if (typeof options.destroy === 'function')
this._destroy = options.destroy; this._destroy = options.destroy;
if (typeof options.final === 'function')
this._final = options.final;
} }
Stream.call(this); Stream.call(this);
@ -520,23 +529,37 @@ function needFinish(state) {
!state.finished && !state.finished &&
!state.writing); !state.writing);
} }
function callFinal(stream, state) {
function prefinish(stream, state) { stream._final((err) => {
if (!state.prefinished) { state.pendingcb--;
if (err) {
stream.emit('error', err);
}
state.prefinished = true; state.prefinished = true;
stream.emit('prefinish'); stream.emit('prefinish');
finishMaybe(stream, state);
});
}
function prefinish(stream, state) {
if (!state.prefinished && !state.finalCalled) {
if (typeof stream._final === 'function') {
state.pendingcb++;
state.finalCalled = true;
process.nextTick(callFinal, stream, state);
} else {
state.prefinished = true;
stream.emit('prefinish');
}
} }
} }
function finishMaybe(stream, state) { function finishMaybe(stream, state) {
var need = needFinish(state); var need = needFinish(state);
if (need) { if (need) {
if (state.pendingcb === 0) {
prefinish(stream, state); prefinish(stream, state);
if (state.pendingcb === 0) {
state.finished = true; state.finished = true;
stream.emit('finish'); stream.emit('finish');
} else {
prefinish(stream, state);
} }
} }
return need; return need;

View File

@ -1,19 +1,11 @@
'use strict'; 'use strict';
require('../common'); const common = require('../common');
const assert = require('assert');
const Readable = require('stream').Readable; const Readable = require('stream').Readable;
let _readCalled = false; const _read = common.mustCall(function _read(n) {
function _read(n) {
_readCalled = true;
this.push(null); this.push(null);
} });
const r = new Readable({ read: _read }); const r = new Readable({ read: _read });
r.resume(); r.resume();
process.on('exit', function() {
assert.strictEqual(r._read, _read);
assert(_readCalled);
});

View File

@ -1,24 +1,25 @@
'use strict'; 'use strict';
require('../common'); const common = require('../common');
const assert = require('assert'); const assert = require('assert');
const Transform = require('stream').Transform; const Transform = require('stream').Transform;
let _transformCalled = false; const _transform = common.mustCall(function _transform(d, e, n) {
function _transform(d, e, n) {
_transformCalled = true;
n(); n();
} });
let _flushCalled = false; const _final = common.mustCall(function _final(n) {
function _flush(n) {
_flushCalled = true;
n(); n();
} });
const _flush = common.mustCall(function _flush(n) {
n();
});
const t = new Transform({ const t = new Transform({
transform: _transform, transform: _transform,
flush: _flush flush: _flush,
final: _final
}); });
const t2 = new Transform({}); const t2 = new Transform({});
@ -34,6 +35,5 @@ assert.throws(() => {
process.on('exit', () => { process.on('exit', () => {
assert.strictEqual(t._transform, _transform); assert.strictEqual(t._transform, _transform);
assert.strictEqual(t._flush, _flush); assert.strictEqual(t._flush, _flush);
assert.strictEqual(_transformCalled, true); assert.strictEqual(t._final, _final);
assert.strictEqual(_flushCalled, true);
}); });

View File

@ -0,0 +1,100 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const stream = require('stream');
let state = 0;
/*
What you do
var stream = new tream.Transform({
transform: function transformCallback(chunk, _, next) {
// part 1
this.push(chunk);
//part 2
next();
},
final: function endCallback(done) {
// part 1
process.nextTick(function () {
// part 2
done();
});
},
flush: function flushCallback(done) {
// part 1
process.nextTick(function () {
// part 2
done();
});
}
});
t.on('data', dataListener);
t.on('end', endListener);
t.on('finish', finishListener);
t.write(1);
t.write(4);
t.end(7, endMethodCallback);
The order things are called
1. transformCallback part 1
2. dataListener
3. transformCallback part 2
4. transformCallback part 1
5. dataListener
6. transformCallback part 2
7. transformCallback part 1
8. dataListener
9. transformCallback part 2
10. finalCallback part 1
11. finalCallback part 2
12. flushCallback part 1
13. finishListener
14. endMethodCallback
15. flushCallback part 2
16. endListener
*/
const t = new stream.Transform({
objectMode: true,
transform: common.mustCall(function(chunk, _, next) {
assert.strictEqual(++state, chunk, 'transformCallback part 1');
this.push(state);
assert.strictEqual(++state, chunk + 2, 'transformCallback part 2');
process.nextTick(next);
}, 3),
final: common.mustCall(function(done) {
state++;
assert.strictEqual(state, 10, 'finalCallback part 1');
state++;
assert.strictEqual(state, 11, 'finalCallback part 2');
done();
}, 1),
flush: common.mustCall(function(done) {
state++;
assert.strictEqual(state, 12, 'flushCallback part 1');
process.nextTick(function() {
state++;
assert.strictEqual(state, 15, 'flushCallback part 2');
done();
});
}, 1)
});
t.on('finish', common.mustCall(function() {
state++;
assert.strictEqual(state, 13, 'finishListener');
}, 1));
t.on('end', common.mustCall(function() {
state++;
assert.strictEqual(state, 16, 'end event');
}, 1));
t.on('data', common.mustCall(function(d) {
assert.strictEqual(++state, d + 1, 'dataListener');
}, 3));
t.write(1);
t.write(4);
t.end(7, common.mustCall(function() {
state++;
assert.strictEqual(state, 14, 'endMethodCallback');
}, 1));

View File

@ -0,0 +1,102 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const stream = require('stream');
let state = 0;
/*
What you do
var stream = new tream.Transform({
transform: function transformCallback(chunk, _, next) {
// part 1
this.push(chunk);
//part 2
next();
},
final: function endCallback(done) {
// part 1
process.nextTick(function () {
// part 2
done();
});
},
flush: function flushCallback(done) {
// part 1
process.nextTick(function () {
// part 2
done();
});
}
});
t.on('data', dataListener);
t.on('end', endListener);
t.on('finish', finishListener);
t.write(1);
t.write(4);
t.end(7, endMethodCallback);
The order things are called
1. transformCallback part 1
2. dataListener
3. transformCallback part 2
4. transformCallback part 1
5. dataListener
6. transformCallback part 2
7. transformCallback part 1
8. dataListener
9. transformCallback part 2
10. finalCallback part 1
11. finalCallback part 2
12. flushCallback part 1
13. finishListener
14. endMethodCallback
15. flushCallback part 2
16. endListener
*/
const t = new stream.Transform({
objectMode: true,
transform: common.mustCall(function(chunk, _, next) {
assert.strictEqual(++state, chunk, 'transformCallback part 1');
this.push(state);
assert.strictEqual(++state, chunk + 2, 'transformCallback part 2');
process.nextTick(next);
}, 3),
final: common.mustCall(function(done) {
state++;
assert.strictEqual(state, 10, 'finalCallback part 1');
setTimeout(function() {
state++;
assert.strictEqual(state, 11, 'finalCallback part 2');
done();
}, 100);
}, 1),
flush: common.mustCall(function(done) {
state++;
assert.strictEqual(state, 12, 'flushCallback part 1');
process.nextTick(function() {
state++;
assert.strictEqual(state, 15, 'flushCallback part 2');
done();
});
}, 1)
});
t.on('finish', common.mustCall(function() {
state++;
assert.strictEqual(state, 13, 'finishListener');
}, 1));
t.on('end', common.mustCall(function() {
state++;
assert.strictEqual(state, 16, 'end event');
}, 1));
t.on('data', common.mustCall(function(d) {
assert.strictEqual(++state, d + 1, 'dataListener');
}, 3));
t.write(1);
t.write(4);
t.end(7, common.mustCall(function() {
state++;
assert.strictEqual(state, 14, 'endMethodCallback');
}, 1));

View File

@ -0,0 +1,24 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const stream = require('stream');
let shutdown = false;
const w = new stream.Writable({
final: common.mustCall(function(cb) {
assert.strictEqual(this, w);
setTimeout(function() {
shutdown = true;
cb();
}, 100);
}),
write: function(chunk, e, cb) {
process.nextTick(cb);
}
});
w.on('finish', common.mustCall(function() {
assert(shutdown);
}));
w.write(Buffer.allocUnsafe(1));
w.end(Buffer.allocUnsafe(0));

View File

@ -408,3 +408,25 @@ test('finish is emitted if last chunk is empty', function(t) {
w.write(Buffer.allocUnsafe(1)); w.write(Buffer.allocUnsafe(1));
w.end(Buffer.alloc(0)); w.end(Buffer.alloc(0));
}); });
test('finish is emitted after shutdown', function(t) {
const w = new W();
let shutdown = false;
w._final = common.mustCall(function(cb) {
assert.strictEqual(this, w);
setTimeout(function() {
shutdown = true;
cb();
}, 100);
});
w._write = function(chunk, e, cb) {
process.nextTick(cb);
};
w.on('finish', common.mustCall(function() {
assert(shutdown);
t.end();
}));
w.write(Buffer.allocUnsafe(1));
w.end(Buffer.allocUnsafe(0));
});