Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Backing off at Stream Level #129

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Propulsion.Cosmos/CosmosPruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ module Pruner =
totalDeferred <- totalDeferred + deferred

/// Used to render exceptions that don't fall into the rate-limiting or timed-out categories
override _.HandleExn(log, exn) =
override _.HandleExn(log, _stream, exn) =
match classify exn with
| ExceptionKind.RateLimited | ExceptionKind.TimedOut ->
() // Outcomes are already included in the statistics - no logging is warranted
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/CosmosStorePruner.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ module Pruner =
totalDeferred <- totalDeferred + deferred

/// Used to render exceptions that don't fall into the rate-limiting or timed-out categories
override _.HandleExn(log, exn) =
override _.HandleExn(log, _stream, exn) =
match classify exn with
| ExceptionKind.RateLimited | ExceptionKind.TimedOut ->
() // Outcomes are already included in the statistics - no logging is warranted
Expand Down
62 changes: 49 additions & 13 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,10 @@ module Scheduling =
let timeSpanFromStopwatchTicks = function
| ticks when ticks > 0L -> TimeSpan.FromSeconds(double ticks / ticksPerSecond)
| _ -> TimeSpan.Zero
let private dateTimeOffsetToTimeStamp (dto : DateTimeOffset) : int64 =
let now, nowTs = DateTimeOffset.UtcNow, System.Diagnostics.Stopwatch.GetTimestamp()
let totalWaitS = (dto - now).TotalSeconds
nowTs + int64 (totalWaitS * ticksPerSecond)
type private StreamState = { ts : int64; mutable count : int }
let private walkAges (state : Dictionary<_, _>) =
let now = System.Diagnostics.Stopwatch.GetTimestamp()
Expand Down Expand Up @@ -551,32 +555,55 @@ module Scheduling =
| true, v -> v.count <- v.count + 1
| false, _ -> state.Add(sn, { ts = startTs; count = 1 })
member _.State = walkAges state |> renderState
/// Maintains a list of streams that have been marked to backing off on processing
type private Waiting() =
let state = Dictionary<FsCodec.StreamName, StreamState>() // NOTE ts is a cutoff time, not a start time here
let prune cutoff =
for sn in [| for kv in state do if kv.Value.ts <= cutoff then kv.Key |] do
state.Remove sn |> ignore
let walk now = if state.Count = 0 then Seq.empty else seq { for x in state.Values -> struct (x.ts - now, x.count) }
member _.HandleBackOff(sn, untilTs) = state.Add(sn, { ts = untilTs; count = 1 })
member _.CanDispatch(sn, ts) =
match state.TryGetValue sn with
| true, { ts = until } when until > ts -> false
| true, _ -> state.Remove sn |> ignore; true
| false, _ -> true
member _.State : (int * int) * (TimeSpan * TimeSpan) =
let now = System.Diagnostics.Stopwatch.GetTimestamp()
prune now
walk now |> renderState
/// Collates all state and reactions to manage the list of busy streams based on callbacks/notifications from the Dispatcher
type Monitor() =
let active, failing, stuck = Active(), Repeating(), Repeating()
let active, failing, stuck, waiting = Active(), Repeating(), Repeating(), Waiting()
let emit (log : ILogger) state (streams, attempts) (oldest : TimeSpan, newest : TimeSpan) =
log.Information(" {state} {streams} for {newest:n1}-{oldest:n1}s, {attempts} attempts",
state, streams, newest.TotalSeconds, oldest.TotalSeconds, attempts)
member _.CanDispatch(sn, ts) =
waiting.CanDispatch(sn, ts)
member _.HandleStarted(sn, ts) =
active.HandleStarted(sn, ts)
member _.HandleResult(sn, succeeded, progressed) =
let startTs = active.TakeFinished(sn)
failing.HandleResult(sn, not succeeded, startTs)
stuck.HandleResult(sn, succeeded && not progressed, startTs)
member _.HandleBackoff(sn, untilTs) =
waiting.HandleBackOff(sn, dateTimeOffsetToTimeStamp untilTs)
member _.DumpState(log : ILogger) =
let inline dump state (streams, attempts) ages =
if streams <> 0 then
emit log state (streams, attempts) ages
active.State ||> dump "active"
failing.State ||> dump "failing"
stuck.State ||> dump "stalled"
waiting.State ||> dump "waiting"
member _.EmitMetrics(log : ILogger) =
let inline report state (streams, attempts) (oldest : TimeSpan, newest : TimeSpan) =
let m = Log.Metric.StreamsBusy (state, streams, oldest.TotalSeconds, newest.TotalSeconds)
emit (log |> Log.metric m) state (streams, attempts) (oldest, newest)
active.State ||> report "active"
failing.State ||> report "failing"
stuck.State ||> report "stalled"
waiting.State ||> report "waiting"

/// Gathers stats pertaining to the core projection/ingestion activity
[<AbstractClass>]
Expand Down Expand Up @@ -625,6 +652,14 @@ module Scheduling =
if stucksDue () then
mon.EmitMetrics metricsLog

abstract BackoffUntil : stream : FsCodec.StreamName * until : DateTimeOffset -> unit
default _.BackoffUntil(stream, until) =
mon.HandleBackoff(stream, until)

/// Enables one to configure backoffs for streams that are failing
abstract CanDispatch : stream : FsCodec.StreamName * stopwatchTicks : int64 -> bool
default _.CanDispatch(stream, stopwatchTicks) = mon.CanDispatch(stream, stopwatchTicks)

abstract MarkStarted : stream : FsCodec.StreamName * stopwatchTicks : int64 -> unit
default _.MarkStarted(stream, stopwatchTicks) =
mon.HandleStarted(stream, stopwatchTicks)
Expand Down Expand Up @@ -685,27 +720,28 @@ module Scheduling =
let inner = DopDispatcher<TimeSpan * FsCodec.StreamName * bool * 'R>(maxDop)

// On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first
let tryFillDispatcher (pending, markStarted) project markBusy =
let tryFillDispatcher (pending, canDispatchAt, markStarted) project markBusy =
let mutable hasCapacity, dispatched = inner.HasCapacity, false
if hasCapacity then
let potential : seq<DispatchItem<byte[]>> = pending ()
let xs = potential.GetEnumerator()
let ts = System.Diagnostics.Stopwatch.GetTimestamp()
while xs.MoveNext() && hasCapacity do
let item = xs.Current
let succeeded = inner.TryAdd(project ts item)
if succeeded then
markBusy item.stream
markStarted (item.stream, ts)
hasCapacity <- succeeded
dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping
if canDispatchAt (item.stream, ts) then
let succeeded = inner.TryAdd(project ts item)
if succeeded then
markBusy item.stream
markStarted (item.stream, ts)
hasCapacity <- succeeded
dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping
hasCapacity, dispatched

member _.Pump() = inner.Pump()
[<CLIEvent>] member _.Result = inner.Result
member _.State = inner.State
member _.TryReplenish (pending, markStarted) project markStreamBusy =
tryFillDispatcher (pending, markStarted) project markStreamBusy
member _.TryReplenish (pending, canDispatchAt, markStarted) project markStreamBusy =
tryFillDispatcher (pending, canDispatchAt, markStarted) project markStreamBusy

/// Defines interface between Scheduler (which owns the pending work) and the Dispatcher which periodically selects work to commence based on a policy
type IDispatcher<'P, 'R, 'E> =
Expand Down Expand Up @@ -743,7 +779,7 @@ module Scheduling =

interface IDispatcher<'P, 'R, 'E> with
override _.TryReplenish pending markStreamBusy =
inner.TryReplenish (pending, stats.MarkStarted) project markStreamBusy
inner.TryReplenish (pending, stats.CanDispatch, stats.MarkStarted) project markStreamBusy
[<CLIEvent>] override _.Result = inner.Result
override _.InterpretProgress(streams : StreamStates<_>, stream : FsCodec.StreamName, res : Choice<'P, 'E>) =
interpretProgress streams stream res
Expand Down Expand Up @@ -998,9 +1034,9 @@ type Stats<'Outcome>(log : ILogger, statsInterval, statesInterval) =
exnEvents <- exnEvents + es
exnBytes <- exnBytes + int64 bs
resultExnOther <- resultExnOther + 1
this.HandleExn(log.ForContext("stream", stream).ForContext("events", es).ForContext("duration", duration), exn)
this.HandleExn(log.ForContext("stream", stream).ForContext("events", es).ForContext("duration", duration), stream, exn)
abstract member HandleOk : outcome : 'Outcome -> unit
abstract member HandleExn : log : ILogger * exn : exn -> unit
abstract member HandleExn : log : ILogger * streamName : FsCodec.StreamName * exn : exn -> unit

module Projector =

Expand Down
2 changes: 1 addition & 1 deletion tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ module Helpers =
inherit Propulsion.Streams.Stats<unit>(log, statsInterval, stateInterval)

override _.HandleOk(()) = ()
override _.HandleExn(log, exn) = log.Information(exn, "Unhandled")
override _.HandleExn(log, _stream, exn) = log.Information(exn, "Unhandled")

let runConsumersBatch log (config : KafkaConsumerConfig) (numConsumers : int) (timeout : TimeSpan option) (handler : ConsumerCallback) = async {
let mkConsumer (consumerId : int) = async {
Expand Down
2 changes: 1 addition & 1 deletion tools/Propulsion.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ type Stats(log, statsInterval, statesInterval) =
inherit Propulsion.Streams.Stats<unit>(log, statsInterval=statsInterval, statesInterval=statesInterval)
member val StatsInterval = statsInterval
override _.HandleOk(_log) = ()
override _.HandleExn(_log, _exn) = ()
override _.HandleExn(_log, _stream, _exn) = ()

[<EntryPoint>]
let main argv =
Expand Down