From 2c9548a157b8c982383bbe7a1f126c626a228881 Mon Sep 17 00:00:00 2001 From: Matthieu Sieben Date: Tue, 24 Sep 2024 21:31:23 +0200 Subject: [PATCH] stream: add tests for Duplex.from() --- lib/internal/streams/duplexify.js | 3 -- test/parallel/test-stream-duplex-from.js | 57 ++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 8c60dea011c905..033884a5a7e5c9 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -257,9 +257,6 @@ function fromAsyncGen(fn, destructor) { asyncGenerator.throw = async function(err) { try { return await originalThrow.call(this, err); - } catch (error) { - err ??= error; - throw error; } finally { if (promise) { const _promise = promise; diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js index a0f5200fbf3be1..dc54ef49c8fba3 100644 --- a/test/parallel/test-stream-duplex-from.js +++ b/test/parallel/test-stream-duplex-from.js @@ -507,6 +507,48 @@ function makeATestWritableStream(writeFunc) { ); } +{ + const r = Readable.from(['foo', 'bar', 'baz']); + const d = Duplex.from(async function(asyncGenerator) { + while (!(await asyncGenerator.next()).done) await sleep(100); + }); + + setTimeout(() => d.destroy(), 150); + + pipeline( + r, + d, + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + assert.strictEqual(r.destroyed, true); + }) + ); +} + +{ + const r = Duplex.from(async function* () { + for (const value of ['foo', 'bar', 'baz']) { + await sleep(50); + yield value; + } + }); + const d = Duplex.from(async function(asyncGenerator) { + while (!(await asyncGenerator.next()).done); + }); + + setTimeout(() => r.destroy(), 75); + + pipeline( + r, + d, + common.mustCall((err) => { + assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); + assert.strictEqual(r.destroyed, true); + assert.strictEqual(d.destroyed, true); + }) + ); +} + { const r = Readable.from(['foo']); pipeline( @@ -535,3 +577,18 @@ function makeATestWritableStream(writeFunc) { }) ); } + +{ + const r = Readable.from(['foo', 'bar']); + pipeline( + r, + Duplex.from(async function(asyncGenerator) { + await asyncGenerator.next(); + await asyncGenerator.throw(); + }), + common.mustCall((err) => { + assert.strictEqual(err.code, 'ABORT_ERR'); + assert.strictEqual(r.destroyed, true); + }) + ); +}