From 094e5717fe207fdf0a4ceb9bf86237edc77b040e Mon Sep 17 00:00:00 2001 From: Matthieu Sieben Date: Tue, 24 Sep 2024 11:04:07 +0200 Subject: [PATCH] stream: handle generator destruction from Duplex.from() --- lib/internal/streams/duplexify.js | 81 +++++++++++--- test/parallel/test-stream-duplex-from.js | 130 +++++++++++++++++++++++ 2 files changed, 197 insertions(+), 14 deletions(-) diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 4498f0f3905be2..9717bf80a8381f 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -83,15 +83,19 @@ module.exports = function duplexify(body, name) { } if (typeof body === 'function') { - const { value, write, final, destroy } = fromAsyncGen(body); + let d; + + const { value, write, final, destroy } = fromAsyncGen(body, (err) => { + if (d) destroyer(d, err); + }); // Body might be a constructor function instead of an async generator function. if (isDuplexNodeStream(value)) { - return value; + return d = value; } if (isIterable(value)) { - return from(Duplexify, value, { + return d = from(Duplexify, value, { // TODO (ronag): highWaterMark? objectMode: true, write, @@ -102,12 +106,11 @@ module.exports = function duplexify(body, name) { const then = value?.then; if (typeof then === 'function') { - let d; - const promise = FunctionPrototypeCall( then, value, (val) => { + destroyer(d, null); if (val != null) { throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val); } @@ -208,11 +211,14 @@ module.exports = function duplexify(body, name) { body); }; -function fromAsyncGen(fn) { +function fromAsyncGen(fn, onAbort) { let { promise, resolve } = createDeferredPromise(); const ac = new AbortController(); const signal = ac.signal; - const value = fn(async function*() { + let ended = false; + let error = null; + + const asyncGenerator = (async function* () { while (true) { const _promise = promise; promise = null; @@ -224,21 +230,68 @@ function fromAsyncGen(fn) { ({ promise, resolve } = createDeferredPromise()); yield chunk; } - }(), { signal }); + })(); + + // Not using try/finally because asyncGenerator.return() should work even + // if the generator was never started. + + const originalReturn = asyncGenerator.return; + asyncGenerator.return = async function(value) { + // eslint-disable-next-line node-core/avoid-prototype-pollution + if (ended) return { value, done: true }; + + const _promise = promise; + promise = null; + const { cb } = await _promise; + process.nextTick(cb); + + ended = true; + + onAbort(null); + return originalReturn.call(asyncGenerator, value); + }; + + const originalThrow = asyncGenerator.throw; + asyncGenerator.throw = async function(err) { + if (ended) throw err; + + const _promise = promise; + promise = null; + const { cb } = await _promise; + process.nextTick(cb); + + ended = true; + error = err; + onAbort(err); + + return originalThrow.call(asyncGenerator, err); + }; + + + const value = fn(asyncGenerator, { signal }); return { value, write(chunk, encoding, cb) { - const _resolve = resolve; - resolve = null; - _resolve({ chunk, done: false, cb }); + if (ended) { + cb(error); + } else { + const _resolve = resolve; + resolve = null; + _resolve({ chunk, done: false, cb }); + } }, final(cb) { - const _resolve = resolve; - resolve = null; - _resolve({ done: true, cb }); + if (ended) { + cb(error); + } else { + const _resolve = resolve; + resolve = null; + _resolve({ done: true, cb }); + } }, destroy(err, cb) { + ended = true; ac.abort(); cb(err); }, diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index e3c117ff8dedb0..384ba26dcb8ee2 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -5,6 +5,7 @@ const assert = require('assert'); const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream'); const { ReadableStream, WritableStream } = require('stream/web'); const { Blob } = require('buffer'); +const sleep = require('util').promisify(setTimeout); { const d = Duplex.from({ @@ -401,3 +402,132 @@ function makeATestWritableStream(writeFunc) { assert.strictEqual(d.writable, false); })); } + +{ + const r = Readable.from(['foo', 'bar', 'bar']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // eslint-disable-next-line no-unused-vars + for await (const _ of asyncGenerator); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'bar']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // eslint-disable-next-line no-unused-vars + for await (const _ of asyncGenerator) break; + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + const a = await asyncGenerator.next(); + assert.strictEqual(a.done, false); + assert.strictEqual(a.value.toString(), 'foo'); + const b = await asyncGenerator.return(); + assert.strictEqual(b.done, true); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // Note: the generator is not even started at this point + await asyncGenerator.return(); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + // Same as before, with a delay + await sleep(100); + await asyncGenerator.return(); + }), + common.mustSucceed(() => { + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) {}), + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar', 'baz']); + pipeline( + r, + Duplex.from(function(asyncGenerator) { + return sleep(100); + }), + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + await asyncGenerator.throw(new Error('my error')); + }), + common.mustCall((err) => { + assert.strictEqual(err.message, 'my error'); + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Readable.from(['foo', 'bar']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + await asyncGenerator.next(); + await asyncGenerator.throw(new Error('my error')); + }), + common.mustCall((err) => { + assert.strictEqual(err.message, 'my error'); + assert.strictEqual(r.destroyed, true); + }) + ); +}