Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: fix stream.finished on Duplex #33133

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,25 @@ function eos(stream, opts, callback) {
if (opts.error !== false) stream.on('error', onerror);
stream.on('close', onclose);

const closed = (wState && wState.closed) || (rState && rState.closed) ||
(wState && wState.errorEmitted) || (rState && rState.errorEmitted) ||
(wState && wState.finished) || (rState && rState.endEmitted) ||
(rState && stream.req && stream.aborted);
const closed = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we simplify this a bit? Perhaps at the very least group together similar dependencies, like:

const closed = (
  (wState && (wState.closed || wState.errorEmitted)) ||
  (rState && (rState.closed || rState.errorEmitted || (stream.req && stream.aborted))) ||
  (
    (!writable || (wState && wState.finished)) &&
    (!readable || (rState && rState.endEmitted))
  )
);

Copy link
Member Author

@ronag ronag Apr 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are grouped? See below.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless there is a performance benefit to the above suggestion, I prefer @ronag's grouping because it's easier to read. Though it has more lines, it has less parentheses and the lines are ordered by the properties (e.g. closed) of the states. It reads like "is either state closed, or either state errorEmitted, or ..".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not personally benchmarked it, so I cannot say if it is measurable or not. However, it is reducing the number of duplicate checks so V8 should be performing less work.

However, my code suggestion was merely one possibility. I'm not opposed to rearranging the checks in other ways, such as introducing separate if statements, etc.

Copy link
Member Author

@ronag ronag Apr 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer the current formatting and believe any performance implication here would be negligible in practice. This is not a hot path as far as I'm aware. A future simplification could be to use the ?. operator.

(wState && wState.closed) ||
(rState && rState.closed) ||
(wState && wState.errorEmitted) ||
(rState && rState.errorEmitted) ||
(rState && stream.req && stream.aborted) ||
(
(!writable || (wState && wState.finished)) &&
(!readable || (rState && rState.endEmitted))
)
);

if (closed) {
// TODO(ronag): Re-throw error if errorEmitted?
// TODO(ronag): Throw premature close as if finished was called?
// before being closed? i.e. if closed but not errored, ended or finished.
// TODO(ronag): Throw some kind of error? Does it make sense
// to call finished() on a "finished" stream?
// TODO(ronag): willEmitClose?
process.nextTick(() => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For bonus points this could also be simplified to process.nextTick(callback); if you want to change it while we're in here. Either way is fine though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mscdex Strangely CI fails with your suggestion. Not sure why. Leaving it as is for the purposes of this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it's because we re-assign callback in the disposer.

callback();
});
Expand Down
73 changes: 72 additions & 1 deletion test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
'use strict';

const common = require('../common');
const { Writable, Readable, Transform, finished, Duplex } = require('stream');
const {
Writable,
Readable,
Transform,
finished,
Duplex,
PassThrough
} = require('stream');
const assert = require('assert');
const EE = require('events');
const fs = require('fs');
Expand Down Expand Up @@ -396,3 +403,67 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
r.destroyed = true;
r.push(null);
}

{
// Regression https://github.com/nodejs/node/issues/33130

const response = new PassThrough();
setTimeout(() => response.write('chunk 1'), 500);
setTimeout(() => response.write('chunk 2'), 1000);
setTimeout(() => response.write('chunk 3'), 1500);
setTimeout(() => response.end(), 2000);
ronag marked this conversation as resolved.
Show resolved Hide resolved

class HelloWorld extends Duplex {
constructor(response) {
super();

this.response = response;
this.readMore = false;

response.once('end', () => {
this.push(null);
});

response.on('readable', () => {
if (this.readMore) {
this._read();
}
});
}

_read() {
const { response } = this;

this.readMore = true;

if (response.readableLength) {
this.readMore = false;
}

let data;
while ((data = response.read()) !== null) {
this.push(data);
}
}
}

const instance = new HelloWorld(response);
instance.end();

(async () => {
await new Promise((resolve) => setTimeout(resolve, 100));
ronag marked this conversation as resolved.
Show resolved Hide resolved

let res = '';
for await (const data of instance) {
res += data.toString();
ronag marked this conversation as resolved.
Show resolved Hide resolved
}

assert.strictEqual(res, 'chunk 1chunk 2chunk 3');
})();
ronag marked this conversation as resolved.
Show resolved Hide resolved
}

{
const p = new PassThrough();
p.end();
finished(p, common.mustNotCall());
}