From e2f5bb7574e7e8189a3e38fc4d00c1213fd7160f Mon Sep 17 00:00:00 2001 From: Robert Nagy <ronagy@icloud.com> Date: Wed, 6 Jan 2021 17:59:39 +0100 Subject: [PATCH] http: align with stream.Writable Futher aligns OutgoingMessage with stream.Writable. In particular re-uses the construct/destroy logic from streams. Due to a lot of subtle assumptions this PR unfortunately touches a lot of different parts. PR-URL: https://github.com/nodejs/node/pull/36816 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> --- lib/_http_client.js | 52 +-- lib/_http_outgoing.js | 331 ++++++++++-------- lib/_http_server.js | 20 +- lib/internal/http.js | 1 - test/parallel/test-http-agent-keepalive.js | 17 +- .../test-http-outgoing-end-multiple.js | 20 +- ...http-outgoing-message-capture-rejection.js | 2 +- test/parallel/test-stream-pipeline.js | 34 -- 8 files changed, 245 insertions(+), 232 deletions(-) diff --git a/lib/_http_client.js b/lib/_http_client.js index 842a33a41fbe03..2ef04ef8de9997 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -58,7 +58,7 @@ const Agent = require('_http_agent'); const { Buffer } = require('buffer'); const { defaultTriggerAsyncIdScope } = require('internal/async_hooks'); const { URL, urlToHttpOptions, searchParamsSymbol } = require('internal/url'); -const { kOutHeaders, kNeedDrain } = require('internal/http'); +const { kOutHeaders } = require('internal/http'); const { connResetException, codes } = require('internal/errors'); const { ERR_HTTP_HEADERS_SENT, @@ -98,7 +98,7 @@ class HTTPClientAsyncResource { } function ClientRequest(input, options, cb) { - FunctionPrototypeCall(OutgoingMessage, this); + FunctionPrototypeCall(OutgoingMessage, this, { autoDestroy: false }); if (typeof input === 'string') { const urlStr = input; @@ -298,7 +298,7 @@ function ClientRequest(input, options, cb) { if (typeof options.createConnection === 'function') { const oncreate = once((err, socket) => { if (err) { - process.nextTick(() => this.emit('error', err)); + process.nextTick(() => emitError(this, err)); } else { this.onSocket(socket); } @@ -366,8 +366,8 @@ function emitAbortNT(req) { function ondrain() { const msg = this._httpMessage; - if (msg && !msg.finished && msg[kNeedDrain]) { - msg[kNeedDrain] = false; + if (msg && !msg.finished && msg._writableState.needDrain) { + msg._writableState.needDrain = false; msg.emit('drain'); } } @@ -393,8 +393,7 @@ function socketCloseListener() { if (!res.complete) { res.destroy(connResetException('aborted')); } - req._closed = true; - req.emit('close'); + emitClose(req); if (!res.aborted && res.readable) { res.push(null); } @@ -404,10 +403,9 @@ function socketCloseListener() { // receive a response. The error needs to // fire on the request. req.socket._hadError = true; - req.emit('error', connResetException('socket hang up')); + emitError(req, connResetException('socket hang up')); } - req._closed = true; - req.emit('close'); + emitClose(req); } // Too bad. That output wasn't getting written. @@ -431,7 +429,7 @@ function socketErrorListener(err) { // For Safety. Some additional errors might fire later on // and we need to make sure we don't double-fire the error event. req.socket._hadError = true; - req.emit('error', err); + emitError(req, err); } const parser = socket.parser; @@ -455,7 +453,7 @@ function socketOnEnd() { // If we don't have a response then we know that the socket // ended prematurely and we need to emit an error on the request. req.socket._hadError = true; - req.emit('error', connResetException('socket hang up')); + emitError(req, connResetException('socket hang up')); } if (parser) { parser.finish(); @@ -478,7 +476,7 @@ function socketOnData(d) { freeParser(parser, req, socket); socket.destroy(); req.socket._hadError = true; - req.emit('error', ret); + emitError(req, ret); } else if (parser.incoming && parser.incoming.upgrade) { // Upgrade (if status code 101) or CONNECT const bytesParsed = ret; @@ -510,9 +508,7 @@ function socketOnData(d) { socket.readableFlowing = null; req.emit(eventName, res, socket, bodyHead); - req.destroyed = true; - req._closed = true; - req.emit('close'); + emitClose(req); } else { // Requested Upgrade or used CONNECT method, but have no handler. socket.destroy(); @@ -697,8 +693,7 @@ function requestOnPrefinish() { } function emitFreeNT(req) { - req._closed = true; - req.emit('close'); + emitClose(req); if (req.socket) { req.socket.emit('free'); } @@ -779,10 +774,10 @@ function onSocketNT(req, socket, err) { err = connResetException('socket hang up'); } if (err) { - req.emit('error', err); + emitError(req, err); } req._closed = true; - req.emit('close'); + emitClose(req); } if (socket) { @@ -862,6 +857,23 @@ function setSocketTimeout(sock, msecs) { } } +function emitError(req, err) { + req.destroyed = true; + req._writableState.errored = err; + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; // eslint-disable-line no-unused-expressions + req._writableState.errorEmitted = true; + req.emit('error', err); +} + +function emitClose(req) { + req.destroyed = true; + req._closed = true; + req._writableState.closed = true; + req._writableState.closeEmitted = true; + req.emit('close'); +} + ClientRequest.prototype.setNoDelay = function setNoDelay(noDelay) { this._deferToConnect('setNoDelay', [noDelay]); }; diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 37ef44f2f2eee2..59ef07e33e5aa4 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -27,6 +27,7 @@ const { ArrayPrototypeJoin, ArrayPrototypePush, ArrayPrototypeUnshift, + Error, FunctionPrototype, FunctionPrototypeBind, FunctionPrototypeCall, @@ -45,9 +46,9 @@ const { const { getDefaultHighWaterMark } = require('internal/streams/state'); const assert = require('internal/assert'); const EE = require('events'); -const Stream = require('stream'); +const { Stream, Writable } = require('stream'); const internalUtil = require('internal/util'); -const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http'); +const { kOutHeaders, utcDate } = require('internal/http'); const { Buffer } = require('buffer'); const common = require('_http_common'); const checkIsHttpToken = common._checkIsHttpToken; @@ -76,11 +77,12 @@ const { } = require('internal/errors'); const { validateString } = require('internal/validators'); const { isUint8Array } = require('internal/util/types'); +const { construct, destroy } = require('internal/streams/destroy'); const HIGH_WATER_MARK = getDefaultHighWaterMark(); const { CRLF, debug } = common; -const kCorked = Symbol('corked'); +const kCorked = Symbol('kCorked'); const nop = FunctionPrototype; @@ -94,7 +96,7 @@ function isCookieField(s) { return s.length === 6 && StringPrototypeToLowerCase(s) === 'cookie'; } -function OutgoingMessage() { +function OutgoingMessage(opts) { FunctionPrototypeCall(Stream, this); // Queue that holds all currently pending data, until the response will be @@ -107,9 +109,6 @@ function OutgoingMessage() { // TCP socket and HTTP Parser and thus handle the backpressure. this.outputSize = 0; - this.writable = true; - this.destroyed = false; - this._last = false; this.chunkedEncoding = false; this.shouldKeepAlive = true; @@ -123,12 +122,9 @@ function OutgoingMessage() { this._contentLength = null; this._hasBody = true; this._trailer = ''; - this[kNeedDrain] = false; - this.finished = false; this._headerSent = false; this[kCorked] = 0; - this._closed = false; this.socket = null; this._header = null; @@ -137,42 +133,70 @@ function OutgoingMessage() { this._keepAliveTimeout = 0; this._onPendingData = nop; + + this._writableState = { + objectMode: false, + writable: true, + constructed: false, + corked: 0, + prefinished: false, + destroyed: false, + closed: false, + closeEmitted: false, + errored: null, + errorEmitted: false, + needDrain: false, + autoDestroy: opts?.autoDestroy == null ? true : false, + emitClose: true, + ended: false, + ending: false, + finished: false + }; + + construct(this, () => { + this._flush(); + }); } -ObjectSetPrototypeOf(OutgoingMessage.prototype, Stream.prototype); -ObjectSetPrototypeOf(OutgoingMessage, Stream); +ObjectSetPrototypeOf(OutgoingMessage.prototype, Writable.prototype); +ObjectSetPrototypeOf(OutgoingMessage, Writable); -ObjectDefineProperty(OutgoingMessage.prototype, 'writableFinished', { +ObjectDefineProperty(OutgoingMessage.prototype, 'finished', { get() { - return ( - this.finished && - this.outputSize === 0 && - (!this.socket || this.socket.writableLength === 0) - ); + return this._writableState.ended; + }, + set(value) { + this._writableState.ended = value; } }); -ObjectDefineProperty(OutgoingMessage.prototype, 'writableObjectMode', { +ObjectDefineProperty(OutgoingMessage.prototype, '_closed', { get() { - return false; + return this._writableState.closed; + }, + set(value) { + this._writableState.closed = value; } }); -ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', { +ObjectDefineProperty(OutgoingMessage.prototype, 'writable', { get() { - return this.outputSize + (this.socket ? this.socket.writableLength : 0); + // Compat. + return this._writableState.writable; + }, + set(value) { + this._writableState.writable = value; } }); -ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', { +ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', { get() { - return this.socket ? this.socket.writableHighWaterMark : HIGH_WATER_MARK; + return this.outputSize + (this.socket ? this.socket.writableLength : 0); } }); -ObjectDefineProperty(OutgoingMessage.prototype, 'writableCorked', { +ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', { get() { - const corked = this.socket ? this.socket.writableCorked : 0; - return corked + this[kCorked]; + return this.socket ? this.socket.writableHighWaterMark : HIGH_WATER_MARK; } }); @@ -239,7 +263,6 @@ ObjectDefineProperty(OutgoingMessage.prototype, '_headerNames', { }, 'OutgoingMessage.prototype._headerNames is deprecated', 'DEP0066') }); - OutgoingMessage.prototype._renderHeaders = function _renderHeaders() { if (this._header) { throw new ERR_HTTP_HEADERS_SENT('render'); @@ -261,6 +284,8 @@ OutgoingMessage.prototype._renderHeaders = function _renderHeaders() { }; OutgoingMessage.prototype.cork = function() { + this._writableState.corked++; + if (this.socket) { this.socket.cork(); } else { @@ -269,6 +294,10 @@ OutgoingMessage.prototype.cork = function() { }; OutgoingMessage.prototype.uncork = function() { + if (this._writableState.corked) { + this._writableState.corked--; + } + if (this.socket) { this.socket.uncork(); } else if (this[kCorked]) { @@ -277,7 +306,6 @@ OutgoingMessage.prototype.uncork = function() { }; OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { - if (callback) { this.on('timeout', callback); } @@ -292,22 +320,48 @@ OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) { return this; }; +OutgoingMessage.prototype._construct = function(callback) { + if (this.socket) { + for (let n = 0; n < this[kCorked]; ++n) { + this.socket.cork(); + } + callback(); + } else { + // TODO(ronag): What if never assigned socket? + this.once('socket', function(socket) { + for (let n = 0; n < this[kCorked]; ++n) { + socket.cork(); + } + callback(); + }); + } +}; + +OutgoingMessage.prototype.destroy = destroy; -// It's possible that the socket will be destroyed, and removed from -// any messages, before ever calling this. In that case, just skip -// it, since something else is destroying this connection anyway. -OutgoingMessage.prototype.destroy = function destroy(error) { - if (this.destroyed) { - return this; +OutgoingMessage.prototype._destroy = function(err, callback) { + if (!this.writableEnded) { + this.aborted = true; + this.emit('aborted'); } - this.destroyed = true; + + // TODO(ronag): Why is this needed? + const cb = (err) => { + const triggerAsyncId = this.socket ? + this.socket[async_id_symbol] : undefined; + defaultTriggerAsyncIdScope(triggerAsyncId, callback, err); + }; if (this.socket) { - this.socket.destroy(error); - } else { - this.once('socket', function socketDestroyOnConnect(socket) { - socket.destroy(error); + Stream.finished(this.socket.destroy(err), (er) => { + if (er && er.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + err = er; + } + + cb(err); }); + } else { + cb(err); } return this; @@ -667,16 +721,6 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'headersSent', { get: function() { return !!this._header; } }); -ObjectDefineProperty(OutgoingMessage.prototype, 'writableEnded', { - get: function() { return this.finished; } -}); - -ObjectDefineProperty(OutgoingMessage.prototype, 'writableNeedDrain', { - get: function() { - return !this.destroyed && !this.finished && this[kNeedDrain]; - } -}); - const crlf_buf = Buffer.from('\r\n'); OutgoingMessage.prototype.write = function write(chunk, encoding, callback) { if (typeof encoding === 'function') { @@ -684,30 +728,15 @@ OutgoingMessage.prototype.write = function write(chunk, encoding, callback) { encoding = null; } - const ret = write_(this, chunk, encoding, callback, false); + const ret = write_(this, chunk, encoding, callback, false) === true; if (!ret) - this[kNeedDrain] = true; + this._writableState.needDrain = true; return ret; }; -function onError(msg, err, callback) { - const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined; - defaultTriggerAsyncIdScope(triggerAsyncId, - process.nextTick, - emitErrorNt, - msg, - err, - callback); -} - -function emitErrorNt(msg, err, callback) { - callback(err); - if (typeof msg.emit === 'function' && !msg._closed) { - msg.emit('error', err); - } -} - function write_(msg, chunk, encoding, callback, fromEnd) { + const state = msg._writableState; + if (typeof callback !== 'function') callback = nop; @@ -724,19 +753,16 @@ function write_(msg, chunk, encoding, callback, fromEnd) { } let err; - if (msg.finished) { + if (state.ending) { err = new ERR_STREAM_WRITE_AFTER_END(); - } else if (msg.destroyed) { + } else if (state.destroyed) { err = new ERR_STREAM_DESTROYED('write'); } if (err) { - if (!msg.destroyed) { - onError(msg, err, callback); - } else { - process.nextTick(callback, err); - } - return false; + process.nextTick(callback, err); + msg.destroy(err); + return err; } if (!msg._header) { @@ -753,9 +779,9 @@ function write_(msg, chunk, encoding, callback, fromEnd) { return true; } - if (!fromEnd && msg.socket && !msg.socket.writableCorked) { - msg.socket.cork(); - process.nextTick(connectionCorkNT, msg.socket); + if (!fromEnd && !state.corked) { + msg.cork(); + process.nextTick(uncorkNT, msg); } let ret; @@ -773,8 +799,8 @@ function write_(msg, chunk, encoding, callback, fromEnd) { } -function connectionCorkNT(conn) { - conn.uncork(); +function uncorkNT(msg) { + msg.uncork(); } @@ -805,12 +831,24 @@ OutgoingMessage.prototype.addTrailers = function addTrailers(headers) { } }; -function onFinish(outmsg) { - if (outmsg && outmsg.socket && outmsg.socket._hadError) return; - outmsg.emit('finish'); +function onFinish(err) { + const state = this._writableState; + + if (err || state.errored || state.finished) { + return; + } + + state.finished = true; + this.emit('finish'); + + if (state.autoDestroy) { + this.destroy(); + } } OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { + const state = this._writableState; + if (typeof chunk === 'function') { callback = chunk; chunk = null; @@ -820,74 +858,90 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { encoding = null; } + this.cork(); + + let err; + if (chunk) { - if (this.finished) { - onError(this, - new ERR_STREAM_WRITE_AFTER_END(), - typeof callback !== 'function' ? nop : callback); - return this; + const ret = write_(this, chunk, encoding, null, true); + if (ret instanceof Error) { + err = ret; } + } - if (this.socket) { - this.socket.cork(); - } + if (err) { + // Do nothing... + } else if (!state.errored && !state.ending) { + state.ending = true; - write_(this, chunk, encoding, null, true); - } else if (this.finished) { - if (typeof callback === 'function') { - if (!this.writableFinished) { - this.on('finish', callback); - } else { - callback(new ERR_STREAM_ALREADY_FINISHED('end')); - } - } - return this; - } else if (!this._header) { - if (this.socket) { - this.socket.cork(); + if (!this._header) { + this._contentLength = 0; + this._implicitHeader(); } - this._contentLength = 0; - this._implicitHeader(); - } + const finish = FunctionPrototypeBind(onFinish, this); - if (typeof callback === 'function') - this.once('finish', callback); + state.finalCalled = true; - const finish = FunctionPrototypeBind(onFinish, undefined, this); + if (this._hasBody && this.chunkedEncoding) { + this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish); + } else { + // Force a flush, HACK. + this._send('', 'latin1', finish); + } - if (this._hasBody && this.chunkedEncoding) { - this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish); - } else { - // Force a flush, HACK. - this._send('', 'latin1', finish); - } + while (state.corked) { + // Fully uncork connection on end(). + this.uncork(); + } - if (this.socket) { - // Fully uncork connection on end(). - this.socket._writableState.corked = 1; - this.socket.uncork(); - } - this[kCorked] = 0; + state.ended = true; - this.finished = true; + // There is the first message on the outgoing queue, and we've sent + // everything to the socket. + debug('outgoing message end.'); + if (this.outputData.length === 0 && this.socket?._httpMessage === this) { + this._finish(); + } + } else if (state.finished) { + err = new ERR_STREAM_ALREADY_FINISHED('end'); + } else if (state.destroyed) { + err = new ERR_STREAM_DESTROYED('end'); + } - // There is the first message on the outgoing queue, and we've sent - // everything to the socket. - debug('outgoing message end.'); - if (this.outputData.length === 0 && - this.socket && - this.socket._httpMessage === this) { - this._finish(); + if (typeof callback === 'function') { + if (err || state.finished) { + process.nextTick(callback, err); + } else { + onFinished(this, callback); + } } + this.uncork(); + return this; }; +function onFinished(msg, callback) { + const onDone = (err) => { + msg + .off('finish', onDone) + .off(EE.errorMonitor, onDone); + callback(err); + }; + msg + .on('finish', onDone) + .on(EE.errorMonitor, onDone); +} OutgoingMessage.prototype._finish = function _finish() { - assert(this.socket); - this.emit('prefinish'); + const state = this._writableState; + + if (!state.prefinished) { + assert(this.socket); + state.prefinished = true; + this.emit('prefinish'); + } }; @@ -920,19 +974,14 @@ OutgoingMessage.prototype._flush = function _flush() { if (this.finished) { // This is a queue to the server or client to bring in the next this. this._finish(); - } else if (ret && this[kNeedDrain]) { - this[kNeedDrain] = false; + } else if (ret && this._writableState.needDrain) { + this._writableState.needDrain = false; this.emit('drain'); } } }; OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) { - while (this[kCorked]) { - this[kCorked]--; - socket.cork(); - } - const outputLength = this.outputData.length; if (outputLength <= 0) return undefined; diff --git a/lib/_http_server.js b/lib/_http_server.js index dac5fedf433533..ff6ad0b1897deb 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -59,7 +59,6 @@ const { const { OutgoingMessage } = require('_http_outgoing'); const { kOutHeaders, - kNeedDrain, emitStatistics } = require('internal/http'); const { @@ -227,11 +226,7 @@ function onServerResponseClose() { // Ergo, we need to deal with stale 'close' events and handle the case // where the ServerResponse object has already been deconstructed. // Fortunately, that requires only a single if check. :-) - if (this._httpMessage) { - this._httpMessage.destroyed = true; - this._httpMessage._closed = true; - this._httpMessage.emit('close'); - } + this._httpMessage?.destroy(); } ServerResponse.prototype.assignSocket = function assignSocket(socket) { @@ -240,7 +235,6 @@ ServerResponse.prototype.assignSocket = function assignSocket(socket) { socket.on('close', onServerResponseClose); this.socket = socket; this.emit('socket', socket); - this._flush(); }; ServerResponse.prototype.detachSocket = function detachSocket(socket) { @@ -556,8 +550,8 @@ function socketOnDrain(socket, state) { } const msg = socket._httpMessage; - if (msg && !msg.finished && msg[kNeedDrain]) { - msg[kNeedDrain] = false; + if (msg && !msg.finished && msg._writableState.needDrain) { + msg._writableState.needDrain = false; msg.emit('drain'); } } @@ -584,7 +578,6 @@ function abortIncoming(incoming) { const req = ArrayPrototypeShift(incoming); req.destroy(connResetException('aborted')); } - // Abort socket._httpMessage ? } function socketOnEnd(server, socket, parser, state) { @@ -804,7 +797,6 @@ function resOnFinish(req, res, socket, state, server) { res.detachSocket(socket); clearIncoming(req); - process.nextTick(emitCloseNT, res); if (res._last) { if (typeof socket.destroySoon === 'function') { @@ -826,12 +818,6 @@ function resOnFinish(req, res, socket, state, server) { } } -function emitCloseNT(self) { - self.destroyed = true; - self._closed = true; - self.emit('close'); -} - // The following callback is issued after the headers have been read on a // new message. In this callback we setup the response object and pass it // to the user. diff --git a/lib/internal/http.js b/lib/internal/http.js index f87dc8aa6cd01b..28c4b245ab559f 100644 --- a/lib/internal/http.js +++ b/lib/internal/http.js @@ -44,7 +44,6 @@ function emitStatistics(statistics) { module.exports = { kOutHeaders: Symbol('kOutHeaders'), - kNeedDrain: Symbol('kNeedDrain'), utcDate, emitStatistics }; diff --git a/test/parallel/test-http-agent-keepalive.js b/test/parallel/test-http-agent-keepalive.js index a1f902fab091e1..10c9e2372ddec4 100644 --- a/test/parallel/test-http-agent-keepalive.js +++ b/test/parallel/test-http-agent-keepalive.js @@ -105,17 +105,20 @@ function remoteClose() { function remoteError() { // Remote server will destroy the socket const req = get('/error', common.mustNotCall()); + req.on('socket', common.mustCall((socket) => { + socket.on('end', common.mustCall(() => { + assert.strictEqual(agent.sockets[name].length, 1); + assert.strictEqual(agent.freeSockets[name], undefined); + })); + })); req.on('error', common.mustCall((err) => { assert(err); assert.strictEqual(err.message, 'socket hang up'); - assert.strictEqual(agent.sockets[name].length, 1); + })); + req.on('close', common.mustCall((err) => { + assert.strictEqual(agent.sockets[name], undefined); assert.strictEqual(agent.freeSockets[name], undefined); - // Wait socket 'close' event emit - setTimeout(common.mustCall(() => { - assert.strictEqual(agent.sockets[name], undefined); - assert.strictEqual(agent.freeSockets[name], undefined); - server.close(); - }), common.platformTimeout(1)); + server.close(); })); } diff --git a/test/parallel/test-http-outgoing-end-multiple.js b/test/parallel/test-http-outgoing-end-multiple.js index 696443f9390cd0..8a3a42aa6ce77e 100644 --- a/test/parallel/test-http-outgoing-end-multiple.js +++ b/test/parallel/test-http-outgoing-end-multiple.js @@ -3,23 +3,21 @@ const common = require('../common'); const assert = require('assert'); const http = require('http'); -const onWriteAfterEndError = common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); -}, 2); - const server = http.createServer(common.mustCall(function(req, res) { res.end('testing ended state', common.mustCall()); assert.strictEqual(res.writableCorked, 0); res.end(common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED'); + assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); })); - assert.strictEqual(res.writableCorked, 0); - res.end('end', onWriteAfterEndError); - assert.strictEqual(res.writableCorked, 0); - res.on('error', onWriteAfterEndError); - res.on('finish', common.mustCall(() => { + res.end('end', common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); + })); + res.on('error', common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END'); + })); + res.on('close', common.mustCall(() => { res.end(common.mustCall((err) => { - assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED'); + assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED'); server.close(); })); })); diff --git a/test/parallel/test-http-outgoing-message-capture-rejection.js b/test/parallel/test-http-outgoing-message-capture-rejection.js index 9fe9bdb21331d8..763f125181e86d 100644 --- a/test/parallel/test-http-outgoing-message-capture-rejection.js +++ b/test/parallel/test-http-outgoing-message-capture-rejection.js @@ -14,7 +14,7 @@ events.captureRejections = true; throw _err; })); - res.socket.on('error', common.mustCall((err) => { + res.on('error', common.mustCall((err) => { assert.strictEqual(err, _err); })); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 86f9006d83c430..3ad8d82e785b95 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -253,40 +253,6 @@ const net = require('net'); }); } -{ - const server = http.createServer((req, res) => { - pipeline(req, res, common.mustSucceed()); - }); - - server.listen(0, () => { - const req = http.request({ - port: server.address().port - }); - - let sent = 0; - const rs = new Readable({ - read() { - if (sent++ > 10) { - return; - } - rs.push('hello'); - } - }); - - pipeline(rs, req, common.mustCall(() => { - server.close(); - })); - - req.on('response', (res) => { - let cnt = 10; - res.on('data', () => { - cnt--; - if (cnt === 0) rs.destroy(); - }); - }); - }); -} - { const makeTransform = () => { const tr = new Transform({