Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: async iterator improvements #31316

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 44 additions & 20 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const {
} = primordials;

const finished = require('internal/streams/end-of-stream');
const destroyImpl = require('internal/streams/destroy');

const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
Expand All @@ -20,6 +21,8 @@ const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');

let Readable;

function createIterResult(value, done) {
return { value, done };
}
Expand Down Expand Up @@ -60,6 +63,31 @@ function wrapForNext(lastPromise, iter) {
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

function finish(self, err) {
return new Promise((resolve, reject) => {
const stream = self[kStream];

// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);

if (ended) {
resolve(createIterResult(undefined, true));
return;
}

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
} else {
resolve(createIterResult(undefined, true));
}
});
destroyImpl.destroyer(stream, err);
});
}

const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
get stream() {
return this[kStream];
Expand Down Expand Up @@ -120,31 +148,27 @@ const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
},

return() {
return new Promise((resolve, reject) => {
const stream = this[kStream];

// TODO(ronag): Remove this check once finished() handles
// already ended and/or destroyed streams.
const ended = stream.destroyed || stream.readableEnded ||
(stream._readableState && stream._readableState.endEmitted);
if (ended) {
resolve(createIterResult(undefined, true));
return;
}
return finish(this);
},

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
} else {
resolve(createIterResult(undefined, true));
}
});
stream.destroy();
});
throw(err) {
return finish(this, err);
},
}, AsyncIteratorPrototype);

const createReadableStreamAsyncIterator = (stream) => {
if (typeof stream.read !== 'function') {
// v1 stream

if (!Readable) {
Readable = require('_stream_readable');
}

const src = stream;
stream = new Readable({ objectMode: true }).wrap(src);
finished(stream, (err) => destroyImpl.destroyer(src, err));
}
ronag marked this conversation as resolved.
Show resolved Hide resolved

const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
[kStream]: { value: stream, writable: true },
[kLastResolve]: { value: null, writable: true },
Expand Down
13 changes: 13 additions & 0 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,21 @@ function errorOrDestroy(stream, err, sync) {
}
}

function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}

// Normalize destroy for legacy.
function destroyer(stream, err) {
// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (isRequest(stream.req)) return stream.req.abort();
if (typeof stream.destroy === 'function') return stream.destroy(err);
if (typeof stream.close === 'function') return stream.close();
}

module.exports = {
destroyer,
destroy,
undestroy,
errorOrDestroy
Expand Down
41 changes: 5 additions & 36 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const {
let eos;

const { once } = require('internal/util');
const destroyImpl = require('internal/streams/destroy');
const {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_RETURN_VALUE,
Expand All @@ -24,18 +25,6 @@ let EE;
let PassThrough;
let createReadableStreamAsyncIterator;

function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}

function destroyStream(stream, err) {
// request.destroy just do .end - .abort is what we want
if (isRequest(stream)) return stream.abort();
if (isRequest(stream.req)) return stream.req.abort();
if (typeof stream.destroy === 'function') return stream.destroy(err);
if (typeof stream.close === 'function') return stream.close();
}

function destroyer(stream, reading, writing, callback) {
callback = once(callback);

Expand All @@ -57,7 +46,7 @@ function destroyer(stream, reading, writing, callback) {
if (destroyed) return;
destroyed = true;

destroyStream(stream, err);
destroyImpl.destroyer(stream, err);

callback(err || new ERR_STREAM_DESTROYED('pipe'));
};
Expand Down Expand Up @@ -101,39 +90,19 @@ function makeAsyncIterable(val) {
return val;
} else if (isReadable(val)) {
// Legacy streams are not Iterable.
return _fromReadable(val);
return fromReadable(val);
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable'], val);
}
}

async function* _fromReadable(val) {
async function* fromReadable(val) {
if (!createReadableStreamAsyncIterator) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
}

try {
if (typeof val.read !== 'function') {
// createReadableStreamAsyncIterator does not support
// v1 streams. Convert it into a v2 stream.

if (!PassThrough) {
PassThrough = require('_stream_passthrough');
}

const pt = new PassThrough();
val
.on('error', (err) => pt.destroy(err))
.pipe(pt);
yield* createReadableStreamAsyncIterator(pt);
} else {
yield* createReadableStreamAsyncIterator(val);
}
} finally {
destroyStream(val);
}
yield* createReadableStreamAsyncIterator(val);
}

async function pump(iterable, writable, finish) {
Expand Down
85 changes: 84 additions & 1 deletion test/parallel/test-stream-readable-async-iterators.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
'use strict';

const common = require('../common');
const { Readable, Transform, PassThrough, pipeline } = require('stream');
const {
Stream,
Readable,
Transform,
PassThrough,
pipeline
} = require('stream');
const assert = require('assert');

async function tests() {
Expand All @@ -14,6 +20,61 @@ async function tests() {
AsyncIteratorPrototype);
}

{
// v1 stream

const stream = new Stream();
stream.destroy = common.mustCall();
process.nextTick(() => {
stream.emit('data', 'hello');
stream.emit('data', 'world');
stream.emit('end');
});

let res = '';
stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator];
for await (const d of stream) {
res += d;
}
assert.strictEqual(res, 'helloworld');
}

{
// v1 stream error

const stream = new Stream();
stream.close = common.mustCall();
process.nextTick(() => {
stream.emit('data', 0);
stream.emit('data', 1);
stream.emit('error', new Error('asd'));
});

const iter = Readable.prototype[Symbol.asyncIterator].call(stream);
ronag marked this conversation as resolved.
Show resolved Hide resolved
iter.next().catch(common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
}

{
// Non standard stream cleanup

const readable = new Readable({ autoDestroy: false, read() {} });
readable.push('asd');
readable.push('asd');
readable.destroy = null;
readable.close = common.mustCall(() => {
readable.emit('close');
});

await (async () => {
for await (const d of readable) {
d;
return;
}
})();
}

{
const readable = new Readable({ objectMode: true, read() {} });
readable.push(0);
Expand Down Expand Up @@ -221,6 +282,28 @@ async function tests() {
assert.strictEqual(received, 1);
}

{
// Iterator throw.

const readable = new Readable({
objectMode: true,
read() {
this.push('hello');
}
});

readable.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));

const it = readable[Symbol.asyncIterator]();
it.throw(new Error('kaboom')).catch(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));

assert.strictEqual(readable.destroyed, true);
}

{
console.log('destroyed by throw');
const readable = new Readable({
Expand Down