diff --git a/doc/api/errors.md b/doc/api/errors.md
index fadc4142c2d..b06abbf8d96 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -1365,6 +1365,13 @@ Path is a directory.
An attempt has been made to read a file whose size is larger than the maximum
allowed size for a `Buffer`.
+
+
+### `ERR_FS_WATCH_QUEUE_OVERFLOW`
+
+The number of file system events queued without being handled exceeded the size specified in
+`maxQueue` in `fs.watch()`.
+
### `ERR_HTTP2_ALTSVC_INVALID_ORIGIN`
diff --git a/doc/api/fs.md b/doc/api/fs.md
index 2d3c01312e3..2232528e47c 100644
--- a/doc/api/fs.md
+++ b/doc/api/fs.md
@@ -1797,6 +1797,11 @@ added:
filename passed to the listener. **Default:** `'utf8'`.
* `signal` {AbortSignal} An {AbortSignal} used to signal when the watcher
should stop.
+ * `maxQueue` {number} Specifies the number of events to queue between iterations
+ of the {AsyncIterator} returned. **Default:** `2048`.
+ * `overflow` {string} Either `'ignore'` or `'throw'` when there are more events to be
+ queued than `maxQueue` allows. `'ignore'` means overflow events are dropped and a
+ warning is emitted, while `'throw'` means to throw an exception. **Default:** `'ignore'`.
* Returns: {AsyncIterator} of objects with the properties:
* `eventType` {string} The type of change
* `filename` {string|Buffer|null} The name of the file changed.
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index 6c48b1fd31b..1714744915f 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -1219,6 +1219,7 @@ E('ERR_FS_CP_SYMLINK_TO_SUBDIRECTORY',
E('ERR_FS_CP_UNKNOWN', 'Cannot copy an unknown file type', SystemError);
E('ERR_FS_EISDIR', 'Path is a directory', SystemError, HideStackFramesError);
E('ERR_FS_FILE_TOO_LARGE', 'File size (%s) is greater than 2 GiB', RangeError);
+E('ERR_FS_WATCH_QUEUE_OVERFLOW', 'fs.watch() queued more than %d events', Error);
E('ERR_HTTP2_ALTSVC_INVALID_ORIGIN',
'HTTP/2 ALTSVC frames require a valid origin', TypeError);
E('ERR_HTTP2_ALTSVC_LENGTH',
diff --git a/lib/internal/fs/watchers.js b/lib/internal/fs/watchers.js
index d1d8bcf3726..605dee28cac 100644
--- a/lib/internal/fs/watchers.js
+++ b/lib/internal/fs/watchers.js
@@ -1,6 +1,9 @@
'use strict';
const {
+ ArrayPrototypePush,
+ ArrayPrototypeShift,
+ Error,
FunctionPrototypeCall,
ObjectDefineProperty,
ObjectSetPrototypeOf,
@@ -12,9 +15,11 @@ const {
AbortError,
UVException,
codes: {
+ ERR_FS_WATCH_QUEUE_OVERFLOW,
ERR_INVALID_ARG_VALUE,
},
} = require('internal/errors');
+
const {
kEmptyObject,
} = require('internal/util');
@@ -45,6 +50,8 @@ const {
validateBoolean,
validateObject,
validateUint32,
+ validateInteger,
+ validateOneOf,
} = require('internal/validators');
const {
@@ -309,11 +316,15 @@ async function* watch(filename, options = kEmptyObject) {
persistent = true,
recursive = false,
encoding = 'utf8',
+ maxQueue = 2048,
+ overflow = 'ignore',
signal,
} = options;
validateBoolean(persistent, 'options.persistent');
validateBoolean(recursive, 'options.recursive');
+ validateInteger(maxQueue, 'options.maxQueue');
+ validateOneOf(overflow, 'options.overflow', ['ignore', 'error']);
validateAbortSignal(signal, 'options.signal');
if (encoding && !isEncoding(encoding)) {
@@ -325,10 +336,11 @@ async function* watch(filename, options = kEmptyObject) {
throw new AbortError(undefined, { cause: signal.reason });
const handle = new FSEvent();
- let { promise, resolve, reject } = PromiseWithResolvers();
+ let { promise, resolve } = PromiseWithResolvers();
+ const queue = [];
const oncancel = () => {
handle.close();
- reject(new AbortError(undefined, { cause: signal?.reason }));
+ resolve();
};
try {
@@ -345,11 +357,20 @@ async function* watch(filename, options = kEmptyObject) {
});
error.filename = filename;
handle.close();
- reject(error);
+ ArrayPrototypePush(queue, error);
+ resolve();
return;
}
-
- resolve({ eventType, filename });
+ if (queue.length < maxQueue) {
+ ArrayPrototypePush(queue, { __proto__: null, eventType, filename });
+ resolve();
+ } else if (overflow === 'error') {
+ queue.length = 0;
+ ArrayPrototypePush(queue, new ERR_FS_WATCH_QUEUE_OVERFLOW(maxQueue));
+ resolve();
+ } else {
+ process.emitWarning('fs.watch maxQueue exceeded');
+ }
};
const err = handle.start(path, persistent, recursive, encoding);
@@ -367,10 +388,20 @@ async function* watch(filename, options = kEmptyObject) {
}
while (!signal?.aborted) {
- yield await promise;
- ({ promise, resolve, reject } = PromiseWithResolvers());
+ await promise;
+ while (queue.length) {
+ const item = ArrayPrototypeShift(queue);
+ if (item instanceof Error) {
+ throw item;
+ } else {
+ yield item;
+ }
+ }
+ ({ promise, resolve } = PromiseWithResolvers());
+ }
+ if (signal?.aborted) {
+ throw new AbortError(undefined, { cause: signal?.reason });
}
- throw new AbortError(undefined, { cause: signal?.reason });
} finally {
handle.close();
signal?.removeEventListener('abort', oncancel);
diff --git a/test/parallel/parallel.status b/test/parallel/parallel.status
index e612653d45c..9c534c4492a 100644
--- a/test/parallel/parallel.status
+++ b/test/parallel/parallel.status
@@ -77,6 +77,7 @@ test-domain-throw-error-then-throw-from-uncaught-exception-handler: PASS, FLAKY
test-domain-with-abort-on-uncaught-exception: PASS, FLAKY
# https://github.com/nodejs/node/issues/54346
test-esm-loader-hooks-inspect-wait: PASS, FLAKY
+test-fs-promises-watch-iterator: SKIP
# https://github.com/nodejs/node/issues/50050
test-tick-processor-arguments: SKIP
# https://github.com/nodejs/node/issues/54534
@@ -85,6 +86,7 @@ test-runner-run-watch: PASS, FLAKY
[$system==freebsd]
# https://github.com/nodejs/node/issues/54346
test-esm-loader-hooks-inspect-wait: PASS, FLAKY
+test-fs-promises-watch-iterator: SKIP
[$system==aix]
# https://github.com/nodejs/node/issues/54346
@@ -95,6 +97,7 @@ test-esm-loader-hooks-inspect-wait: PASS, FLAKY
test-child-process-fork-net-server: SKIP
test-cli-node-options: SKIP
test-cluster-shared-leak: SKIP
+test-fs-promises-watch-iterator: SKIP
test-http-writable-true-after-close: SKIP
test-http2-connect-method: SKIP
test-net-error-twice: SKIP
diff --git a/test/parallel/test-fs-promises-watch-iterator.js b/test/parallel/test-fs-promises-watch-iterator.js
new file mode 100644
index 00000000000..1606bdef422
--- /dev/null
+++ b/test/parallel/test-fs-promises-watch-iterator.js
@@ -0,0 +1,57 @@
+'use strict';
+// This tests that when there is a burst of fs watch events, the events
+// emitted after the consumer receives the initial event and before the
+// control returns back to fs.watch() can be queued up and show up
+// in the next iteration.
+const common = require('../common');
+const { watch, writeFile } = require('fs/promises');
+const fs = require('fs');
+const assert = require('assert');
+const { join } = require('path');
+const { setTimeout } = require('timers/promises');
+const tmpdir = require('../common/tmpdir');
+
+class WatchTestCase {
+ constructor(dirName, files) {
+ this.dirName = dirName;
+ this.files = files;
+ }
+ get dirPath() { return tmpdir.resolve(this.dirName); }
+ filePath(fileName) { return join(this.dirPath, fileName); }
+
+ async run() {
+ await Promise.all([this.watchFiles(), this.writeFiles()]);
+ assert(!this.files.length);
+ }
+ async watchFiles() {
+ const watcher = watch(this.dirPath);
+ for await (const evt of watcher) {
+ const idx = this.files.indexOf(evt.filename);
+ if (idx < 0) continue;
+ this.files.splice(idx, 1);
+ await setTimeout(common.platformTimeout(100));
+ if (!this.files.length) break;
+ }
+ }
+ async writeFiles() {
+ for (const fileName of [...this.files]) {
+ await writeFile(this.filePath(fileName), Date.now() + fileName.repeat(1e4));
+ }
+ await setTimeout(common.platformTimeout(100));
+ }
+}
+
+const kCases = [
+ // Watch on a directory should callback with a filename on supported systems
+ new WatchTestCase(
+ 'watch1',
+ ['foo', 'bar', 'baz']
+ ),
+];
+
+tmpdir.refresh();
+
+for (const testCase of kCases) {
+ fs.mkdirSync(testCase.dirPath);
+ testCase.run().then(common.mustCall());
+}
diff --git a/test/parallel/test-fs-promises-watch.js b/test/parallel/test-fs-promises-watch.js
index 692ed33dbcd..65ac429a59a 100644
--- a/test/parallel/test-fs-promises-watch.js
+++ b/test/parallel/test-fs-promises-watch.js
@@ -123,6 +123,25 @@ assert.rejects(
},
{ code: 'ERR_INVALID_ARG_TYPE' }).then(common.mustCall());
+assert.rejects(
+ async () => {
+ // eslint-disable-next-line no-unused-vars, no-empty
+ for await (const _ of watch('', { maxQueue: 'silly' })) { }
+ },
+ { code: 'ERR_INVALID_ARG_TYPE' }).then(common.mustCall());
+assert.rejects(
+ async () => {
+ // eslint-disable-next-line no-unused-vars, no-empty
+ for await (const _ of watch('', { overflow: 1 })) { }
+ },
+ { code: 'ERR_INVALID_ARG_VALUE' }).then(common.mustCall());
+assert.rejects(
+ async () => {
+ // eslint-disable-next-line no-unused-vars, no-empty
+ for await (const _ of watch('', { overflow: 'barf' })) { }
+ },
+ { code: 'ERR_INVALID_ARG_VALUE' }).then(common.mustCall());
+
(async () => {
const ac = new AbortController();
const { signal } = ac;