Skip to content

Commit

Permalink
add option to .releaseLock a ReadableStream on finalization (#3771)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Oct 14, 2024
1 parent d40746d commit 7b69358
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .changeset/tough-lobsters-guess.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

add option to .releaseLock a ReadableStream on finalization
34 changes: 25 additions & 9 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2120,10 +2120,16 @@ export const fromTQueue: <A>(queue: TDequeue<A>) => Stream<A> = internal.fromTQu
* @since 2.0.0
* @category constructors
*/
export const fromReadableStream: <A, E>(
evaluate: LazyArg<ReadableStream<A>>,
onError: (error: unknown) => E
) => Stream<A, E> = internal.fromReadableStream
export const fromReadableStream: {
<A, E>(
options: {
readonly evaluate: LazyArg<ReadableStream<A>>
readonly onError: (error: unknown) => E
readonly releaseLockOnEnd?: boolean | undefined
}
): Stream<A, E>
<A, E>(evaluate: LazyArg<ReadableStream<A>>, onError: (error: unknown) => E): Stream<A, E>
} = internal.fromReadableStream

/**
* Creates a stream from a `ReadableStreamBYOBReader`.
Expand All @@ -2134,11 +2140,21 @@ export const fromReadableStream: <A, E>(
* @since 2.0.0
* @category constructors
*/
export const fromReadableStreamByob: <E>(
evaluate: LazyArg<ReadableStream<Uint8Array>>,
onError: (error: unknown) => E,
allocSize?: number
) => Stream<Uint8Array, E> = internal.fromReadableStreamByob
export const fromReadableStreamByob: {
<E>(
options: {
readonly evaluate: LazyArg<ReadableStream<Uint8Array>>
readonly onError: (error: unknown) => E
readonly bufferSize?: number | undefined
readonly releaseLockOnEnd?: boolean | undefined
}
): Stream<Uint8Array, E>
<E>(
evaluate: LazyArg<ReadableStream<Uint8Array>>,
onError: (error: unknown) => E,
allocSize?: number
): Stream<Uint8Array, E>
} = internal.fromReadableStreamByob

/**
* Creates a stream from a `Schedule` that does not require any further
Expand Down
89 changes: 69 additions & 20 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import * as MergeDecision from "../MergeDecision.js"
import * as Option from "../Option.js"
import type * as Order from "../Order.js"
import { pipeArguments } from "../Pipeable.js"
import { hasProperty, isTagged, type Predicate, type Refinement } from "../Predicate.js"
import { hasProperty, type Predicate, type Refinement } from "../Predicate.js"
import * as PubSub from "../PubSub.js"
import * as Queue from "../Queue.js"
import * as RcRef from "../RcRef.js"
Expand Down Expand Up @@ -3261,14 +3261,38 @@ export const fromSchedule = <A, R>(schedule: Schedule.Schedule<A, unknown, R>):
)

/** @internal */
export const fromReadableStream = <A, E>(
evaluate: LazyArg<ReadableStream<A>>,
onError: (error: unknown) => E
): Stream.Stream<A, E> =>
unwrapScoped(Effect.map(
export const fromReadableStream: {
<A, E>(
options: {
readonly evaluate: LazyArg<ReadableStream<A>>
readonly onError: (error: unknown) => E
readonly releaseLockOnEnd?: boolean | undefined
}
): Stream.Stream<A, E>
<A, E>(
evaluate: LazyArg<ReadableStream<A>>,
onError: (error: unknown) => E
): Stream.Stream<A, E>
} = <A, E>(
...args: [options: {
readonly evaluate: LazyArg<ReadableStream<A>>
readonly onError: (error: unknown) => E
readonly releaseLockOnEnd?: boolean | undefined
}] | [
evaluate: LazyArg<ReadableStream<A>>,
onError: (error: unknown) => E
]
): Stream.Stream<A, E> => {
const evaluate = args.length === 1 ? args[0].evaluate : args[0]
const onError = args.length === 1 ? args[0].onError : args[1]
const releaseLockOnEnd = args.length === 1 ? args[0].releaseLockOnEnd === true : false
return unwrapScoped(Effect.map(
Effect.acquireRelease(
Effect.sync(() => evaluate().getReader()),
(reader) => Effect.promise(() => reader.cancel())
(reader) =>
releaseLockOnEnd
? Effect.sync(() => reader.releaseLock())
: Effect.promise(() => reader.cancel())
),
(reader) =>
repeatEffectOption(
Expand All @@ -3281,34 +3305,59 @@ export const fromReadableStream = <A, E>(
)
)
))
}

/** @internal */
export const fromReadableStreamByob = <E>(
evaluate: LazyArg<ReadableStream<Uint8Array>>,
onError: (error: unknown) => E,
allocSize = 4096
): Stream.Stream<Uint8Array, E> =>
unwrapScoped(Effect.map(
export const fromReadableStreamByob: {
<E>(
options: {
readonly evaluate: LazyArg<ReadableStream<Uint8Array>>
readonly onError: (error: unknown) => E
readonly bufferSize?: number | undefined
readonly releaseLockOnEnd?: boolean | undefined
}
): Stream.Stream<Uint8Array, E>
<E>(
evaluate: LazyArg<ReadableStream<Uint8Array>>,
onError: (error: unknown) => E,
allocSize?: number
): Stream.Stream<Uint8Array, E>
} = <E>(
...args: [options: {
readonly evaluate: LazyArg<ReadableStream<Uint8Array>>
readonly onError: (error: unknown) => E
readonly bufferSize?: number | undefined
readonly releaseLockOnEnd?: boolean | undefined
}] | [
evaluate: LazyArg<ReadableStream<Uint8Array>>,
onError: (error: unknown) => E,
allocSize?: number | undefined
]
): Stream.Stream<Uint8Array, E> => {
const evaluate = args.length === 1 ? args[0].evaluate : args[0]
const onError = args.length === 1 ? args[0].onError : args[1]
const allocSize = (args.length === 1 ? args[0].bufferSize : args[2]) ?? 4096
const releaseLockOnEnd = args.length === 1 ? args[0].releaseLockOnEnd === true : false
return unwrapScoped(Effect.map(
Effect.acquireRelease(
Effect.sync(() => evaluate().getReader({ mode: "byob" })),
(reader) => Effect.promise(() => reader.cancel())
(reader) => releaseLockOnEnd ? Effect.sync(() => reader.releaseLock()) : Effect.promise(() => reader.cancel())
),
(reader) =>
catchAll(
forever(readChunkStreamByobReader(reader, onError, allocSize)),
(error) => isTagged(error, "EOF") ? empty : fail(error as E)
(error) => error === EOF ? empty : fail(error)
)
))

interface EOF {
readonly _tag: "EOF"
}

const EOF = Symbol.for("effect/Stream/EOF")

const readChunkStreamByobReader = <E>(
reader: ReadableStreamBYOBReader,
onError: (error: unknown) => E,
size: number
): Stream.Stream<Uint8Array, E | EOF> => {
): Stream.Stream<Uint8Array, E | typeof EOF> => {
const buffer = new ArrayBuffer(size)
return paginateEffect(0, (offset) =>
Effect.flatMap(
Expand All @@ -3318,7 +3367,7 @@ const readChunkStreamByobReader = <E>(
}),
({ done, value }) => {
if (done) {
return Effect.fail({ _tag: "EOF" })
return Effect.fail(EOF)
}
const newOffset = offset + value.byteLength
return Effect.succeed([
Expand Down
8 changes: 4 additions & 4 deletions packages/effect/test/Stream/constructors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,10 @@ describe("Stream", () => {
}

const result = yield* $(
Stream.fromReadableStream(
() => new ReadableStream(new NumberSource()),
(error) => new FromReadableStreamError(error)
),
Stream.fromReadableStream({
evaluate: () => new ReadableStream(new NumberSource()),
onError: (error) => new FromReadableStreamError(error)
}),
Stream.take(10),
Stream.runCollect
)
Expand Down

0 comments on commit 7b69358

Please sign in to comment.