From 18695595e13a0041f7619a1f96d2b8075397c1a8 Mon Sep 17 00:00:00 2001
From: Debadree Chatterjee <debadree333@gmail.com>
Date: Thu, 19 Jan 2023 14:38:50 +0530
Subject: [PATCH] stream: fix pipeline calling end on destination more than
 once

Fixes: https://github.com/nodejs/node/issues/42866
PR-URL: https://github.com/nodejs/node/pull/46226
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
---
 lib/internal/streams/pipeline.js      |  2 +-
 test/parallel/test-stream-pipeline.js | 35 +++++++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index 77520a14d50a6f..b8a756330536c5 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -353,7 +353,7 @@ function pipe(src, dst, finish, { end }) {
     }
   });
 
-  src.pipe(dst, { end });
+  src.pipe(dst, { end: false }); // If end is true we already will have a listener to end dst.
 
   if (end) {
     // Compat. Before node v10.12.0 stdio used to throw an error so
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js
index 529b18386e25a6..65ef5164c14b4c 100644
--- a/test/parallel/test-stream-pipeline.js
+++ b/test/parallel/test-stream-pipeline.js
@@ -1556,3 +1556,38 @@ const tsp = require('timers/promises');
     })
   );
 }
+
+{
+  class CustomReadable extends Readable {
+    _read() {
+      this.push('asd');
+      this.push(null);
+    }
+  }
+
+  class CustomWritable extends Writable {
+    constructor() {
+      super();
+      this.endCount = 0;
+      this.str = '';
+    }
+
+    _write(chunk, enc, cb) {
+      this.str += chunk;
+      cb();
+    }
+
+    end() {
+      this.endCount += 1;
+      super.end();
+    }
+  }
+
+  const readable = new CustomReadable();
+  const writable = new CustomWritable();
+
+  pipeline(readable, writable, common.mustSucceed(() => {
+    assert.strictEqual(writable.str, 'asd');
+    assert.strictEqual(writable.endCount, 1);
+  }));
+}