From 5f7dbf45a3d3e3070d5f58f9a9c2c43dbecc8672 Mon Sep 17 00:00:00 2001 From: Philipp Dunkel Date: Tue, 10 Jun 2025 17:27:17 +0100 Subject: [PATCH] fs: allow correct handling of burst in fs-events with AsyncIterator PR-URL: https://github.com/nodejs/node/pull/58490 Reviewed-By: Joyee Cheung Reviewed-By: Ethan Arrowood Reviewed-By: Benjamin Gruenbaum --- doc/api/errors.md | 7 +++ doc/api/fs.md | 5 ++ lib/internal/errors.js | 1 + lib/internal/fs/watchers.js | 47 ++++++++++++--- test/parallel/parallel.status | 3 + .../test-fs-promises-watch-iterator.js | 57 +++++++++++++++++++ test/parallel/test-fs-promises-watch.js | 19 +++++++ 7 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 test/parallel/test-fs-promises-watch-iterator.js diff --git a/doc/api/errors.md b/doc/api/errors.md index fadc4142c2d..b06abbf8d96 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1365,6 +1365,13 @@ Path is a directory. An attempt has been made to read a file whose size is larger than the maximum allowed size for a `Buffer`. + + +### `ERR_FS_WATCH_QUEUE_OVERFLOW` + +The number of file system events queued without being handled exceeded the size specified in +`maxQueue` in `fs.watch()`. + ### `ERR_HTTP2_ALTSVC_INVALID_ORIGIN` diff --git a/doc/api/fs.md b/doc/api/fs.md index 2d3c01312e3..2232528e47c 100644 --- a/doc/api/fs.md +++ b/doc/api/fs.md @@ -1797,6 +1797,11 @@ added: filename passed to the listener. **Default:** `'utf8'`. * `signal` {AbortSignal} An {AbortSignal} used to signal when the watcher should stop. + * `maxQueue` {number} Specifies the number of events to queue between iterations + of the {AsyncIterator} returned. **Default:** `2048`. + * `overflow` {string} Either `'ignore'` or `'throw'` when there are more events to be + queued than `maxQueue` allows. `'ignore'` means overflow events are dropped and a + warning is emitted, while `'throw'` means to throw an exception. **Default:** `'ignore'`. * Returns: {AsyncIterator} of objects with the properties: * `eventType` {string} The type of change * `filename` {string|Buffer|null} The name of the file changed. diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 6c48b1fd31b..1714744915f 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1219,6 +1219,7 @@ E('ERR_FS_CP_SYMLINK_TO_SUBDIRECTORY', E('ERR_FS_CP_UNKNOWN', 'Cannot copy an unknown file type', SystemError); E('ERR_FS_EISDIR', 'Path is a directory', SystemError, HideStackFramesError); E('ERR_FS_FILE_TOO_LARGE', 'File size (%s) is greater than 2 GiB', RangeError); +E('ERR_FS_WATCH_QUEUE_OVERFLOW', 'fs.watch() queued more than %d events', Error); E('ERR_HTTP2_ALTSVC_INVALID_ORIGIN', 'HTTP/2 ALTSVC frames require a valid origin', TypeError); E('ERR_HTTP2_ALTSVC_LENGTH', diff --git a/lib/internal/fs/watchers.js b/lib/internal/fs/watchers.js index d1d8bcf3726..605dee28cac 100644 --- a/lib/internal/fs/watchers.js +++ b/lib/internal/fs/watchers.js @@ -1,6 +1,9 @@ 'use strict'; const { + ArrayPrototypePush, + ArrayPrototypeShift, + Error, FunctionPrototypeCall, ObjectDefineProperty, ObjectSetPrototypeOf, @@ -12,9 +15,11 @@ const { AbortError, UVException, codes: { + ERR_FS_WATCH_QUEUE_OVERFLOW, ERR_INVALID_ARG_VALUE, }, } = require('internal/errors'); + const { kEmptyObject, } = require('internal/util'); @@ -45,6 +50,8 @@ const { validateBoolean, validateObject, validateUint32, + validateInteger, + validateOneOf, } = require('internal/validators'); const { @@ -309,11 +316,15 @@ async function* watch(filename, options = kEmptyObject) { persistent = true, recursive = false, encoding = 'utf8', + maxQueue = 2048, + overflow = 'ignore', signal, } = options; validateBoolean(persistent, 'options.persistent'); validateBoolean(recursive, 'options.recursive'); + validateInteger(maxQueue, 'options.maxQueue'); + validateOneOf(overflow, 'options.overflow', ['ignore', 'error']); validateAbortSignal(signal, 'options.signal'); if (encoding && !isEncoding(encoding)) { @@ -325,10 +336,11 @@ async function* watch(filename, options = kEmptyObject) { throw new AbortError(undefined, { cause: signal.reason }); const handle = new FSEvent(); - let { promise, resolve, reject } = PromiseWithResolvers(); + let { promise, resolve } = PromiseWithResolvers(); + const queue = []; const oncancel = () => { handle.close(); - reject(new AbortError(undefined, { cause: signal?.reason })); + resolve(); }; try { @@ -345,11 +357,20 @@ async function* watch(filename, options = kEmptyObject) { }); error.filename = filename; handle.close(); - reject(error); + ArrayPrototypePush(queue, error); + resolve(); return; } - - resolve({ eventType, filename }); + if (queue.length < maxQueue) { + ArrayPrototypePush(queue, { __proto__: null, eventType, filename }); + resolve(); + } else if (overflow === 'error') { + queue.length = 0; + ArrayPrototypePush(queue, new ERR_FS_WATCH_QUEUE_OVERFLOW(maxQueue)); + resolve(); + } else { + process.emitWarning('fs.watch maxQueue exceeded'); + } }; const err = handle.start(path, persistent, recursive, encoding); @@ -367,10 +388,20 @@ async function* watch(filename, options = kEmptyObject) { } while (!signal?.aborted) { - yield await promise; - ({ promise, resolve, reject } = PromiseWithResolvers()); + await promise; + while (queue.length) { + const item = ArrayPrototypeShift(queue); + if (item instanceof Error) { + throw item; + } else { + yield item; + } + } + ({ promise, resolve } = PromiseWithResolvers()); + } + if (signal?.aborted) { + throw new AbortError(undefined, { cause: signal?.reason }); } - throw new AbortError(undefined, { cause: signal?.reason }); } finally { handle.close(); signal?.removeEventListener('abort', oncancel); diff --git a/test/parallel/parallel.status b/test/parallel/parallel.status index e612653d45c..9c534c4492a 100644 --- a/test/parallel/parallel.status +++ b/test/parallel/parallel.status @@ -77,6 +77,7 @@ test-domain-throw-error-then-throw-from-uncaught-exception-handler: PASS, FLAKY test-domain-with-abort-on-uncaught-exception: PASS, FLAKY # https://github.com/nodejs/node/issues/54346 test-esm-loader-hooks-inspect-wait: PASS, FLAKY +test-fs-promises-watch-iterator: SKIP # https://github.com/nodejs/node/issues/50050 test-tick-processor-arguments: SKIP # https://github.com/nodejs/node/issues/54534 @@ -85,6 +86,7 @@ test-runner-run-watch: PASS, FLAKY [$system==freebsd] # https://github.com/nodejs/node/issues/54346 test-esm-loader-hooks-inspect-wait: PASS, FLAKY +test-fs-promises-watch-iterator: SKIP [$system==aix] # https://github.com/nodejs/node/issues/54346 @@ -95,6 +97,7 @@ test-esm-loader-hooks-inspect-wait: PASS, FLAKY test-child-process-fork-net-server: SKIP test-cli-node-options: SKIP test-cluster-shared-leak: SKIP +test-fs-promises-watch-iterator: SKIP test-http-writable-true-after-close: SKIP test-http2-connect-method: SKIP test-net-error-twice: SKIP diff --git a/test/parallel/test-fs-promises-watch-iterator.js b/test/parallel/test-fs-promises-watch-iterator.js new file mode 100644 index 00000000000..1606bdef422 --- /dev/null +++ b/test/parallel/test-fs-promises-watch-iterator.js @@ -0,0 +1,57 @@ +'use strict'; +// This tests that when there is a burst of fs watch events, the events +// emitted after the consumer receives the initial event and before the +// control returns back to fs.watch() can be queued up and show up +// in the next iteration. +const common = require('../common'); +const { watch, writeFile } = require('fs/promises'); +const fs = require('fs'); +const assert = require('assert'); +const { join } = require('path'); +const { setTimeout } = require('timers/promises'); +const tmpdir = require('../common/tmpdir'); + +class WatchTestCase { + constructor(dirName, files) { + this.dirName = dirName; + this.files = files; + } + get dirPath() { return tmpdir.resolve(this.dirName); } + filePath(fileName) { return join(this.dirPath, fileName); } + + async run() { + await Promise.all([this.watchFiles(), this.writeFiles()]); + assert(!this.files.length); + } + async watchFiles() { + const watcher = watch(this.dirPath); + for await (const evt of watcher) { + const idx = this.files.indexOf(evt.filename); + if (idx < 0) continue; + this.files.splice(idx, 1); + await setTimeout(common.platformTimeout(100)); + if (!this.files.length) break; + } + } + async writeFiles() { + for (const fileName of [...this.files]) { + await writeFile(this.filePath(fileName), Date.now() + fileName.repeat(1e4)); + } + await setTimeout(common.platformTimeout(100)); + } +} + +const kCases = [ + // Watch on a directory should callback with a filename on supported systems + new WatchTestCase( + 'watch1', + ['foo', 'bar', 'baz'] + ), +]; + +tmpdir.refresh(); + +for (const testCase of kCases) { + fs.mkdirSync(testCase.dirPath); + testCase.run().then(common.mustCall()); +} diff --git a/test/parallel/test-fs-promises-watch.js b/test/parallel/test-fs-promises-watch.js index 692ed33dbcd..65ac429a59a 100644 --- a/test/parallel/test-fs-promises-watch.js +++ b/test/parallel/test-fs-promises-watch.js @@ -123,6 +123,25 @@ assert.rejects( }, { code: 'ERR_INVALID_ARG_TYPE' }).then(common.mustCall()); +assert.rejects( + async () => { + // eslint-disable-next-line no-unused-vars, no-empty + for await (const _ of watch('', { maxQueue: 'silly' })) { } + }, + { code: 'ERR_INVALID_ARG_TYPE' }).then(common.mustCall()); +assert.rejects( + async () => { + // eslint-disable-next-line no-unused-vars, no-empty + for await (const _ of watch('', { overflow: 1 })) { } + }, + { code: 'ERR_INVALID_ARG_VALUE' }).then(common.mustCall()); +assert.rejects( + async () => { + // eslint-disable-next-line no-unused-vars, no-empty + for await (const _ of watch('', { overflow: 'barf' })) { } + }, + { code: 'ERR_INVALID_ARG_VALUE' }).then(common.mustCall()); + (async () => { const ac = new AbortController(); const { signal } = ac;