diff --git a/doc/api/stream.md b/doc/api/stream.md
index c2a257cb5a5af4..d5c255177494d6 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -1198,7 +1198,8 @@ on the type of stream being created, as detailed in the chart below:
[Writable](#stream_class_stream_writable)
- [_write][stream-_write] , [_writev][stream-_writev]
+ [_write][stream-_write] , [_writev][stream-_writev] ,
+ [_final][stream-_final]
|
@@ -1209,7 +1210,8 @@ on the type of stream being created, as detailed in the chart below:
[Duplex](#stream_class_stream_duplex)
- [_read][stream-_read] , [_write][stream-_write] , [_writev][stream-_writev]
+ [_read][stream-_read] , [_write][stream-_write] , [_writev][stream-_writev] ,
+ [_final][stream-_final]
|
@@ -1220,7 +1222,8 @@ on the type of stream being created, as detailed in the chart below:
[Transform](#stream_class_stream_transform)
- [_transform][stream-_transform] , [_flush][stream-_flush]
+ [_transform][stream-_transform] , [_flush][stream-_flush] ,
+ [_final][stream-_final]
|
@@ -1279,6 +1282,8 @@ constructor and implement the `writable._write()` method. The
[`stream._writev()`][stream-_writev] method.
* `destroy` {Function} Implementation for the
[`stream._destroy()`][writable-_destroy] method.
+ * `final` {Function} Implementation for the
+ [`stream._final()`][stream-_final] method.
For example:
@@ -1398,6 +1403,22 @@ added: REPLACEME
* `callback` {Function} A callback function that takes an optional error argument
which is invoked when the writable is destroyed.
+#### writable.\_final(callback)
+
+
+* `callback` {Function} Call this function (optionally with an error
+ argument) when you are done writing any remaining data.
+
+Note: `_final()` **must not** be called directly. It MAY be implemented
+by child classes, and if so, will be called by the internal Writable
+class methods only.
+
+This optional function will be called before the stream closes, delaying the
+`finish` event until `callback` is called. This is useful to close resources
+or write buffered data before a stream ends.
+
#### Errors While Writing
It is recommended that errors occurring during the processing of the
@@ -2115,6 +2136,7 @@ readable buffer so there is nothing for a user to consume.
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
[stream-_write]: #stream_writable_write_chunk_encoding_callback_1
[stream-_writev]: #stream_writable_writev_chunks_callback
+[stream-_final]: #stream_writable_final_callback
[stream-end]: #stream_writable_end_chunk_encoding_callback
[stream-pause]: #stream_readable_pause
[stream-push]: #stream_readable_push_chunk_encoding
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 4e2a19f12c822f..8540d180c75ad7 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -58,6 +58,12 @@ function WritableState(options, stream) {
// cast to ints.
this.highWaterMark = Math.floor(this.highWaterMark);
+ // if _final has been called
+ this.finalCalled = false;
+
+ // if _final has been called
+ this.finalCalled = false;
+
// drain event flag.
this.needDrain = false;
// at the start of calling end()
@@ -199,6 +205,9 @@ function Writable(options) {
if (typeof options.destroy === 'function')
this._destroy = options.destroy;
+
+ if (typeof options.final === 'function')
+ this._final = options.final;
}
Stream.call(this);
@@ -520,23 +529,37 @@ function needFinish(state) {
!state.finished &&
!state.writing);
}
-
-function prefinish(stream, state) {
- if (!state.prefinished) {
+function callFinal(stream, state) {
+ stream._final((err) => {
+ state.pendingcb--;
+ if (err) {
+ stream.emit('error', err);
+ }
state.prefinished = true;
stream.emit('prefinish');
+ finishMaybe(stream, state);
+ });
+}
+function prefinish(stream, state) {
+ if (!state.prefinished && !state.finalCalled) {
+ if (typeof stream._final === 'function') {
+ state.pendingcb++;
+ state.finalCalled = true;
+ process.nextTick(callFinal, stream, state);
+ } else {
+ state.prefinished = true;
+ stream.emit('prefinish');
+ }
}
}
function finishMaybe(stream, state) {
var need = needFinish(state);
if (need) {
+ prefinish(stream, state);
if (state.pendingcb === 0) {
- prefinish(stream, state);
state.finished = true;
stream.emit('finish');
- } else {
- prefinish(stream, state);
}
}
return need;
diff --git a/test/parallel/test-stream-readable-constructor-set-methods.js b/test/parallel/test-stream-readable-constructor-set-methods.js
index e5e3114de456db..1b9f0496b991ec 100644
--- a/test/parallel/test-stream-readable-constructor-set-methods.js
+++ b/test/parallel/test-stream-readable-constructor-set-methods.js
@@ -1,19 +1,11 @@
'use strict';
-require('../common');
-const assert = require('assert');
+const common = require('../common');
const Readable = require('stream').Readable;
-let _readCalled = false;
-function _read(n) {
- _readCalled = true;
+const _read = common.mustCall(function _read(n) {
this.push(null);
-}
+});
const r = new Readable({ read: _read });
r.resume();
-
-process.on('exit', function() {
- assert.strictEqual(r._read, _read);
- assert(_readCalled);
-});
diff --git a/test/parallel/test-stream-transform-constructor-set-methods.js b/test/parallel/test-stream-transform-constructor-set-methods.js
index 1423f4de10942d..3e1325c0fd1e27 100644
--- a/test/parallel/test-stream-transform-constructor-set-methods.js
+++ b/test/parallel/test-stream-transform-constructor-set-methods.js
@@ -1,24 +1,25 @@
'use strict';
-require('../common');
+const common = require('../common');
const assert = require('assert');
const Transform = require('stream').Transform;
-let _transformCalled = false;
-function _transform(d, e, n) {
- _transformCalled = true;
+const _transform = common.mustCall(function _transform(d, e, n) {
n();
-}
+});
-let _flushCalled = false;
-function _flush(n) {
- _flushCalled = true;
+const _final = common.mustCall(function _final(n) {
n();
-}
+});
+
+const _flush = common.mustCall(function _flush(n) {
+ n();
+});
const t = new Transform({
transform: _transform,
- flush: _flush
+ flush: _flush,
+ final: _final
});
const t2 = new Transform({});
@@ -34,6 +35,5 @@ assert.throws(() => {
process.on('exit', () => {
assert.strictEqual(t._transform, _transform);
assert.strictEqual(t._flush, _flush);
- assert.strictEqual(_transformCalled, true);
- assert.strictEqual(_flushCalled, true);
+ assert.strictEqual(t._final, _final);
});
diff --git a/test/parallel/test-stream-transform-final-sync.js b/test/parallel/test-stream-transform-final-sync.js
new file mode 100644
index 00000000000000..de3f0904885bb9
--- /dev/null
+++ b/test/parallel/test-stream-transform-final-sync.js
@@ -0,0 +1,100 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+
+const stream = require('stream');
+let state = 0;
+
+/*
+What you do
+var stream = new tream.Transform({
+ transform: function transformCallback(chunk, _, next) {
+ // part 1
+ this.push(chunk);
+ //part 2
+ next();
+ },
+ final: function endCallback(done) {
+ // part 1
+ process.nextTick(function () {
+ // part 2
+ done();
+ });
+ },
+ flush: function flushCallback(done) {
+ // part 1
+ process.nextTick(function () {
+ // part 2
+ done();
+ });
+ }
+});
+t.on('data', dataListener);
+t.on('end', endListener);
+t.on('finish', finishListener);
+t.write(1);
+t.write(4);
+t.end(7, endMethodCallback);
+
+The order things are called
+
+1. transformCallback part 1
+2. dataListener
+3. transformCallback part 2
+4. transformCallback part 1
+5. dataListener
+6. transformCallback part 2
+7. transformCallback part 1
+8. dataListener
+9. transformCallback part 2
+10. finalCallback part 1
+11. finalCallback part 2
+12. flushCallback part 1
+13. finishListener
+14. endMethodCallback
+15. flushCallback part 2
+16. endListener
+*/
+
+const t = new stream.Transform({
+ objectMode: true,
+ transform: common.mustCall(function(chunk, _, next) {
+ assert.strictEqual(++state, chunk, 'transformCallback part 1');
+ this.push(state);
+ assert.strictEqual(++state, chunk + 2, 'transformCallback part 2');
+ process.nextTick(next);
+ }, 3),
+ final: common.mustCall(function(done) {
+ state++;
+ assert.strictEqual(state, 10, 'finalCallback part 1');
+ state++;
+ assert.strictEqual(state, 11, 'finalCallback part 2');
+ done();
+ }, 1),
+ flush: common.mustCall(function(done) {
+ state++;
+ assert.strictEqual(state, 12, 'flushCallback part 1');
+ process.nextTick(function() {
+ state++;
+ assert.strictEqual(state, 15, 'flushCallback part 2');
+ done();
+ });
+ }, 1)
+});
+t.on('finish', common.mustCall(function() {
+ state++;
+ assert.strictEqual(state, 13, 'finishListener');
+}, 1));
+t.on('end', common.mustCall(function() {
+ state++;
+ assert.strictEqual(state, 16, 'end event');
+}, 1));
+t.on('data', common.mustCall(function(d) {
+ assert.strictEqual(++state, d + 1, 'dataListener');
+}, 3));
+t.write(1);
+t.write(4);
+t.end(7, common.mustCall(function() {
+ state++;
+ assert.strictEqual(state, 14, 'endMethodCallback');
+}, 1));
diff --git a/test/parallel/test-stream-transform-final.js b/test/parallel/test-stream-transform-final.js
new file mode 100644
index 00000000000000..56566152e69165
--- /dev/null
+++ b/test/parallel/test-stream-transform-final.js
@@ -0,0 +1,102 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+
+const stream = require('stream');
+let state = 0;
+
+/*
+What you do
+var stream = new tream.Transform({
+ transform: function transformCallback(chunk, _, next) {
+ // part 1
+ this.push(chunk);
+ //part 2
+ next();
+ },
+ final: function endCallback(done) {
+ // part 1
+ process.nextTick(function () {
+ // part 2
+ done();
+ });
+ },
+ flush: function flushCallback(done) {
+ // part 1
+ process.nextTick(function () {
+ // part 2
+ done();
+ });
+ }
+});
+t.on('data', dataListener);
+t.on('end', endListener);
+t.on('finish', finishListener);
+t.write(1);
+t.write(4);
+t.end(7, endMethodCallback);
+
+The order things are called
+
+1. transformCallback part 1
+2. dataListener
+3. transformCallback part 2
+4. transformCallback part 1
+5. dataListener
+6. transformCallback part 2
+7. transformCallback part 1
+8. dataListener
+9. transformCallback part 2
+10. finalCallback part 1
+11. finalCallback part 2
+12. flushCallback part 1
+13. finishListener
+14. endMethodCallback
+15. flushCallback part 2
+16. endListener
+*/
+
+const t = new stream.Transform({
+ objectMode: true,
+ transform: common.mustCall(function(chunk, _, next) {
+ assert.strictEqual(++state, chunk, 'transformCallback part 1');
+ this.push(state);
+ assert.strictEqual(++state, chunk + 2, 'transformCallback part 2');
+ process.nextTick(next);
+ }, 3),
+ final: common.mustCall(function(done) {
+ state++;
+ assert.strictEqual(state, 10, 'finalCallback part 1');
+ setTimeout(function() {
+ state++;
+ assert.strictEqual(state, 11, 'finalCallback part 2');
+ done();
+ }, 100);
+ }, 1),
+ flush: common.mustCall(function(done) {
+ state++;
+ assert.strictEqual(state, 12, 'flushCallback part 1');
+ process.nextTick(function() {
+ state++;
+ assert.strictEqual(state, 15, 'flushCallback part 2');
+ done();
+ });
+ }, 1)
+});
+t.on('finish', common.mustCall(function() {
+ state++;
+ assert.strictEqual(state, 13, 'finishListener');
+}, 1));
+t.on('end', common.mustCall(function() {
+ state++;
+ assert.strictEqual(state, 16, 'end event');
+}, 1));
+t.on('data', common.mustCall(function(d) {
+ assert.strictEqual(++state, d + 1, 'dataListener');
+}, 3));
+t.write(1);
+t.write(4);
+t.end(7, common.mustCall(function() {
+ state++;
+ assert.strictEqual(state, 14, 'endMethodCallback');
+}, 1));
diff --git a/test/parallel/test-stream-write-final.js b/test/parallel/test-stream-write-final.js
new file mode 100644
index 00000000000000..56537bd7fae94d
--- /dev/null
+++ b/test/parallel/test-stream-write-final.js
@@ -0,0 +1,24 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+
+const stream = require('stream');
+let shutdown = false;
+
+const w = new stream.Writable({
+ final: common.mustCall(function(cb) {
+ assert.strictEqual(this, w);
+ setTimeout(function() {
+ shutdown = true;
+ cb();
+ }, 100);
+ }),
+ write: function(chunk, e, cb) {
+ process.nextTick(cb);
+ }
+});
+w.on('finish', common.mustCall(function() {
+ assert(shutdown);
+}));
+w.write(Buffer.allocUnsafe(1));
+w.end(Buffer.allocUnsafe(0));
diff --git a/test/parallel/test-stream2-writable.js b/test/parallel/test-stream2-writable.js
index c046194d6bf63b..8e1ea9a6034cea 100644
--- a/test/parallel/test-stream2-writable.js
+++ b/test/parallel/test-stream2-writable.js
@@ -408,3 +408,25 @@ test('finish is emitted if last chunk is empty', function(t) {
w.write(Buffer.allocUnsafe(1));
w.end(Buffer.alloc(0));
});
+
+test('finish is emitted after shutdown', function(t) {
+ const w = new W();
+ let shutdown = false;
+
+ w._final = common.mustCall(function(cb) {
+ assert.strictEqual(this, w);
+ setTimeout(function() {
+ shutdown = true;
+ cb();
+ }, 100);
+ });
+ w._write = function(chunk, e, cb) {
+ process.nextTick(cb);
+ };
+ w.on('finish', common.mustCall(function() {
+ assert(shutdown);
+ t.end();
+ }));
+ w.write(Buffer.allocUnsafe(1));
+ w.end(Buffer.allocUnsafe(0));
+});