From de22adde68e6fd8a406cc8bdd7f14e91d3bc6c1c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 9 Aug 2019 12:04:52 +0200 Subject: [PATCH] stream: throw on multiple async iterators --- doc/api/errors.md | 5 +++ lib/internal/errors.js | 1 + lib/internal/streams/async_iterator.js | 11 +++++++ .../test-readline-async-iterators-destroy.js | 19 +++-------- .../parallel/test-readline-async-iterators.js | 33 +------------------ .../test-stream-readable-async-iterators.js | 19 +++++++++-- 6 files changed, 40 insertions(+), 48 deletions(-) diff --git a/doc/api/errors.md b/doc/api/errors.md index e809b26f6d0d9c..f594627f04f25b 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -1703,6 +1703,11 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream. A stream method was called that cannot complete because the stream was destroyed using `stream.destroy()`. + +### ERR_STREAM_ITERATOR_EXISTS + +Stream cannot be consumed by multiple iterators. + ### ERR_STREAM_NULL_VALUES diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 6e3bfb29c03f27..fab65de57c1004 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -1130,6 +1130,7 @@ E('ERR_SRI_PARSE', SyntaxError); E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error); E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error); +E('ERR_STREAM_ITERATOR_EXISTS', 'Iterator already exists', Error); E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError); E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error); E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error); diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 89a1dae7fdfb02..ad108b49c636fd 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -4,6 +4,10 @@ const { Object } = primordials; const finished = require('internal/streams/end-of-stream'); +const { + ERR_STREAM_ITERATOR_EXISTS +} = require('internal/errors').codes; + const kLastResolve = Symbol('lastResolve'); const kLastReject = Symbol('lastReject'); const kError = Symbol('error'); @@ -11,6 +15,7 @@ const kEnded = Symbol('ended'); const kLastPromise = Symbol('lastPromise'); const kHandlePromise = Symbol('handlePromise'); const kStream = Symbol('stream'); +const kIterator = Symbol('iterator'); function createIterResult(value, done) { return { value, done }; @@ -126,6 +131,10 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ }, AsyncIteratorPrototype); const createReadableStreamAsyncIterator = (stream) => { + if (stream[kIterator]) { + throw new ERR_STREAM_ITERATOR_EXISTS(); + } + const iterator = Object.create(ReadableStreamAsyncIteratorPrototype, { [kStream]: { value: stream, writable: true }, [kLastResolve]: { value: null, writable: true }, @@ -182,6 +191,8 @@ const createReadableStreamAsyncIterator = (stream) => { stream.on('readable', onReadable.bind(null, iterator)); + stream[kIterator] = true; + return iterator; }; diff --git a/test/parallel/test-readline-async-iterators-destroy.js b/test/parallel/test-readline-async-iterators-destroy.js index 746981a1ae7cfd..907eb653cec2d4 100644 --- a/test/parallel/test-readline-async-iterators-destroy.js +++ b/test/parallel/test-readline-async-iterators-destroy.js @@ -55,23 +55,14 @@ async function testMutualDestroy() { crlfDelay: Infinity }); - const expectedLines = fileContent.split('\n'); - if (expectedLines[expectedLines.length - 1] === '') { - expectedLines.pop(); - } - expectedLines.splice(2); - - const iteratedLines = []; for await (const k of rli) { - iteratedLines.push(k); - for await (const l of rli) { - iteratedLines.push(l); - break; + try { + for await (const l of rli) { + } + } catch (err) { + assert.strictEqual(err.code, 'ERR_STREAM_ITERATOR_EXISTS'); } - assert.deepStrictEqual(iteratedLines, expectedLines); } - - assert.deepStrictEqual(iteratedLines, expectedLines); } } diff --git a/test/parallel/test-readline-async-iterators.js b/test/parallel/test-readline-async-iterators.js index c3883e4f369fde..7d43ef77cb6df1 100644 --- a/test/parallel/test-readline-async-iterators.js +++ b/test/parallel/test-readline-async-iterators.js @@ -43,35 +43,4 @@ async function testSimple() { } } -async function testMutual() { - for (const fileContent of testContents) { - fs.writeFileSync(filename, fileContent); - - const readable = fs.createReadStream(filename); - const rli = readline.createInterface({ - input: readable, - crlfDelay: Infinity - }); - - const expectedLines = fileContent.split('\n'); - if (expectedLines[expectedLines.length - 1] === '') { - expectedLines.pop(); - } - const iteratedLines = []; - let iterated = false; - for await (const k of rli) { - // This outer loop should only iterate once. - assert.strictEqual(iterated, false); - iterated = true; - - iteratedLines.push(k); - for await (const l of rli) { - iteratedLines.push(l); - } - assert.deepStrictEqual(iteratedLines, expectedLines); - } - assert.deepStrictEqual(iteratedLines, expectedLines); - } -} - -testSimple().then(testMutual).then(common.mustCall()); +testSimple().then(common.mustCall()); diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 12971cb2363a80..a98e91b1b9bb4f 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -375,11 +375,26 @@ async function tests() { // eslint-disable-next-line no-unused-vars for await (const a of r) { } - // eslint-disable-next-line no-unused-vars - for await (const b of r) { + try { + // eslint-disable-next-line no-unused-vars + for await (const b of r) { + } + } catch (err) { + assert.strictEqual(err.code, 'ERR_STREAM_ITERATOR_EXISTS'); } } + { + console.log('creating multiple iterators'); + const r = new Readable(); + r[Symbol.asyncIterator](); + common.expectsError(() => { + r[Symbol.asyncIterator](); + }, { + code: 'ERR_STREAM_ITERATOR_EXISTS' + }); + } + { console.log('destroy mid-stream does not error'); const r = new Readable({