diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 4498f0f3905be2..ca33dc1cdd61d7 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -83,15 +83,19 @@ module.exports = function duplexify(body, name) { } if (typeof body === 'function') { - const { value, write, final, destroy } = fromAsyncGen(body); + let d; + + const { value, write, final, destroy } = fromAsyncGen(body, () => { + if (d) destroyer(d); + }); // Body might be a constructor function instead of an async generator function. if (isDuplexNodeStream(value)) { - return value; + return d = value; } if (isIterable(value)) { - return from(Duplexify, value, { + return d = from(Duplexify, value, { // TODO (ronag): highWaterMark? objectMode: true, write, @@ -102,12 +106,11 @@ module.exports = function duplexify(body, name) { const then = value?.then; if (typeof then === 'function') { - let d; - const promise = FunctionPrototypeCall( then, value, (val) => { + destroyer(d, null); if (val != null) { throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val); } @@ -208,12 +211,14 @@ module.exports = function duplexify(body, name) { body); }; -function fromAsyncGen(fn) { +function fromAsyncGen(fn, destructor) { let { promise, resolve } = createDeferredPromise(); const ac = new AbortController(); const signal = ac.signal; - const value = fn(async function*() { - while (true) { + let ended = false; + + const asyncGenerator = (async function* () { + while (!ended) { const _promise = promise; promise = null; const { chunk, done, cb } = await _promise; @@ -224,7 +229,40 @@ function fromAsyncGen(fn) { ({ promise, resolve } = createDeferredPromise()); yield chunk; } - }(), { signal }); + })(); + + // Not using try/finally because asyncGenerator.return() should work even + // if the generator was never started. + + const end = async (err) => { + if (ended) return; + ended = true; + + // If generator.return() is called after the generator is done, the promise + // will be null. + if (promise) { + // Avoid ERR_STREAM_PREMATURE_CLOSE by calling cb() + const _promise = promise; + promise = null; + const { cb } = await _promise; + process.nextTick(cb, err); + + if (!err) destructor(); + } + }; + + const originalReturn = asyncGenerator.return; + asyncGenerator.return = function(value) { + return end(null).then(originalReturn.bind(this, value)); + }; + + const originalThrow = asyncGenerator.throw; + asyncGenerator.throw = function(err) { + return end(err).then(originalThrow.bind(this, err)); + }; + + + const value = fn(asyncGenerator, { signal }); return { value, @@ -239,6 +277,7 @@ function fromAsyncGen(fn) { _resolve({ done: true, cb }); }, destroy(err, cb) { + ended = true; ac.abort(); cb(err); }, diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index e3c117ff8dedb0..bb83527582d21b 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -5,6 +5,7 @@ const assert = require('assert'); const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream'); const { ReadableStream, WritableStream } = require('stream/web'); const { Blob } = require('buffer'); +const sleep = require('util').promisify(setTimeout); { const d = Duplex.from({ @@ -401,3 +402,136 @@ function makeATestWritableStream(writeFunc) { assert.strictEqual(d.writable, false); })); } + +{ + const r = Readable.from(['foo', 'bar', 'bar']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + const values = await Array.fromAsync(asyncGenerator); + assert.deepStrictEqual(values, ['foo', 'bar', 'bar']); + + await asyncGenerator.return(); + await asyncGenerator.return(); + await asyncGenerator.return(); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'bar']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // eslint-disable-next-line no-unused-vars + for await (const _ of asyncGenerator) break; + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + const a = await asyncGenerator.next(); + assert.strictEqual(a.done, false); + assert.strictEqual(a.value.toString(), 'foo'); + const b = await asyncGenerator.return(); + assert.strictEqual(b.done, true); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // Note: the generator is not even started at this point + await asyncGenerator.return(); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // Same as before, with a delay + await sleep(100); + await asyncGenerator.return(); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) {}), + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + await sleep(100); + }), + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + await asyncGenerator.throw(new Error('my error')); + }), + common.mustCall((err) => { + assert.strictEqual(err.message, 'my error'); + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + await asyncGenerator.next(); + await asyncGenerator.throw(new Error('my error')); + }), + common.mustCall((err) => { + assert.strictEqual(err.message, 'my error'); + assert.strictEqual(r.destroyed, true); + }) + ); +}