fs: allow correct handling of burst in fs-events with AsyncIterator

PR-URL: https://github.com/nodejs/node/pull/58490
Reviewed-By: Joyee Cheung <joyeec9h3@gmail.com>
Reviewed-By: Ethan Arrowood <ethan@arrowood.dev>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
This commit is contained in:
Philipp Dunkel 2025-06-10 17:27:17 +01:00 committed by GitHub
parent be2120f1cd
commit 5f7dbf45a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 131 additions and 8 deletions

View File

@ -1365,6 +1365,13 @@ Path is a directory.
An attempt has been made to read a file whose size is larger than the maximum An attempt has been made to read a file whose size is larger than the maximum
allowed size for a `Buffer`. allowed size for a `Buffer`.
<a id="ERR_FS_WATCH_QUEUE_OVERFLOW"></a>
### `ERR_FS_WATCH_QUEUE_OVERFLOW`
The number of file system events queued without being handled exceeded the size specified in
`maxQueue` in `fs.watch()`.
<a id="ERR_HTTP2_ALTSVC_INVALID_ORIGIN"></a> <a id="ERR_HTTP2_ALTSVC_INVALID_ORIGIN"></a>
### `ERR_HTTP2_ALTSVC_INVALID_ORIGIN` ### `ERR_HTTP2_ALTSVC_INVALID_ORIGIN`

View File

@ -1797,6 +1797,11 @@ added:
filename passed to the listener. **Default:** `'utf8'`. filename passed to the listener. **Default:** `'utf8'`.
* `signal` {AbortSignal} An {AbortSignal} used to signal when the watcher * `signal` {AbortSignal} An {AbortSignal} used to signal when the watcher
should stop. should stop.
* `maxQueue` {number} Specifies the number of events to queue between iterations
of the {AsyncIterator} returned. **Default:** `2048`.
* `overflow` {string} Either `'ignore'` or `'throw'` when there are more events to be
queued than `maxQueue` allows. `'ignore'` means overflow events are dropped and a
warning is emitted, while `'throw'` means to throw an exception. **Default:** `'ignore'`.
* Returns: {AsyncIterator} of objects with the properties: * Returns: {AsyncIterator} of objects with the properties:
* `eventType` {string} The type of change * `eventType` {string} The type of change
* `filename` {string|Buffer|null} The name of the file changed. * `filename` {string|Buffer|null} The name of the file changed.

View File

@ -1219,6 +1219,7 @@ E('ERR_FS_CP_SYMLINK_TO_SUBDIRECTORY',
E('ERR_FS_CP_UNKNOWN', 'Cannot copy an unknown file type', SystemError); E('ERR_FS_CP_UNKNOWN', 'Cannot copy an unknown file type', SystemError);
E('ERR_FS_EISDIR', 'Path is a directory', SystemError, HideStackFramesError); E('ERR_FS_EISDIR', 'Path is a directory', SystemError, HideStackFramesError);
E('ERR_FS_FILE_TOO_LARGE', 'File size (%s) is greater than 2 GiB', RangeError); E('ERR_FS_FILE_TOO_LARGE', 'File size (%s) is greater than 2 GiB', RangeError);
E('ERR_FS_WATCH_QUEUE_OVERFLOW', 'fs.watch() queued more than %d events', Error);
E('ERR_HTTP2_ALTSVC_INVALID_ORIGIN', E('ERR_HTTP2_ALTSVC_INVALID_ORIGIN',
'HTTP/2 ALTSVC frames require a valid origin', TypeError); 'HTTP/2 ALTSVC frames require a valid origin', TypeError);
E('ERR_HTTP2_ALTSVC_LENGTH', E('ERR_HTTP2_ALTSVC_LENGTH',

View File

@ -1,6 +1,9 @@
'use strict'; 'use strict';
const { const {
ArrayPrototypePush,
ArrayPrototypeShift,
Error,
FunctionPrototypeCall, FunctionPrototypeCall,
ObjectDefineProperty, ObjectDefineProperty,
ObjectSetPrototypeOf, ObjectSetPrototypeOf,
@ -12,9 +15,11 @@ const {
AbortError, AbortError,
UVException, UVException,
codes: { codes: {
ERR_FS_WATCH_QUEUE_OVERFLOW,
ERR_INVALID_ARG_VALUE, ERR_INVALID_ARG_VALUE,
}, },
} = require('internal/errors'); } = require('internal/errors');
const { const {
kEmptyObject, kEmptyObject,
} = require('internal/util'); } = require('internal/util');
@ -45,6 +50,8 @@ const {
validateBoolean, validateBoolean,
validateObject, validateObject,
validateUint32, validateUint32,
validateInteger,
validateOneOf,
} = require('internal/validators'); } = require('internal/validators');
const { const {
@ -309,11 +316,15 @@ async function* watch(filename, options = kEmptyObject) {
persistent = true, persistent = true,
recursive = false, recursive = false,
encoding = 'utf8', encoding = 'utf8',
maxQueue = 2048,
overflow = 'ignore',
signal, signal,
} = options; } = options;
validateBoolean(persistent, 'options.persistent'); validateBoolean(persistent, 'options.persistent');
validateBoolean(recursive, 'options.recursive'); validateBoolean(recursive, 'options.recursive');
validateInteger(maxQueue, 'options.maxQueue');
validateOneOf(overflow, 'options.overflow', ['ignore', 'error']);
validateAbortSignal(signal, 'options.signal'); validateAbortSignal(signal, 'options.signal');
if (encoding && !isEncoding(encoding)) { if (encoding && !isEncoding(encoding)) {
@ -325,10 +336,11 @@ async function* watch(filename, options = kEmptyObject) {
throw new AbortError(undefined, { cause: signal.reason }); throw new AbortError(undefined, { cause: signal.reason });
const handle = new FSEvent(); const handle = new FSEvent();
let { promise, resolve, reject } = PromiseWithResolvers(); let { promise, resolve } = PromiseWithResolvers();
const queue = [];
const oncancel = () => { const oncancel = () => {
handle.close(); handle.close();
reject(new AbortError(undefined, { cause: signal?.reason })); resolve();
}; };
try { try {
@ -345,11 +357,20 @@ async function* watch(filename, options = kEmptyObject) {
}); });
error.filename = filename; error.filename = filename;
handle.close(); handle.close();
reject(error); ArrayPrototypePush(queue, error);
resolve();
return; return;
} }
if (queue.length < maxQueue) {
resolve({ eventType, filename }); ArrayPrototypePush(queue, { __proto__: null, eventType, filename });
resolve();
} else if (overflow === 'error') {
queue.length = 0;
ArrayPrototypePush(queue, new ERR_FS_WATCH_QUEUE_OVERFLOW(maxQueue));
resolve();
} else {
process.emitWarning('fs.watch maxQueue exceeded');
}
}; };
const err = handle.start(path, persistent, recursive, encoding); const err = handle.start(path, persistent, recursive, encoding);
@ -367,10 +388,20 @@ async function* watch(filename, options = kEmptyObject) {
} }
while (!signal?.aborted) { while (!signal?.aborted) {
yield await promise; await promise;
({ promise, resolve, reject } = PromiseWithResolvers()); while (queue.length) {
const item = ArrayPrototypeShift(queue);
if (item instanceof Error) {
throw item;
} else {
yield item;
} }
}
({ promise, resolve } = PromiseWithResolvers());
}
if (signal?.aborted) {
throw new AbortError(undefined, { cause: signal?.reason }); throw new AbortError(undefined, { cause: signal?.reason });
}
} finally { } finally {
handle.close(); handle.close();
signal?.removeEventListener('abort', oncancel); signal?.removeEventListener('abort', oncancel);

View File

@ -77,6 +77,7 @@ test-domain-throw-error-then-throw-from-uncaught-exception-handler: PASS, FLAKY
test-domain-with-abort-on-uncaught-exception: PASS, FLAKY test-domain-with-abort-on-uncaught-exception: PASS, FLAKY
# https://github.com/nodejs/node/issues/54346 # https://github.com/nodejs/node/issues/54346
test-esm-loader-hooks-inspect-wait: PASS, FLAKY test-esm-loader-hooks-inspect-wait: PASS, FLAKY
test-fs-promises-watch-iterator: SKIP
# https://github.com/nodejs/node/issues/50050 # https://github.com/nodejs/node/issues/50050
test-tick-processor-arguments: SKIP test-tick-processor-arguments: SKIP
# https://github.com/nodejs/node/issues/54534 # https://github.com/nodejs/node/issues/54534
@ -85,6 +86,7 @@ test-runner-run-watch: PASS, FLAKY
[$system==freebsd] [$system==freebsd]
# https://github.com/nodejs/node/issues/54346 # https://github.com/nodejs/node/issues/54346
test-esm-loader-hooks-inspect-wait: PASS, FLAKY test-esm-loader-hooks-inspect-wait: PASS, FLAKY
test-fs-promises-watch-iterator: SKIP
[$system==aix] [$system==aix]
# https://github.com/nodejs/node/issues/54346 # https://github.com/nodejs/node/issues/54346
@ -95,6 +97,7 @@ test-esm-loader-hooks-inspect-wait: PASS, FLAKY
test-child-process-fork-net-server: SKIP test-child-process-fork-net-server: SKIP
test-cli-node-options: SKIP test-cli-node-options: SKIP
test-cluster-shared-leak: SKIP test-cluster-shared-leak: SKIP
test-fs-promises-watch-iterator: SKIP
test-http-writable-true-after-close: SKIP test-http-writable-true-after-close: SKIP
test-http2-connect-method: SKIP test-http2-connect-method: SKIP
test-net-error-twice: SKIP test-net-error-twice: SKIP

View File

@ -0,0 +1,57 @@
'use strict';
// This tests that when there is a burst of fs watch events, the events
// emitted after the consumer receives the initial event and before the
// control returns back to fs.watch() can be queued up and show up
// in the next iteration.
const common = require('../common');
const { watch, writeFile } = require('fs/promises');
const fs = require('fs');
const assert = require('assert');
const { join } = require('path');
const { setTimeout } = require('timers/promises');
const tmpdir = require('../common/tmpdir');
class WatchTestCase {
constructor(dirName, files) {
this.dirName = dirName;
this.files = files;
}
get dirPath() { return tmpdir.resolve(this.dirName); }
filePath(fileName) { return join(this.dirPath, fileName); }
async run() {
await Promise.all([this.watchFiles(), this.writeFiles()]);
assert(!this.files.length);
}
async watchFiles() {
const watcher = watch(this.dirPath);
for await (const evt of watcher) {
const idx = this.files.indexOf(evt.filename);
if (idx < 0) continue;
this.files.splice(idx, 1);
await setTimeout(common.platformTimeout(100));
if (!this.files.length) break;
}
}
async writeFiles() {
for (const fileName of [...this.files]) {
await writeFile(this.filePath(fileName), Date.now() + fileName.repeat(1e4));
}
await setTimeout(common.platformTimeout(100));
}
}
const kCases = [
// Watch on a directory should callback with a filename on supported systems
new WatchTestCase(
'watch1',
['foo', 'bar', 'baz']
),
];
tmpdir.refresh();
for (const testCase of kCases) {
fs.mkdirSync(testCase.dirPath);
testCase.run().then(common.mustCall());
}

View File

@ -123,6 +123,25 @@ assert.rejects(
}, },
{ code: 'ERR_INVALID_ARG_TYPE' }).then(common.mustCall()); { code: 'ERR_INVALID_ARG_TYPE' }).then(common.mustCall());
assert.rejects(
async () => {
// eslint-disable-next-line no-unused-vars, no-empty
for await (const _ of watch('', { maxQueue: 'silly' })) { }
},
{ code: 'ERR_INVALID_ARG_TYPE' }).then(common.mustCall());
assert.rejects(
async () => {
// eslint-disable-next-line no-unused-vars, no-empty
for await (const _ of watch('', { overflow: 1 })) { }
},
{ code: 'ERR_INVALID_ARG_VALUE' }).then(common.mustCall());
assert.rejects(
async () => {
// eslint-disable-next-line no-unused-vars, no-empty
for await (const _ of watch('', { overflow: 'barf' })) { }
},
{ code: 'ERR_INVALID_ARG_VALUE' }).then(common.mustCall());
(async () => { (async () => {
const ac = new AbortController(); const ac = new AbortController();
const { signal } = ac; const { signal } = ac;