Skip to content

Commit

Permalink
fs: remove race condition for recursive watch on Linux
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <[email protected]>
PR-URL: #51406
Reviewed-By: Yagiz Nizipli <[email protected]>
Reviewed-By: Marco Ippolito <[email protected]>
Reviewed-By: Moshe Atlow <[email protected]>
  • Loading branch information
mcollina authored and targos committed Feb 15, 2024
1 parent d8e1058 commit 15a7f21
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 198 deletions.
2 changes: 1 addition & 1 deletion lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,7 @@ async function* _watch(filename, options = kEmptyObject) {
// e.g. Linux due to the limitations of inotify.
if (options.recursive && !isOSX && !isWindows) {
const watcher = new nonNativeWatcher.FSWatcher(options);
await watcher[kFSWatchStart](filename);
watcher[kFSWatchStart](filename);
yield* watcher;
return;
}
Expand Down
113 changes: 44 additions & 69 deletions lib/internal/fs/recursive_watch.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
'use strict';

const {
ArrayPrototypePush,
SafePromiseAllReturnVoid,
Promise,
PromisePrototypeThen,
SafeMap,
SafeSet,
StringPrototypeStartsWith,
Expand All @@ -31,47 +28,19 @@ const {
} = require('path');

let internalSync;
let internalPromises;

function lazyLoadFsPromises() {
internalPromises ??= require('fs/promises');
return internalPromises;
}

function lazyLoadFsSync() {
internalSync ??= require('fs');
return internalSync;
}
let kResistStopPropagation;

async function traverse(dir, files = new SafeMap(), symbolicLinks = new SafeSet()) {
const { opendir } = lazyLoadFsPromises();

const filenames = await opendir(dir);
const subdirectories = [];

for await (const file of filenames) {
const f = pathJoin(dir, file.name);

files.set(f, file);

// Do not follow symbolic links
if (file.isSymbolicLink()) {
symbolicLinks.add(f);
} else if (file.isDirectory()) {
ArrayPrototypePush(subdirectories, traverse(f, files));
}
}

await SafePromiseAllReturnVoid(subdirectories);

return files;
}
let kResistStopPropagation;

class FSWatcher extends EventEmitter {
#options = null;
#closed = false;
#files = new SafeMap();
#watchers = new SafeMap();
#symbolicFiles = new SafeSet();
#rootPath = pathResolve();
#watchingFile = false;
Expand Down Expand Up @@ -111,11 +80,11 @@ class FSWatcher extends EventEmitter {
return;
}

const { unwatchFile } = lazyLoadFsSync();
this.#closed = true;

for (const file of this.#files.keys()) {
unwatchFile(file);
this.#watchers.get(file).close();
this.#watchers.delete(file);
}

this.#files.clear();
Expand All @@ -124,24 +93,26 @@ class FSWatcher extends EventEmitter {
}

#unwatchFiles(file) {
const { unwatchFile } = lazyLoadFsSync();

this.#symbolicFiles.delete(file);

for (const filename of this.#files.keys()) {
if (StringPrototypeStartsWith(filename, file)) {
unwatchFile(filename);
this.#files.delete(filename);
this.#watchers.get(filename).close();
this.#watchers.delete(filename);
}
}
}

async #watchFolder(folder) {
const { opendir } = lazyLoadFsPromises();
#watchFolder(folder) {
const { readdirSync } = lazyLoadFsSync();

try {
const files = await opendir(folder);
const files = readdirSync(folder, {
withFileTypes: true,
});

for await (const file of files) {
for (const file of files) {
if (this.#closed) {
break;
}
Expand All @@ -155,11 +126,9 @@ class FSWatcher extends EventEmitter {
this.#symbolicFiles.add(f);
}

this.#files.set(f, file);
if (file.isFile()) {
this.#watchFile(f);
} else if (file.isDirectory() && !file.isSymbolicLink()) {
await this.#watchFolder(f);
this.#watchFile(f);
if (file.isDirectory() && !file.isSymbolicLink()) {
this.#watchFolder(f);
}
}
}
Expand All @@ -173,22 +142,30 @@ class FSWatcher extends EventEmitter {
return;
}

const { watchFile } = lazyLoadFsSync();
const existingStat = this.#files.get(file);
const { watch, statSync } = lazyLoadFsSync();

if (this.#files.has(file)) {
return;
}

{
const existingStat = statSync(file);
this.#files.set(file, existingStat);
}

watchFile(file, {
const watcher = watch(file, {
persistent: this.#options.persistent,
}, (currentStats, previousStats) => {
if (existingStat && !existingStat.isDirectory() &&
currentStats.nlink !== 0 && existingStat.mtimeMs === currentStats.mtimeMs) {
return;
}
}, (eventType, filename) => {
const existingStat = this.#files.get(file);
const currentStats = statSync(file);

this.#files.set(file, currentStats);

if (currentStats.birthtimeMs === 0 && previousStats.birthtimeMs !== 0) {
if (currentStats.birthtimeMs === 0 && existingStat.birthtimeMs !== 0) {
// The file is now deleted
this.#files.delete(file);
this.#watchers.delete(file);
watcher.close();
this.emit('change', 'rename', pathRelative(this.#rootPath, file));
this.#unwatchFiles(file);
} else if (file === this.#rootPath && this.#watchingFile) {
Expand All @@ -205,6 +182,7 @@ class FSWatcher extends EventEmitter {
this.emit('change', 'change', pathRelative(this.#rootPath, file));
}
});
this.#watchers.set(file, watcher);
}

[kFSWatchStart](filename) {
Expand All @@ -217,19 +195,9 @@ class FSWatcher extends EventEmitter {
this.#closed = false;
this.#watchingFile = file.isFile();

this.#watchFile(filename);
if (file.isDirectory()) {
this.#files.set(filename, file);

PromisePrototypeThen(
traverse(filename, this.#files, this.#symbolicFiles),
() => {
for (const f of this.#files.keys()) {
this.#watchFile(f);
}
},
);
} else {
this.#watchFile(filename);
this.#watchFolder(filename);
}
} catch (error) {
if (error.code === 'ENOENT') {
Expand Down Expand Up @@ -264,7 +232,10 @@ class FSWatcher extends EventEmitter {
resolve({ __proto__: null, value: { eventType, filename } });
});
} : (resolve, reject) => {
const onAbort = () => reject(new AbortError(undefined, { cause: signal.reason }));
const onAbort = () => {
this.close();
reject(new AbortError(undefined, { cause: signal.reason }));
};
if (signal.aborted) return onAbort();
kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation;
signal.addEventListener('abort', onAbort, { __proto__: null, once: true, [kResistStopPropagation]: true });
Expand All @@ -277,6 +248,10 @@ class FSWatcher extends EventEmitter {
next: () => (this.#closed ?
{ __proto__: null, done: true } :
new Promise(promiseExecutor)),
return: () => {
this.close();
return { __proto__: null, done: true };
},
[SymbolAsyncIterator]() { return this; },
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict';

const common = require('../common');
const { setTimeout } = require('timers/promises');

if (common.isIBMi)
common.skip('IBMi does not support `fs.watch()`');
Expand All @@ -21,39 +20,36 @@ const tmpdir = require('../common/tmpdir');
const testDir = tmpdir.path;
tmpdir.refresh();

(async () => {
// Add a file to subfolder of a watching folder
// Add a file to subfolder of a watching folder

const rootDirectory = fs.mkdtempSync(testDir + path.sep);
const testDirectory = path.join(rootDirectory, 'test-4');
fs.mkdirSync(testDirectory);
const rootDirectory = fs.mkdtempSync(testDir + path.sep);
const testDirectory = path.join(rootDirectory, 'test-4');
fs.mkdirSync(testDirectory);

const file = 'folder-5';
const filePath = path.join(testDirectory, file);
fs.mkdirSync(filePath);
const file = 'folder-5';
const filePath = path.join(testDirectory, file);
fs.mkdirSync(filePath);

const subfolderPath = path.join(filePath, 'subfolder-6');
fs.mkdirSync(subfolderPath);
const subfolderPath = path.join(filePath, 'subfolder-6');
fs.mkdirSync(subfolderPath);

const childrenFile = 'file-7.txt';
const childrenAbsolutePath = path.join(subfolderPath, childrenFile);
const relativePath = path.join(file, path.basename(subfolderPath), childrenFile);
const childrenFile = 'file-7.txt';
const childrenAbsolutePath = path.join(subfolderPath, childrenFile);
const relativePath = path.join(file, path.basename(subfolderPath), childrenFile);

const watcher = fs.watch(testDirectory, { recursive: true });
let watcherClosed = false;
watcher.on('change', function(event, filename) {
assert.strictEqual(event, 'rename');
const watcher = fs.watch(testDirectory, { recursive: true });
let watcherClosed = false;
watcher.on('change', function(event, filename) {
assert.strictEqual(event, 'rename');

if (filename === relativePath) {
watcher.close();
watcherClosed = true;
}
});
if (filename === relativePath) {
watcher.close();
watcherClosed = true;
}
});

await setTimeout(common.platformTimeout(100));
fs.writeFileSync(childrenAbsolutePath, 'world');
fs.writeFileSync(childrenAbsolutePath, 'world');

process.once('exit', function() {
assert(watcherClosed, 'watcher Object was not closed');
});
})().then(common.mustCall());
process.once('exit', function() {
assert(watcherClosed, 'watcher Object was not closed');
});
51 changes: 23 additions & 28 deletions test/parallel/test-fs-watch-recursive-add-file-to-new-folder.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict';

const common = require('../common');
const { setTimeout } = require('timers/promises');

if (common.isIBMi)
common.skip('IBMi does not support `fs.watch()`');
Expand All @@ -21,37 +20,33 @@ const tmpdir = require('../common/tmpdir');
const testDir = tmpdir.path;
tmpdir.refresh();

(async () => {
// Add a file to newly created folder to already watching folder
// Add a file to newly created folder to already watching folder

const rootDirectory = fs.mkdtempSync(testDir + path.sep);
const testDirectory = path.join(rootDirectory, 'test-3');
fs.mkdirSync(testDirectory);
const rootDirectory = fs.mkdtempSync(testDir + path.sep);
const testDirectory = path.join(rootDirectory, 'test-3');
fs.mkdirSync(testDirectory);

const filePath = path.join(testDirectory, 'folder-3');
const filePath = path.join(testDirectory, 'folder-3');

const childrenFile = 'file-4.txt';
const childrenAbsolutePath = path.join(filePath, childrenFile);
const childrenRelativePath = path.join(path.basename(filePath), childrenFile);
const childrenFile = 'file-4.txt';
const childrenAbsolutePath = path.join(filePath, childrenFile);
const childrenRelativePath = path.join(path.basename(filePath), childrenFile);

const watcher = fs.watch(testDirectory, { recursive: true });
let watcherClosed = false;
watcher.on('change', function(event, filename) {
assert.strictEqual(event, 'rename');
assert.ok(filename === path.basename(filePath) || filename === childrenRelativePath);
const watcher = fs.watch(testDirectory, { recursive: true });
let watcherClosed = false;
watcher.on('change', function(event, filename) {
assert.strictEqual(event, 'rename');
assert.ok(filename === path.basename(filePath) || filename === childrenRelativePath);

if (filename === childrenRelativePath) {
watcher.close();
watcherClosed = true;
}
});
if (filename === childrenRelativePath) {
watcher.close();
watcherClosed = true;
}
});

await setTimeout(common.platformTimeout(100));
fs.mkdirSync(filePath);
await setTimeout(common.platformTimeout(100));
fs.writeFileSync(childrenAbsolutePath, 'world');
fs.mkdirSync(filePath);
fs.writeFileSync(childrenAbsolutePath, 'world');

process.once('exit', function() {
assert(watcherClosed, 'watcher Object was not closed');
});
})().then(common.mustCall());
process.once('exit', function() {
assert(watcherClosed, 'watcher Object was not closed');
});
Loading

0 comments on commit 15a7f21

Please sign in to comment.