zlib: simplify flushing mechanism

Previously, flushing on zlib streams was implemented through
stream 'drain' handlers. This has a number of downsides; in
particular, it is complex, and could lead to unpredictable
behaviour, since it meant that in a sequence like

```js
compressor.write('abc');
compressor.flush();
waitForMoreDataAsynchronously(() => {
  compressor.write('def');
});
```

it was not fully deterministic whether the flush happens after
the second chunk is written or the first one.

This commit replaces this mechanism by one that piggy-backs
along the stream’s write queue, using a “special” `Buffer`
instance that signals that a flush is currently due.

PR-URL: https://github.com/nodejs/node/pull/23186
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Anna Henningsen 2018-09-30 16:13:07 -04:00
parent c001ba6575
commit e688fe6b7e
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
3 changed files with 21 additions and 31 deletions

View File

@ -311,10 +311,9 @@ function Zlib(opts, mode) {
this._level = level; this._level = level;
this._strategy = strategy; this._strategy = strategy;
this._chunkSize = chunkSize; this._chunkSize = chunkSize;
this._flushFlag = flush; this._defaultFlushFlag = flush;
this._scheduledFlushFlag = Z_NO_FLUSH;
this._origFlushFlag = flush;
this._finishFlushFlag = finishFlush; this._finishFlushFlag = finishFlush;
this._nextFlush = -1;
this._info = opts && opts.info; this._info = opts && opts.info;
this.once('end', this.close); this.once('end', this.close);
} }
@ -398,6 +397,7 @@ function maxFlush(a, b) {
return flushiness[a] > flushiness[b] ? a : b; return flushiness[a] > flushiness[b] ? a : b;
} }
const flushBuffer = Buffer.alloc(0);
Zlib.prototype.flush = function flush(kind, callback) { Zlib.prototype.flush = function flush(kind, callback) {
var ws = this._writableState; var ws = this._writableState;
@ -412,21 +412,13 @@ Zlib.prototype.flush = function flush(kind, callback) {
} else if (ws.ending) { } else if (ws.ending) {
if (callback) if (callback)
this.once('end', callback); this.once('end', callback);
} else if (ws.needDrain) { } else if (this._nextFlush !== -1) {
const alreadyHadFlushScheduled = this._scheduledFlushFlag !== Z_NO_FLUSH; // This means that there is a flush currently in the write queue.
this._scheduledFlushFlag = maxFlush(kind, this._scheduledFlushFlag); // We currently coalesce this flush into the pending one.
this._nextFlush = maxFlush(this._nextFlush, kind);
// If a callback was passed, always register a new `drain` + flush handler,
// mostly because that's simpler and flush callbacks piling up is a rare
// thing anyway.
if (!alreadyHadFlushScheduled || callback) {
const drainHandler = () => this.flush(this._scheduledFlushFlag, callback);
this.once('drain', drainHandler);
}
} else { } else {
this._flushFlag = kind; this._nextFlush = kind;
this.write(Buffer.alloc(0), '', callback); this.write(flushBuffer, '', callback);
this._scheduledFlushFlag = Z_NO_FLUSH;
} }
}; };
@ -436,20 +428,18 @@ Zlib.prototype.close = function close(callback) {
}; };
Zlib.prototype._transform = function _transform(chunk, encoding, cb) { Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
// If it's the last chunk, or a final flush, we use the Z_FINISH flush flag var flushFlag = this._defaultFlushFlag;
// (or whatever flag was provided using opts.finishFlush). // We use a 'fake' zero-length chunk to carry information about flushes from
// If it's explicitly flushing at some other time, then we use // the public API to the actual stream implementation.
// Z_FULL_FLUSH. Otherwise, use the original opts.flush flag. if (chunk === flushBuffer) {
var flushFlag; flushFlag = this._nextFlush;
this._nextFlush = -1;
}
// For the last chunk, also apply `_finishFlushFlag`.
var ws = this._writableState; var ws = this._writableState;
if ((ws.ending || ws.ended) && ws.length === chunk.byteLength) { if ((ws.ending || ws.ended) && ws.length === chunk.byteLength) {
flushFlag = this._finishFlushFlag; flushFlag = maxFlush(flushFlag, this._finishFlushFlag);
} else {
flushFlag = this._flushFlag;
// once we've flushed the last of the queue, stop flushing and
// go back to the normal behavior.
if (chunk.byteLength >= ws.length)
this._flushFlag = this._origFlushFlag;
} }
processChunk(this, chunk, flushFlag, cb); processChunk(this, chunk, flushFlag, cb);
}; };

View File

@ -44,5 +44,5 @@ process.once('exit', function() {
assert.strictEqual( assert.strictEqual(
drainCount, 1); drainCount, 1);
assert.strictEqual( assert.strictEqual(
flushCount, 2); flushCount, 1);
}); });

View File

@ -35,7 +35,7 @@ gunz.setEncoding('utf8');
gunz.on('data', (c) => output += c); gunz.on('data', (c) => output += c);
gunz.on('end', common.mustCall(() => { gunz.on('end', common.mustCall(() => {
assert.strictEqual(output, input); assert.strictEqual(output, input);
assert.strictEqual(gzip._flushFlag, zlib.constants.Z_NO_FLUSH); assert.strictEqual(gzip._nextFlush, -1);
})); }));
// make sure that flush/write doesn't trigger an assert failure // make sure that flush/write doesn't trigger an assert failure