diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index bc34a0effa5e77..1df891d30c4dc4 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -11,6 +11,7 @@ const { } = primordials; const finished = require('internal/streams/end-of-stream'); +const destroyImpl = require('internal/streams/destroy'); const kLastResolve = Symbol('lastResolve'); const kLastReject = Symbol('lastReject'); @@ -22,15 +23,6 @@ const kStream = Symbol('stream'); let Readable; -function destroy(stream, err) { - // request.destroy just do .end - .abort is what we want - if (typeof stream.abort === 'function') return stream.abort(); - if (stream.req && - typeof stream.req.abort === 'function') return stream.req.abort(); - if (typeof stream.destroy === 'function') return stream.destroy(err); - if (typeof stream.close === 'function') return stream.close(); -} - function createIterResult(value, done) { return { value, done }; } @@ -92,7 +84,7 @@ function finish(self, err) { resolve(createIterResult(undefined, true)); } }); - destroy(stream, err); + destroyImpl.destroyer(stream, err); }); } @@ -172,7 +164,7 @@ const createReadableStreamAsyncIterator = (stream) => { const src = stream; stream = new Readable({ objectMode: true }).wrap(src); - finished(stream, (err) => destroy(src, err)); + finished(stream, (err) => destroyImpl.destroyer(src, err)); } const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, { diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index ded75b255934c4..d18bdaecd03886 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -128,8 +128,21 @@ function errorOrDestroy(stream, err) { stream.emit('error', err); } +function isRequest(stream) { + return stream && stream.setHeader && typeof stream.abort === 'function'; +} + +// Normalize destroy for legacy. +function destroyer(stream, err) { + // request.destroy just do .end - .abort is what we want + if (isRequest(stream)) return stream.abort(); + if (isRequest(stream.req)) return stream.req.abort(); + if (typeof stream.destroy === 'function') return stream.destroy(err); + if (typeof stream.close === 'function') return stream.close(); +} module.exports = { + destroyer, destroy, undestroy, errorOrDestroy diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 29b6be7b864087..069ce8e1d5d874 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -12,6 +12,7 @@ const { let eos; const { once } = require('internal/util'); +const destroyImpl = require('internal/streams/destroy'); const { ERR_INVALID_ARG_TYPE, ERR_INVALID_RETURN_VALUE, @@ -28,14 +29,6 @@ function isRequest(stream) { return stream && stream.setHeader && typeof stream.abort === 'function'; } -function destroyStream(stream, err) { - // request.destroy just do .end - .abort is what we want - if (isRequest(stream)) return stream.abort(); - if (isRequest(stream.req)) return stream.req.abort(); - if (typeof stream.destroy === 'function') return stream.destroy(err); - if (typeof stream.close === 'function') return stream.close(); -} - function destroyer(stream, reading, writing, final, callback) { callback = once(callback); let destroyed = false; @@ -46,7 +39,7 @@ function destroyer(stream, reading, writing, final, callback) { destroyed = true; const readable = stream.readable || isRequest(stream); if (err || !final || !readable) { - destroyStream(stream, err); + destroyImpl.destroyer(stream, err); } callback(err); }); @@ -54,7 +47,7 @@ function destroyer(stream, reading, writing, final, callback) { return (err) => { if (destroyed) return; destroyed = true; - destroyStream(stream, err); + destroyImpl.destroyer(stream, err); callback(err || new ERR_STREAM_DESTROYED('pipe')); }; }