Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs: allow overring fs for streams #29083

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1668,6 +1668,10 @@ changes:
- version: v2.3.0
pr-url: https://github.com/nodejs/node/pull/1845
description: The passed `options` object can be a string now.
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/REPLACEME
description: The `fs` options allow overriding the used `fs`
implementation.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Late nit: it would be ideal to keep the versioning order the next time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

versioning order?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes entry should be at the top but it's at the bottom of the changes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, my mistake. I though it was the other way around.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no clear rule for it so far. So it's different each time and it's just good to keep it consistent in the changes itself. We might want to define an order at some point.

-->

* `path` {string|Buffer|URL}
Expand All @@ -1682,7 +1686,8 @@ changes:
* `start` {integer}
* `end` {integer} **Default:** `Infinity`
* `highWaterMark` {integer} **Default:** `64 * 1024`
* Returns: {fs.ReadStream}
* `fs` {Object|null} **Default:** `null`
* Returns: {fs.ReadStream} See [Readable Stream][].

Unlike the 16 kb default `highWaterMark` for a readable stream, the stream
returned by this method has a default `highWaterMark` of 64 kb.
Expand All @@ -1709,6 +1714,10 @@ By default, the stream will not emit a `'close'` event after it has been
destroyed. This is the opposite of the default for other `Readable` streams.
Set the `emitClose` option to `true` to change this behavior.

By providing the `fs` option it is possible to override the corresponding `fs`
implementations for `open`, `read` and `close`. When providing the `fs` option,
you must override `open`, `close` and `read`.

```js
const fs = require('fs');
// Create a stream from some character device.
Expand Down Expand Up @@ -1762,6 +1771,10 @@ changes:
- version: v2.3.0
pr-url: https://github.com/nodejs/node/pull/1845
description: The passed `options` object can be a string now.
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/REPLACEME
description: The `fs` options allow overriding the used `fs`
implementation.
-->

* `path` {string|Buffer|URL}
Expand All @@ -1774,7 +1787,8 @@ changes:
* `autoClose` {boolean} **Default:** `true`
* `emitClose` {boolean} **Default:** `false`
* `start` {integer}
* Returns: {fs.WriteStream}
* `fs` {Object|null} **Default:** `null`
* Returns: {fs.WriteStream} See [Writable Stream][].
ronag marked this conversation as resolved.
Show resolved Hide resolved

`options` may also include a `start` option to allow writing data at
some position past the beginning of the file, allowed values are in the
Expand All @@ -1793,6 +1807,12 @@ By default, the stream will not emit a `'close'` event after it has been
destroyed. This is the opposite of the default for other `Writable` streams.
Set the `emitClose` option to `true` to change this behavior.

By providing the `fs` option it is possible to override the corresponding `fs`
implementations for `open`, `write`, `writev` and `close`. Overriding `write()`
without `writev()` can reduce performance as some optimizations (`_writev()`)
will be disabled. When providing the `fs` option, you must override `open`,
`close` and at least one of `write` and `writev`.

Like [`ReadStream`][], if `fd` is specified, [`WriteStream`][] will ignore the
`path` argument and will use the specified file descriptor. This means that no
`'open'` event will be emitted. `fd` should be blocking; non-blocking `fd`s
Expand Down Expand Up @@ -5514,6 +5534,7 @@ the file contents.
[`Number.MAX_SAFE_INTEGER`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER
[`ReadDirectoryChangesW`]: https://docs.microsoft.com/en-us/windows/desktop/api/winbase/nf-winbase-readdirectorychangesw
[`ReadStream`]: #fs_class_fs_readstream
[Readable Stream]: #stream_class_stream_readable
[`URL`]: url.html#url_the_whatwg_url_api
[`UV_THREADPOOL_SIZE`]: cli.html#cli_uv_threadpool_size_size
[`WriteStream`]: #fs_class_fs_writestream
Expand Down Expand Up @@ -5571,3 +5592,4 @@ the file contents.
[chcp]: https://ss64.com/nt/chcp.html
[inode]: https://en.wikipedia.org/wiki/Inode
[support of file system `flags`]: #fs_file_system_flags
[Writable Stream]: #stream_class_stream_writable
128 changes: 91 additions & 37 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const {
} = primordials;

const {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;
Expand All @@ -28,6 +29,7 @@ const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');

const kMinPoolSpace = 128;
const kFs = Symbol('kFs');

let pool;
// It can happen that we expect to read a large chunk of data, and reserve
Expand Down Expand Up @@ -76,6 +78,23 @@ function ReadStream(path, options) {
options.emitClose = false;
}

this[kFs] = options.fs || fs;

if (typeof this[kFs].open !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
this[kFs].open);
}

if (typeof this[kFs].read !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function',
this[kFs].read);
}

if (typeof this[kFs].close !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
this[kFs].close);
}

Readable.call(this, options);

// Path will be ignored when fd is specified, so it can be falsy
Expand Down Expand Up @@ -136,7 +155,7 @@ function _openReadFs(stream) {
return;
}

fs.open(stream.path, stream.flags, stream.mode, (er, fd) => {
stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
if (er) {
if (stream.autoClose) {
stream.destroy();
Expand Down Expand Up @@ -186,42 +205,43 @@ ReadStream.prototype._read = function(n) {

// the actual read.
this[kIsPerformingIO] = true;
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);

if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
} else {
let b = null;
// Now that we know how much data we have actually read, re-wind the
// 'used' field if we can, and otherwise allow the remainder of our
// reservation to be used as a new pool later.
if (start + toRead === thisPool.used && thisPool === pool) {
const newUsed = thisPool.used + bytesRead - toRead;
thisPool.used = roundUpToMultipleOf8(newUsed);
this[kFs].read(
this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);

if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
} else {
// Round down to the next lowest multiple of 8 to ensure the new pool
// fragment start and end positions are aligned to an 8 byte boundary.
const alignedEnd = (start + toRead) & ~7;
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
if (alignedEnd - alignedStart >= kMinPoolSpace) {
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
let b = null;
// Now that we know how much data we have actually read, re-wind the
// 'used' field if we can, and otherwise allow the remainder of our
// reservation to be used as a new pool later.
if (start + toRead === thisPool.used && thisPool === pool) {
const newUsed = thisPool.used + bytesRead - toRead;
thisPool.used = roundUpToMultipleOf8(newUsed);
} else {
// Round down to the next lowest multiple of 8 to ensure the new pool
// fragment start and end positions are aligned to an 8 byte boundary.
const alignedEnd = (start + toRead) & ~7;
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
if (alignedEnd - alignedStart >= kMinPoolSpace) {
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
}
}
}

if (bytesRead > 0) {
this.bytesRead += bytesRead;
b = thisPool.slice(start, start + bytesRead);
}
if (bytesRead > 0) {
this.bytesRead += bytesRead;
b = thisPool.slice(start, start + bytesRead);
}

this.push(b);
}
});
this.push(b);
}
});

// Move the pool positions, and internal position for reading.
if (this.pos !== undefined)
Expand All @@ -245,7 +265,7 @@ ReadStream.prototype._destroy = function(err, cb) {
};

function closeFsStream(stream, cb, err) {
fs.close(stream.fd, (er) => {
stream[kFs].close(stream.fd, (er) => {
er = er || err;
cb(er);
stream.closed = true;
Expand Down Expand Up @@ -279,6 +299,40 @@ function WriteStream(path, options) {
options.emitClose = false;
}

this[kFs] = options.fs || fs;
if (typeof this[kFs].open !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
this[kFs].open);
}

if (!this[kFs].write && !this[kFs].writev) {
throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
this[kFs].write);
}

if (this[kFs].write && typeof this[kFs].write !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
this[kFs].write);
}

if (this[kFs].writev && typeof this[kFs].writev !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function',
this[kFs].writev);
}

if (typeof this[kFs].close !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
this[kFs].close);
}

// It's enough to override either, in which case only one will be used.
if (!this[kFs].write) {
this._write = null;
}
if (!this[kFs].writev) {
this._writev = null;
}

Writable.call(this, options);

// Path will be ignored when fd is specified, so it can be falsy
Expand Down Expand Up @@ -335,7 +389,7 @@ function _openWriteFs(stream) {
return;
}

fs.open(stream.path, stream.flags, stream.mode, (er, fd) => {
stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
if (er) {
if (stream.autoClose) {
stream.destroy();
Expand All @@ -361,7 +415,7 @@ WriteStream.prototype._write = function(data, encoding, cb) {
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));

this[kIsPerformingIO] = true;
fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
Expand Down Expand Up @@ -405,7 +459,7 @@ WriteStream.prototype._writev = function(data, cb) {
}

this[kIsPerformingIO] = true;
fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
Expand Down
13 changes: 11 additions & 2 deletions test/parallel/test-fs-read-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ const fixtures = require('../common/fixtures');
const fn = fixtures.path('elipses.txt');
const rangeFile = fixtures.path('x.txt');

{
function test1(options) {
let paused = false;
let bytesRead = 0;

const file = fs.createReadStream(fn);
const file = fs.createReadStream(fn, options);
const fileSize = fs.statSync(fn).size;

assert.strictEqual(file.bytesRead, 0);
Expand Down Expand Up @@ -88,6 +88,15 @@ const rangeFile = fixtures.path('x.txt');
});
}

test1({});
test1({
fs: {
open: common.mustCall(fs.open),
read: common.mustCallAtLeast(fs.read, 1),
close: common.mustCall(fs.close),
}
});

{
const file = fs.createReadStream(fn, { encoding: 'utf8' });
file.length = 0;
Expand Down
38 changes: 38 additions & 0 deletions test/parallel/test-fs-write-stream-fs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict';
const common = require('../common');
const path = require('path');
const fs = require('fs');

const tmpdir = require('../common/tmpdir');
tmpdir.refresh();

{
const file = path.join(tmpdir.path, 'write-end-test0.txt');
const stream = fs.createWriteStream(file, {
fs: {
open: common.mustCall(fs.open),
write: common.mustCallAtLeast(fs.write, 1),
close: common.mustCall(fs.close),
}
});
stream.end('asd');
stream.on('close', common.mustCall());
}
ronag marked this conversation as resolved.
Show resolved Hide resolved


{
const file = path.join(tmpdir.path, 'write-end-test1.txt');
const stream = fs.createWriteStream(file, {
fs: {
open: common.mustCall(fs.open),
write: fs.write,
writev: common.mustCallAtLeast(fs.writev, 1),
close: common.mustCall(fs.close),
}
});
stream.write('asd');
stream.write('asd');
stream.write('asd');
stream.end();
stream.on('close', common.mustCall());
}