zlib: do not coalesce multiple .flush()
calls
This is an approach to address the issue linked below. Previously, when `.write()` and `.flush()` calls to a zlib stream were interleaved synchronously (i.e. without waiting for these operations to finish), multiple flush calls would have been coalesced into a single flushing operation. This patch changes behaviour so that each `.flush()` all corresponds to one flushing operation on the underlying zlib resource, and the order of operations is as if the `.flush()` call were a `.write()` call. One test had to be removed because it specifically tested the previous behaviour. As a drive-by fix, this also makes sure that all flush callbacks are called. Previously, that was not the case. Fixes: https://github.com/nodejs/node/issues/28478 PR-URL: https://github.com/nodejs/node/pull/28520 Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de> Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
This commit is contained in:
parent
efc39464b0
commit
518ffc1256
35
lib/zlib.js
35
lib/zlib.js
@ -49,6 +49,8 @@ const {
|
|||||||
} = require('buffer');
|
} = require('buffer');
|
||||||
const { owner_symbol } = require('internal/async_hooks').symbols;
|
const { owner_symbol } = require('internal/async_hooks').symbols;
|
||||||
|
|
||||||
|
const kFlushFlag = Symbol('kFlushFlag');
|
||||||
|
|
||||||
const constants = internalBinding('constants').zlib;
|
const constants = internalBinding('constants').zlib;
|
||||||
const {
|
const {
|
||||||
// Zlib flush levels
|
// Zlib flush levels
|
||||||
@ -261,7 +263,6 @@ function ZlibBase(opts, mode, handle, { flush, finishFlush, fullFlush }) {
|
|||||||
this._chunkSize = chunkSize;
|
this._chunkSize = chunkSize;
|
||||||
this._defaultFlushFlag = flush;
|
this._defaultFlushFlag = flush;
|
||||||
this._finishFlushFlag = finishFlush;
|
this._finishFlushFlag = finishFlush;
|
||||||
this._nextFlush = -1;
|
|
||||||
this._defaultFullFlushFlag = fullFlush;
|
this._defaultFullFlushFlag = fullFlush;
|
||||||
this.once('end', this.close);
|
this.once('end', this.close);
|
||||||
this._info = opts && opts.info;
|
this._info = opts && opts.info;
|
||||||
@ -308,13 +309,16 @@ ZlibBase.prototype._flush = function(callback) {
|
|||||||
|
|
||||||
// If a flush is scheduled while another flush is still pending, a way to figure
|
// If a flush is scheduled while another flush is still pending, a way to figure
|
||||||
// out which one is the "stronger" flush is needed.
|
// out which one is the "stronger" flush is needed.
|
||||||
|
// This is currently only used to figure out which flush flag to use for the
|
||||||
|
// last chunk.
|
||||||
// Roughly, the following holds:
|
// Roughly, the following holds:
|
||||||
// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH <
|
// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH <
|
||||||
// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH
|
// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH
|
||||||
const flushiness = [];
|
const flushiness = [];
|
||||||
let i = 0;
|
let i = 0;
|
||||||
for (const flushFlag of [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
|
const kFlushFlagList = [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
|
||||||
Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH]) {
|
Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH];
|
||||||
|
for (const flushFlag of kFlushFlagList) {
|
||||||
flushiness[flushFlag] = i++;
|
flushiness[flushFlag] = i++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -322,7 +326,18 @@ function maxFlush(a, b) {
|
|||||||
return flushiness[a] > flushiness[b] ? a : b;
|
return flushiness[a] > flushiness[b] ? a : b;
|
||||||
}
|
}
|
||||||
|
|
||||||
const flushBuffer = Buffer.alloc(0);
|
// Set up a list of 'special' buffers that can be written using .write()
|
||||||
|
// from the .flush() code as a way of introducing flushing operations into the
|
||||||
|
// write sequence.
|
||||||
|
const kFlushBuffers = [];
|
||||||
|
{
|
||||||
|
const dummyArrayBuffer = new ArrayBuffer();
|
||||||
|
for (const flushFlag of kFlushFlagList) {
|
||||||
|
kFlushBuffers[flushFlag] = Buffer.from(dummyArrayBuffer);
|
||||||
|
kFlushBuffers[flushFlag][kFlushFlag] = flushFlag;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ZlibBase.prototype.flush = function(kind, callback) {
|
ZlibBase.prototype.flush = function(kind, callback) {
|
||||||
const ws = this._writableState;
|
const ws = this._writableState;
|
||||||
|
|
||||||
@ -337,13 +352,8 @@ ZlibBase.prototype.flush = function(kind, callback) {
|
|||||||
} else if (ws.ending) {
|
} else if (ws.ending) {
|
||||||
if (callback)
|
if (callback)
|
||||||
this.once('end', callback);
|
this.once('end', callback);
|
||||||
} else if (this._nextFlush !== -1) {
|
|
||||||
// This means that there is a flush currently in the write queue.
|
|
||||||
// We currently coalesce this flush into the pending one.
|
|
||||||
this._nextFlush = maxFlush(this._nextFlush, kind);
|
|
||||||
} else {
|
} else {
|
||||||
this._nextFlush = kind;
|
this.write(kFlushBuffers[kind], '', callback);
|
||||||
this.write(flushBuffer, '', callback);
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -361,9 +371,8 @@ ZlibBase.prototype._transform = function(chunk, encoding, cb) {
|
|||||||
var flushFlag = this._defaultFlushFlag;
|
var flushFlag = this._defaultFlushFlag;
|
||||||
// We use a 'fake' zero-length chunk to carry information about flushes from
|
// We use a 'fake' zero-length chunk to carry information about flushes from
|
||||||
// the public API to the actual stream implementation.
|
// the public API to the actual stream implementation.
|
||||||
if (chunk === flushBuffer) {
|
if (typeof chunk[kFlushFlag] === 'number') {
|
||||||
flushFlag = this._nextFlush;
|
flushFlag = chunk[kFlushFlag];
|
||||||
this._nextFlush = -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// For the last chunk, also apply `_finishFlushFlag`.
|
// For the last chunk, also apply `_finishFlushFlag`.
|
||||||
|
@ -1,39 +0,0 @@
|
|||||||
'use strict';
|
|
||||||
|
|
||||||
const common = require('../common');
|
|
||||||
const assert = require('assert');
|
|
||||||
const zlib = require('zlib');
|
|
||||||
|
|
||||||
const {
|
|
||||||
Z_PARTIAL_FLUSH, Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH
|
|
||||||
} = zlib.constants;
|
|
||||||
|
|
||||||
async function getOutput(...sequenceOfFlushes) {
|
|
||||||
const zipper = zlib.createGzip({ highWaterMark: 16384 });
|
|
||||||
|
|
||||||
zipper.write('A'.repeat(17000));
|
|
||||||
for (const flush of sequenceOfFlushes) {
|
|
||||||
zipper.flush(flush);
|
|
||||||
}
|
|
||||||
|
|
||||||
const data = [];
|
|
||||||
|
|
||||||
return new Promise((resolve) => {
|
|
||||||
zipper.on('data', common.mustCall((d) => {
|
|
||||||
data.push(d);
|
|
||||||
if (data.length === 2) resolve(Buffer.concat(data));
|
|
||||||
}, 2));
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
(async function() {
|
|
||||||
assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH),
|
|
||||||
await getOutput(Z_SYNC_FLUSH, Z_PARTIAL_FLUSH));
|
|
||||||
assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH),
|
|
||||||
await getOutput(Z_PARTIAL_FLUSH, Z_SYNC_FLUSH));
|
|
||||||
|
|
||||||
assert.deepStrictEqual(await getOutput(Z_FINISH),
|
|
||||||
await getOutput(Z_FULL_FLUSH, Z_FINISH));
|
|
||||||
assert.deepStrictEqual(await getOutput(Z_FINISH),
|
|
||||||
await getOutput(Z_SYNC_FLUSH, Z_FINISH));
|
|
||||||
})();
|
|
57
test/parallel/test-zlib-flush-write-sync-interleaved.js
Normal file
57
test/parallel/test-zlib-flush-write-sync-interleaved.js
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
'use strict';
|
||||||
|
const common = require('../common');
|
||||||
|
const assert = require('assert');
|
||||||
|
const { createGzip, createGunzip, Z_PARTIAL_FLUSH } = require('zlib');
|
||||||
|
|
||||||
|
// Verify that .flush() behaves like .write() in terms of ordering, e.g. in
|
||||||
|
// a sequence like .write() + .flush() + .write() + .flush() each .flush() call
|
||||||
|
// only affects the data written before it.
|
||||||
|
// Refs: https://github.com/nodejs/node/issues/28478
|
||||||
|
|
||||||
|
const compress = createGzip();
|
||||||
|
const decompress = createGunzip();
|
||||||
|
decompress.setEncoding('utf8');
|
||||||
|
|
||||||
|
const events = [];
|
||||||
|
const compressedChunks = [];
|
||||||
|
|
||||||
|
for (const chunk of ['abc', 'def', 'ghi']) {
|
||||||
|
compress.write(chunk, common.mustCall(() => events.push({ written: chunk })));
|
||||||
|
compress.flush(Z_PARTIAL_FLUSH, common.mustCall(() => {
|
||||||
|
events.push('flushed');
|
||||||
|
const chunk = compress.read();
|
||||||
|
if (chunk !== null)
|
||||||
|
compressedChunks.push(chunk);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
compress.end(common.mustCall(() => {
|
||||||
|
events.push('compress end');
|
||||||
|
writeToDecompress();
|
||||||
|
}));
|
||||||
|
|
||||||
|
function writeToDecompress() {
|
||||||
|
// Write the compressed chunks to a decompressor, one by one, in order to
|
||||||
|
// verify that the flushes actually worked.
|
||||||
|
const chunk = compressedChunks.shift();
|
||||||
|
if (chunk === undefined) return decompress.end();
|
||||||
|
decompress.write(chunk, common.mustCall(() => {
|
||||||
|
events.push({ read: decompress.read() });
|
||||||
|
writeToDecompress();
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
process.on('exit', () => {
|
||||||
|
assert.deepStrictEqual(events, [
|
||||||
|
{ written: 'abc' },
|
||||||
|
'flushed',
|
||||||
|
{ written: 'def' },
|
||||||
|
'flushed',
|
||||||
|
{ written: 'ghi' },
|
||||||
|
'flushed',
|
||||||
|
'compress end',
|
||||||
|
{ read: 'abc' },
|
||||||
|
{ read: 'def' },
|
||||||
|
{ read: 'ghi' }
|
||||||
|
]);
|
||||||
|
});
|
@ -39,7 +39,6 @@ for (const [ createCompress, createDecompress ] of [
|
|||||||
gunz.on('data', (c) => output += c);
|
gunz.on('data', (c) => output += c);
|
||||||
gunz.on('end', common.mustCall(() => {
|
gunz.on('end', common.mustCall(() => {
|
||||||
assert.strictEqual(output, input);
|
assert.strictEqual(output, input);
|
||||||
assert.strictEqual(gzip._nextFlush, -1);
|
|
||||||
}));
|
}));
|
||||||
|
|
||||||
// Make sure that flush/write doesn't trigger an assert failure
|
// Make sure that flush/write doesn't trigger an assert failure
|
||||||
|
Loading…
x
Reference in New Issue
Block a user