Skip to content

Commit

Permalink
fs: add FileHandle.prototype.readableWebStream()
Browse files Browse the repository at this point in the history
Adds an experimental `readableWebStream()` method to `FileHandle` that
returns a web `ReadableStream`

Signed-off-by: James M Snell <[email protected]>

PR-URL: #39331
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
  • Loading branch information
jasnell committed Jul 15, 2021
1 parent e2a6399 commit 6cd12be
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 3 deletions.
46 changes: 46 additions & 0 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,52 @@ Reads data from the file and stores that in the given buffer.
If the file is not modified concurrently, the end-of-file is reached when the
number of bytes read is zero.
#### `filehandle.readableWebStream()`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
* Returns: {ReadableStream}
Returns a `ReadableStream` that may be used to read the files data.
An error will be thrown if this method is called more than once or is called
after the `FileHandle` is closed or closing.
```mjs
import {
open,
} from 'node:fs/promises';

const file = await open('./some/file/to/read');

for await (const chunk of file.readableWebStream())
console.log(chunk);

await file.close();
```
```cjs
const {
open,
} = require('fs/promises');

(async () => {
const file = await open('./some/file/to/read');

for await (const chunk of file.readableWebStream())
console.log(chunk);

await file.close();
})();
```
While the `ReadableStream` will read the file to completion, it will not
close the `FileHandle` automatically. User code must still call the
`fileHandle.close()` method.
#### `filehandle.readFile(options)`
<!-- YAML
added: v10.0.0
Expand Down
39 changes: 37 additions & 2 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const {
codes: {
ERR_FS_FILE_TOO_LARGE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_STATE,
ERR_METHOD_NOT_IMPLEMENTED,
},
AbortError,
Expand Down Expand Up @@ -90,12 +91,21 @@ const kCloseResolve = Symbol('kCloseResolve');
const kCloseReject = Symbol('kCloseReject');
const kRef = Symbol('kRef');
const kUnref = Symbol('kUnref');
const kLocked = Symbol('kLocked');

const { kUsePromises } = binding;
const {
JSTransferable, kDeserialize, kTransfer, kTransferList
} = require('internal/worker/js_transferable');

const {
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');

const {
readableStreamCancel,
} = require('internal/webstreams/readablestream');

const getDirectoryEntriesPromise = promisify(getDirents);
const validateRmOptionsPromise = promisify(validateRmOptions);

Expand Down Expand Up @@ -209,6 +219,33 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
return this[kClosePromise];
}

/**
* @typedef {import('../webstreams/readablestream').ReadableStream
* } ReadableStream
* @returns {ReadableStream}
*/
readableWebStream() {
if (this[kFd] === -1)
throw new ERR_INVALID_STATE('The FileHandle is closed');
if (this[kClosePromise])
throw new ERR_INVALID_STATE('The FileHandle is closing');
if (this[kLocked])
throw new ERR_INVALID_STATE('The FileHandle is locked');
this[kLocked] = true;

const readable = newReadableStreamFromStreamBase(
this[kHandle],
undefined,
{ ondone: () => this[kUnref]() });

this[kRef]();
this.once('close', () => {
readableStreamCancel(readable);
});

return readable;
}

[kTransfer]() {
if (this[kClosePromise] || this[kRefs] > 1) {
throw lazyDOMException('Cannot transfer FileHandle while in use',
Expand Down Expand Up @@ -788,8 +825,6 @@ module.exports = {
appendFile,
readFile,
watch,

kHandle,
},

FileHandle,
Expand Down
21 changes: 20 additions & 1 deletion lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -859,12 +859,20 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
* @param {QueuingStrategy} strategy
* @returns {ReadableStream}
*/
function newReadableStreamFromStreamBase(streamBase, strategy) {
function newReadableStreamFromStreamBase(streamBase, strategy, options = {}) {
validateObject(streamBase, 'streamBase');
validateObject(options, 'options');

const {
ondone = () => {},
} = options;

if (typeof streamBase.onread === 'function')
throw new ERR_INVALID_STATE('StreamBase already has a consumer');

if (typeof ondone !== 'function')
throw new ERR_INVALID_ARG_TYPE('options.ondone', 'Function', ondone);

let controller;

streamBase.onread = (arrayBuffer) => {
Expand All @@ -877,6 +885,11 @@ function newReadableStreamFromStreamBase(streamBase, strategy) {
if (nread === UV_EOF) {
controller.close();
streamBase.readStop();
try {
ondone();
} catch (error) {
controller.error(error);
}
return;
}

Expand All @@ -899,6 +912,12 @@ function newReadableStreamFromStreamBase(streamBase, strategy) {

cancel() {
const promise = createDeferredPromise();
try {
ondone();
} catch (error) {
promise.reject(error);
return promise.promise;
}
const req = new ShutdownWrap();
req.oncomplete = () => promise.resolve();
const err = streamBase.shutdown(req);
Expand Down
5 changes: 5 additions & 0 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ const expectedModules = new Set([
'NativeModule internal/util/inspect',
'NativeModule internal/util/iterable_weak_map',
'NativeModule internal/util/types',
'NativeModule internal/webstreams/util',
'NativeModule internal/webstreams/writablestream',
'NativeModule internal/webstreams/readablestream',
'NativeModule internal/webstreams/queuingstrategies',
'NativeModule internal/webstreams/adapters',
'NativeModule internal/validators',
'NativeModule internal/vm/module',
'NativeModule internal/worker/io',
Expand Down
87 changes: 87 additions & 0 deletions test/parallel/test-filehandle-readablestream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
'use strict';

const common = require('../common');
const assert = require('assert');

const {
readFileSync,
} = require('fs');

const {
open,
} = require('fs/promises');

const check = readFileSync(__filename, { encoding: 'utf8' });

// Make sure the ReadableStream works...
(async () => {
const dec = new TextDecoder();
const file = await open(__filename);
let data = '';
for await (const chunk of file.readableWebStream())
data += dec.decode(chunk);

assert.strictEqual(check, data);

assert.throws(() => file.readableWebStream(), {
code: 'ERR_INVALID_STATE',
});

await file.close();
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream fails if the
// FileHandle is already closed.
(async () => {
const file = await open(__filename);
await file.close();

assert.throws(() => file.readableWebStream(), {
code: 'ERR_INVALID_STATE',
});
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream fails if the
// FileHandle is already closing.
(async () => {
const file = await open(__filename);
file.close();

assert.throws(() => file.readableWebStream(), {
code: 'ERR_INVALID_STATE',
});
})().then(common.mustCall());

// Make sure the ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream();
const reader = readable.getReader();
file.close();
await reader.closed;
})().then(common.mustCall());

// Make sure the ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream();
file.close();
const reader = readable.getReader();
await reader.closed;
})().then(common.mustCall());

// Make sure that the FileHandle is properly marked "in use"
// when a ReadableStream has been acquired for it.
(async () => {
const file = await open(__filename);
file.readableWebStream();
const mc = new MessageChannel();
mc.port1.onmessage = common.mustNotCall();
assert.throws(() => mc.port2.postMessage(file, [file]), {
code: 25 // DataCloneError
});
mc.port1.close();
await file.close();
})().then(common.mustCall());

0 comments on commit 6cd12be

Please sign in to comment.