Skip to content

Commit

Permalink
stream: implement ReadableStream.from
Browse files Browse the repository at this point in the history
Fixes: nodejs#48389
PR-URL: nodejs#48395
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Matthew Aitken <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
  • Loading branch information
debadree25 authored and Ceres6 committed Aug 14, 2023
1 parent 80d52fe commit 4407d26
Show file tree
Hide file tree
Showing 14 changed files with 1,036 additions and 2 deletions.
43 changes: 43 additions & 0 deletions doc/api/webstreams.md
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,49 @@ port1.onmessage = ({ data }) => {
port2.postMessage(stream, [stream]);
```

### `ReadableStream.from(iterable)`

<!-- YAML
added: REPLACEME
-->

* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or
`Symbol.iterator` iterable protocol.

A utility method that creates a new {ReadableStream} from an iterable.

```mjs
import { ReadableStream } from 'node:stream/web';

async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}

const stream = ReadableStream.from(asyncIterableGenerator());

for await (const chunk of stream)
console.log(chunk); // Prints 'a', 'b', 'c'
```

```cjs
const { ReadableStream } = require('node:stream/web');

async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}

(async () => {
const stream = ReadableStream.from(asyncIterableGenerator());

for await (const chunk of stream)
console.log(chunk); // Prints 'a', 'b', 'c'
})();
```

### Class: `ReadableStreamDefaultReader`

<!-- YAML
Expand Down
59 changes: 59 additions & 0 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ const {
nonOpCancel,
nonOpPull,
nonOpStart,
getIterator,
iteratorNext,
kType,
kState,
} = require('internal/webstreams/util');
Expand Down Expand Up @@ -314,6 +316,10 @@ class ReadableStream {
return isReadableStreamLocked(this);
}

static from(iterable) {
return readableStreamFromIterable(iterable);
}

/**
* @param {any} [reason]
* @returns { Promise<void> }
Expand Down Expand Up @@ -1250,6 +1256,59 @@ const isReadableStreamBYOBReader =

// ---- ReadableStream Implementation

function readableStreamFromIterable(iterable) {
let stream;
const iteratorRecord = getIterator(iterable, 'async');

const startAlgorithm = nonOpStart;

async function pullAlgorithm() {
const nextResult = iteratorNext(iteratorRecord);
const nextPromise = PromiseResolve(nextResult);
return PromisePrototypeThen(nextPromise, (iterResult) => {
if (typeof iterResult !== 'object' || iterResult === null) {
throw new ERR_INVALID_STATE.TypeError(
'The promise returned by the iterator.next() method must fulfill with an object');
}
if (iterResult.done) {
readableStreamDefaultControllerClose(stream[kState].controller);
} else {
readableStreamDefaultControllerEnqueue(stream[kState].controller, iterResult.value);
}
});
}

async function cancelAlgorithm(reason) {
const iterator = iteratorRecord.iterator;
const returnMethod = iterator.return;
if (returnMethod === undefined) {
return PromiseResolve();
}
const returnResult = FunctionPrototypeCall(returnMethod, iterator, reason);
const returnPromise = PromiseResolve(returnResult);
return PromisePrototypeThen(returnPromise, (iterResult) => {
if (typeof iterResult !== 'object' || iterResult === null) {
throw new ERR_INVALID_STATE.TypeError(
'The promise returned by the iterator.return() method must fulfill with an object');
}
return undefined;
});
}

stream = new ReadableStream({
start: startAlgorithm,
pull: pullAlgorithm,
cancel: cancelAlgorithm,
}, {
size() {
return 1;
},
highWaterMark: 0,
});

return stream;
}

function readableStreamPipeTo(
source,
dest,
Expand Down
53 changes: 53 additions & 0 deletions lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ const {
PromiseReject,
ReflectGet,
Symbol,
SymbolAsyncIterator,
SymbolIterator,
Uint8Array,
} = primordials;

const {
codes: {
ERR_INVALID_ARG_VALUE,
ERR_OPERATION_FAILED,
ERR_INVALID_STATE,
},
} = require('internal/errors');

Expand Down Expand Up @@ -217,6 +220,54 @@ function lazyTransfer() {
return transfer;
}

function createAsyncFromSyncIterator(syncIteratorRecord) {
const syncIterable = {
[SymbolIterator]: () => syncIteratorRecord.iterator,
};

const asyncIterator = (async function* () {
return yield* syncIterable;
}());

const nextMethod = asyncIterator.next;
return { iterator: asyncIterator, nextMethod, done: false };
}

function getIterator(obj, kind = 'sync', method) {
if (method === undefined) {
if (kind === 'async') {
method = obj[SymbolAsyncIterator];
if (method === undefined) {
const syncMethod = obj[SymbolIterator];
const syncIteratorRecord = getIterator(obj, 'sync', syncMethod);
return createAsyncFromSyncIterator(syncIteratorRecord);
}
} else {
method = obj[SymbolIterator];
}
}

const iterator = FunctionPrototypeCall(method, obj);
if (typeof iterator !== 'object' || iterator === null) {
throw new ERR_INVALID_STATE.TypeError('The iterator method must return an object');
}
const nextMethod = iterator.next;
return { iterator, nextMethod, done: false };
}

function iteratorNext(iteratorRecord, value) {
let result;
if (value === undefined) {
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator);
} else {
result = FunctionPrototypeCall(iteratorRecord.nextMethod, iteratorRecord.iterator, [value]);
}
if (typeof result !== 'object' || result === null) {
throw new ERR_INVALID_STATE.TypeError('The iterator.next() method must return an object');
}
return result;
}

module.exports = {
ArrayBufferViewGetBuffer,
ArrayBufferViewGetByteLength,
Expand All @@ -243,6 +294,8 @@ module.exports = {
nonOpPull,
nonOpStart,
nonOpWrite,
getIterator,
iteratorNext,
kType,
kState,
};
2 changes: 1 addition & 1 deletion test/fixtures/wpt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Last update:
- performance-timeline: https://github.com/web-platform-tests/wpt/tree/17ebc3aea0/performance-timeline
- resource-timing: https://github.com/web-platform-tests/wpt/tree/22d38586d0/resource-timing
- resources: https://github.com/web-platform-tests/wpt/tree/919874f84f/resources
- streams: https://github.com/web-platform-tests/wpt/tree/51750bc8d7/streams
- streams: https://github.com/web-platform-tests/wpt/tree/517e945bbf/streams
- url: https://github.com/web-platform-tests/wpt/tree/84782d9315/url
- user-timing: https://github.com/web-platform-tests/wpt/tree/5ae85bf826/user-timing
- wasm/jsapi: https://github.com/web-platform-tests/wpt/tree/cde25e7e3c/wasm/jsapi
Expand Down
15 changes: 15 additions & 0 deletions test/fixtures/wpt/streams/piping/general-addition.any.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// META: global=window,worker
'use strict';

promise_test(async t => {
/** @type {ReadableStreamDefaultController} */
var con;
let synchronous = false;
new ReadableStream({ start(c) { con = c }}, { highWaterMark: 0 }).pipeTo(
new WritableStream({ write() { synchronous = true; } })
)
// wait until start algorithm finishes
await Promise.resolve();
con.enqueue();
assert_false(synchronous, 'write algorithm must not run synchronously');
}, "enqueue() must not synchronously call write algorithm");
Loading

0 comments on commit 4407d26

Please sign in to comment.