Stopwatch polishing
bartelink committed Sep 22, 2022
1 parent b46eb59 commit 87fe8f8
Showing 8 changed files with 45 additions and 45 deletions.
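The pattern throughout this commit: call sites that previously allocated a `System.Diagnostics.Stopwatch` per timed operation now capture a raw `int64` timestamp and derive the elapsed interval from it on demand. A minimal self-contained sketch of the before/after shapes (the workload and logger are illustrative stand-ins, not code from this repo):

```fsharp
open System
open System.Diagnostics

// hypothetical workload and logger, purely for illustration
let doWork () = System.Threading.Thread.Sleep 10
let log (elapsed : TimeSpan) = printfn "took %.3fs" elapsed.TotalSeconds

// Before: allocate (and Stop/Restart) a Stopwatch per timed operation
let sw = Stopwatch.StartNew()
doWork ()
log sw.Elapsed

// After: capture a raw int64 timestamp; nothing to allocate, and the same
// capture can later be rendered as ticks, seconds or a TimeSpan
let ts = Stopwatch.GetTimestamp()
doWork ()
let ticks = Stopwatch.GetTimestamp() - ts
log (TimeSpan.FromSeconds(double ticks / double Stopwatch.Frequency))
```

The helpers added to src/Propulsion/Internal.fs below package the last two lines up as `Stopwatch.elapsedTicks` / `elapsedSeconds` / `elapsed`.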
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/CosmosStoreSource.fs
@@ -88,7 +88,7 @@ type CosmosStoreSource =
token = ctx.epoch; latency = readElapsed; rc = ctx.requestCharge; age = age; docs = docs.Count
ingestLatency = pt.Elapsed; ingestQueued = cur }
(log |> Log.withMetric m).Information("Reader {partitionId} {token,9} age {age:dd\.hh\:mm\:ss} {count,4} docs {requestCharge,6:f1}RU {l,5:f1}s Wait {pausedS:f3}s Ahead {cur}/{max}",
- ctx.rangeId, ctx.epoch, age, docs.Count, ctx.requestCharge, readElapsed.TotalSeconds, pt.ElapsedMilliseconds, cur, max)
+ ctx.rangeId, ctx.epoch, age, docs.Count, ctx.requestCharge, readElapsed.TotalSeconds, pt.ElapsedSeconds, cur, max)
sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor
}
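`pt.ElapsedSeconds` above (and `sw.ElapsedSeconds` in `Timers.Dump` further down) is an extension member on `Stopwatch` declared in src/Propulsion/Internal.fs; its body is collapsed out of this view, but presumably amounts to:

```fsharp
type System.Diagnostics.Stopwatch with
    // assumed shape — the actual member body is collapsed out of this diff view
    member sw.ElapsedSeconds = sw.Elapsed.TotalSeconds
```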

4 changes: 2 additions & 2 deletions src/Propulsion.DynamoStore/DynamoStoreIndex.fs
@@ -96,9 +96,9 @@ module Reader =
// Returns flattened list of all spans, and flag indicating whether tail reached
let private loadIndexEpoch (log : Serilog.ILogger) (epochs : AppendsEpoch.Reader.Service) trancheId epochId
: Async<AppendsEpoch.Events.StreamSpan array * bool * int64> = async {
- let sw = Stopwatch.start ()
+ let ts = Stopwatch.timestamp ()
let! maybeStreamBytes, _version, state = epochs.Read(trancheId, epochId, 0)
- let sizeB, loadS = defaultValueArg maybeStreamBytes 0L, sw.ElapsedSeconds
+ let sizeB, loadS = defaultValueArg maybeStreamBytes 0L, Stopwatch.elapsedSeconds ts
let spans = state.changes |> Array.collect (fun struct (_i, spans) -> spans)
let totalEvents = spans |> Array.sumBy (fun x -> x.c.Length)
let totalStreams = spans |> AppendsEpoch.flatten |> Seq.length
4 changes: 2 additions & 2 deletions src/Propulsion.EventStore/EventStoreReader.fs
@@ -150,7 +150,7 @@ let pullAll (slicesStats : SliceStatsBuffer, overallStats : OverallStats) (conn
let rec aux () = async {
let! currentSlice = conn.ReadAllEventsForwardAsync(range.Current, batchSize, resolveLinkTos=false) |> Async.AwaitTaskCorrect
sw.Stop() // Stop the clock after the read call completes; transition to measuring time to traverse / filter / submit
- let postSw = Stopwatch.start ()
+ let postTs = Stopwatch.timestamp ()
let batchEvents, batchBytes = slicesStats.Ingest currentSlice in overallStats.Ingest(int64 batchEvents, batchBytes)
let events = currentSlice.Events |> Seq.choose tryMapEvent |> Array.ofSeq
streams.Clear(); cats.Clear()
@@ -160,7 +160,7 @@ let pullAll (slicesStats : SliceStatsBuffer, overallStats : OverallStats) (conn
let! cur, max = postBatch currentSlice.NextPosition events
Log.Information("Read {pos,10} {pct:p1} {ft:n3}s {mb:n1}MB {count,4} {categories,4}c {streams,4}s {events,4}e Post {pt:n3}s {cur}/{max}",
range.Current.CommitPosition, range.PositionAsRangePercentage, (let e = sw.Elapsed in e.TotalSeconds), Log.miB batchBytes,
- batchEvents, cats.Count, streams.Count, events.Length, (let e = postSw.Elapsed in e.TotalSeconds), cur, max)
+ batchEvents, cats.Count, streams.Count, events.Length, Stopwatch.elapsedSeconds postTs, cur, max)
if not (range.TryNext currentSlice.NextPosition && not once && not currentSlice.IsEndOfStream) then
if currentSlice.IsEndOfStream then return Eof
else return EndOfTranche
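Note the two-phase timing here: the per-batch Stopwatch `sw` is stopped as soon as the read completes, while a separate timestamp `postTs` covers the traverse/filter/submit work. The same shape in isolation (the two phases are hypothetical stand-ins):

```fsharp
open System.Diagnostics

// stand-ins for the real phases
let read () = System.Threading.Thread.Sleep 5   // the ReadAllEventsForwardAsync call
let post () = System.Threading.Thread.Sleep 1   // traverse / filter / submit

let sw = Stopwatch.StartNew()
read ()
sw.Stop()                                       // phase 1 ends when the read returns
let postTs = Stopwatch.GetTimestamp()
post ()
let postSecs = double (Stopwatch.GetTimestamp() - postTs) / double Stopwatch.Frequency
printfn "read %.3fs post %.3fs" sw.Elapsed.TotalSeconds postSecs
```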
4 changes: 2 additions & 2 deletions src/Propulsion.Feed/FeedSource.fs
@@ -278,10 +278,10 @@ type FeedSource
fun (wasLast, pos) -> asyncSeq {
if wasLast then
do! Async.Sleep(TimeSpan.toMs tailSleepInterval)
- let sw = Stopwatch.start ()
+ let readTs = Stopwatch.timestamp ()
let! page = readPage (trancheId, pos)
let items' = page.items |> Array.map (fun x -> struct (streamName, x))
- yield struct (sw.Elapsed, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>))
+ yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>))
}

/// Drives the continual loop of reading and checkpointing each tranche until a fault occurs. <br/>
7 changes: 7 additions & 0 deletions src/Propulsion/Internal.fs
@@ -9,6 +9,13 @@ module TimeSpan =
module Stopwatch =

let inline start () = System.Diagnostics.Stopwatch.StartNew()
+ let inline timestamp () = System.Diagnostics.Stopwatch.GetTimestamp()
+ let inline ticksToSeconds ticks = double ticks / double System.Diagnostics.Stopwatch.Frequency
+ let inline ticksToTimeSpan ticks = ticksToSeconds ticks |> TimeSpan.FromSeconds
+
+ let inline elapsedTicks (ts : int64) = timestamp () - ts
+ let inline elapsedSeconds (ts : int64) = elapsedTicks ts |> ticksToSeconds
+ let inline elapsed (ts : int64) = elapsedTicks ts |> ticksToTimeSpan // equivalent to .NET 7 System.Diagnostics.Stopwatch.GetElapsedTime()

type System.Diagnostics.Stopwatch with

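The six helpers added above carry the whole commit: capture once with `timestamp`, then derive ticks, seconds or a TimeSpan from it without allocating or `Restart`ing anything. Usage, assuming the enclosing internal module is opened as the call sites elsewhere in this diff do:

```fsharp
// assumes: open Propulsion.Internal (exposing the Stopwatch module above)
let ts = Stopwatch.timestamp ()                    // raw int64 tick count; no allocation
// ... work being measured ...
let secs : float = Stopwatch.elapsedSeconds ts     // elapsed as seconds
let span : System.TimeSpan = Stopwatch.elapsed ts  // elapsed as a TimeSpan
```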
8 changes: 4 additions & 4 deletions src/Propulsion/Parallel.fs
@@ -62,12 +62,12 @@ module Scheduling =
let x = { elapsedMs = 0L; remaining = batch.messages.Length; faults = ConcurrentStack(); batch = batch }
x, seq {
for item in batch.messages -> async {
- let sw = Stopwatch.start ()
+ let ts = Stopwatch.timestamp ()
try match! handle item with
- | Choice1Of2 () -> x.RecordOk sw.Elapsed
- | Choice2Of2 exn -> x.RecordExn(sw.Elapsed, exn)
+ | Choice1Of2 () -> x.RecordOk(Stopwatch.elapsed ts)
+ | Choice2Of2 exn -> x.RecordExn(Stopwatch.elapsed ts, exn)
// This exception guard _should_ technically not be necessary per the interface contract, but cannot risk an orphaned batch
- with exn -> x.RecordExn(sw.Elapsed, exn) } }
+ with exn -> x.RecordExn(Stopwatch.elapsed ts, exn) } }

/// Infers whether a WipBatch is in a terminal state
let (|Busy|Completed|Faulted|) = function
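Because the captured start timestamp is just an `int64`, it can be consulted in the success, failure and exception paths alike, with no mutable Stopwatch state to manage. The per-item shape from the hunk above as a standalone sketch (the handler and recorders are hypothetical):

```fsharp
open System.Diagnostics

// hypothetical recorders, standing in for WipBatch.RecordOk / RecordExn
let recordOk (elapsed : System.TimeSpan) = printfn "ok in %O" elapsed
let recordExn (elapsed : System.TimeSpan, e : exn) = printfn "failed in %O: %s" elapsed e.Message

// convert a start timestamp to a TimeSpan (what Stopwatch.elapsed does in Internal.fs)
let elapsedSince (ts : int64) =
    System.TimeSpan.FromSeconds(double (Stopwatch.GetTimestamp() - ts) / double Stopwatch.Frequency)

// time one async item, recording the outcome either way
let timeItem (handle : unit -> Async<Choice<unit, exn>>) = async {
    let ts = Stopwatch.GetTimestamp()
    try match! handle () with
        | Choice1Of2 () -> recordOk (elapsedSince ts)
        | Choice2Of2 e -> recordExn (elapsedSince ts, e)
    with e -> recordExn (elapsedSince ts, e) }
```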
59 changes: 27 additions & 32 deletions src/Propulsion/Streams.fs
@@ -277,14 +277,14 @@ module Dispatch =
// On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first
let tryFillDispatcher (potential : seq<Item<'F>>) markStarted project markBusy =
let xs = potential.GetEnumerator()
- let startTimestamp = System.Diagnostics.Stopwatch.GetTimestamp()
+ let startTs = Stopwatch.timestamp ()
let mutable hasCapacity, dispatched = true, false
while xs.MoveNext() && hasCapacity do
let item = xs.Current
- let succeeded = inner.TryAdd(project struct (startTimestamp, item))
+ let succeeded = inner.TryAdd(project struct (startTs, item))
if succeeded then
markBusy item.stream
- markStarted (item.stream, startTimestamp)
+ markStarted (item.stream, startTs)
hasCapacity <- succeeded
dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping
struct (dispatched, hasCapacity)
@@ -434,11 +434,6 @@ module Scheduling =

type [<Struct; NoEquality; NoComparison>] BufferState = Idle | Active | Full | Slipstreaming

- module StopwatchTicks =
-
- let inline elapsed (sw : System.Diagnostics.Stopwatch) = sw.ElapsedTicks
- let inline toTimeSpan ticks = TimeSpan.FromSeconds(double ticks / double System.Diagnostics.Stopwatch.Frequency)

module Stats =

/// Manages state used to generate metrics (and summary logs) regarding streams currently being processed by a Handler
@@ -447,8 +442,8 @@
type private StreamState = { ts : int64; mutable count : int }
let private walkAges (state : Dictionary<_, _>) =
if state.Count = 0 then Seq.empty else
- let currentTimestamp = System.Diagnostics.Stopwatch.GetTimestamp()
- seq { for x in state.Values -> struct (currentTimestamp - x.ts, x.count) }
+ let currentTs = Stopwatch.timestamp ()
+ seq { for x in state.Values -> struct (currentTs - x.ts, x.count) }
let private renderState agesAndCounts =
let mutable oldest, newest, streams, attempts = Int64.MinValue, Int64.MaxValue, 0, 0
for struct (diff, count) in agesAndCounts do
@@ -457,7 +452,7 @@
streams <- streams + 1
attempts <- attempts + count
if streams = 0 then oldest <- 0L; newest <- 0L
- struct (streams, attempts), struct (StopwatchTicks.toTimeSpan oldest, StopwatchTicks.toTimeSpan newest)
+ struct (streams, attempts), struct (Stopwatch.ticksToTimeSpan oldest, Stopwatch.ticksToTimeSpan newest)
/// Manages the list of currently dispatched Handlers
/// NOTE we are guaranteed we'll hear about a Start before a Finish (or another Start) per stream by the design of the Dispatcher
type private Active() =
@@ -507,17 +502,17 @@
type [<NoComparison; NoEquality>] Timers() =
let mutable results, dispatch, merge, ingest, stats, sleep = 0L, 0L, 0L, 0L, 0L, 0L
let sw = Stopwatch.start()
- member _.RecordResults sw = results <- results + StopwatchTicks.elapsed sw
- member _.RecordDispatch sw = dispatch <- dispatch + StopwatchTicks.elapsed sw
- member _.RecordMerge sw = merge <- merge + StopwatchTicks.elapsed sw
- member _.RecordIngest sw = ingest <- ingest + StopwatchTicks.elapsed sw
- member _.RecordStats sw = stats <- stats + StopwatchTicks.elapsed sw
- member _.RecordSleep sw = sleep <- sleep + StopwatchTicks.elapsed sw
+ member _.RecordResults ts = results <- results + Stopwatch.elapsedTicks ts
+ member _.RecordDispatch ts = dispatch <- dispatch + Stopwatch.elapsedTicks ts
+ member _.RecordMerge ts = merge <- merge + Stopwatch.elapsedTicks ts
+ member _.RecordIngest ts = ingest <- ingest + Stopwatch.elapsedTicks ts
+ member _.RecordStats ts = stats <- stats + Stopwatch.elapsedTicks ts
+ member _.RecordSleep ts = sleep <- sleep + Stopwatch.elapsedTicks ts
member _.Dump(log : ILogger) =
- let dt, ft, mt = StopwatchTicks.toTimeSpan results, StopwatchTicks.toTimeSpan dispatch, StopwatchTicks.toTimeSpan merge
- let it, st, zt = StopwatchTicks.toTimeSpan ingest, StopwatchTicks.toTimeSpan stats, StopwatchTicks.toTimeSpan sleep
+ let dt, ft, mt = Stopwatch.ticksToTimeSpan results, Stopwatch.ticksToTimeSpan dispatch, Stopwatch.ticksToTimeSpan merge
+ let it, st, zt = Stopwatch.ticksToTimeSpan ingest, Stopwatch.ticksToTimeSpan stats, Stopwatch.ticksToTimeSpan sleep
let m = Log.Metric.SchedulerCpu (mt, it, ft, dt, st)
- let tot = StopwatchTicks.toTimeSpan (results + dispatch + merge + ingest + stats + sleep)
+ let tot = Stopwatch.ticksToTimeSpan (results + dispatch + merge + ingest + stats + sleep)
(log |> Log.withMetric m).Information(" Cpu Streams {mt:n1}s Batches {it:n1}s Dispatch {ft:n1}s Results {dt:n1}s Stats {st:n1}s Sleep {zt:n1}s Total {tot:n1}s Interval {int:n1}s",
mt.TotalSeconds, it.TotalSeconds, ft.TotalSeconds, dt.TotalSeconds, st.TotalSeconds, zt.TotalSeconds, tot.TotalSeconds, sw.ElapsedSeconds)
results <- 0L; dispatch <- 0L; merge <- 0L; ingest <- 0L; stats <- 0L; sleep <- 0L
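Timers accumulates raw ticks per phase and converts to TimeSpan only once per Dump, keeping each hot-path recording down to an integer addition. The accumulate-then-convert idea in isolation (names are illustrative):

```fsharp
open System.Diagnostics

// accumulate raw ticks on the hot path...
let mutable dispatchTicks = 0L
let ts = Stopwatch.GetTimestamp()
// ... dispatch some work here ...
dispatchTicks <- dispatchTicks + (Stopwatch.GetTimestamp() - ts)

// ...and convert to a TimeSpan only when reporting
let dispatchTime = System.TimeSpan.FromSeconds(double dispatchTicks / double Stopwatch.Frequency)
printfn "Dispatch %.1fs" dispatchTime.TotalSeconds
```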
@@ -632,9 +627,9 @@
static member Create(inner,
project : struct (FsCodec.StreamName * StreamSpan<'F>) -> CancellationToken -> Task<struct (bool * Choice<'P, 'E>)>,
interpretProgress, stats, dumpStreams) =
- let project struct (startTicks, item : Dispatch.Item<'F>) (ct : CancellationToken) = task {
+ let project struct (startTs, item : Dispatch.Item<'F>) (ct : CancellationToken) = task {
let! progressed, res = project (item.stream, item.span) ct
- return struct (System.Diagnostics.Stopwatch.GetTimestamp() - startTicks |> StopwatchTicks.toTimeSpan, item.stream, progressed, res) }
+ return struct (Stopwatch.elapsed startTs, item.stream, progressed, res) }
MultiDispatcher<_, _, _, _>(inner, project, interpretProgress, stats, dumpStreams)
static member Create(inner, handle, interpret, toIndex, stats, dumpStreams) =
let project item ct = task {
Expand Down Expand Up @@ -877,17 +872,17 @@ module Scheduling =

member _.Pump(ct : CancellationToken) = task {
use _ = dispatcher.Result.Subscribe(fun struct (t, s, pr, r) -> writeResult (Result (t, s, pr, r)))
- let inline ssw () = Stopwatch.start ()
+ let inline ts () = Stopwatch.timestamp ()
while not ct.IsCancellationRequested do
let mutable s = { idle = true; dispatcherState = Idle; remaining = maxCycles; waitForPending = false; waitForCapacity = false }
let t = dispatcher.Timers
while s.remaining <> 0 do
s.remaining <- s.remaining - 1
// 1. propagate write outcomes to buffer (can mark batches completed etc)
- let processedResults = let sw = ssw () in let r = tryHandleResults () in t.RecordResults sw; r
+ let processedResults = let ts = ts () in let r = tryHandleResults () in t.RecordResults ts; r
// 2. top up provisioning of writers queue
// On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first
- let struct (dispatched, hasCapacity) = let sw = ssw () in let r = tryDispatch s.IsSlipStreaming in t.RecordDispatch sw; r
+ let struct (dispatched, hasCapacity) = let ts = ts () in let r = tryDispatch s.IsSlipStreaming in t.RecordDispatch ts; r
s.idle <- s.idle && not processedResults && not dispatched
match s.dispatcherState with
| Idle when not hasCapacity ->
@@ -899,8 +894,8 @@
| Idle -> // need to bring more work into the pool as we can't fill the work queue from what we have
// If we're going to fill the write queue with random work, we should bring all read events into the state first
// Hence we potentially take more than one batch at a time based on maxBatches (but less buffered work is more optimal)
- let mergeStreams batchStreams = let sw = ssw () in streams.Merge batchStreams; t.RecordMerge sw
- let ingestBatch batch = let sw = ssw () in ingestBatch batch; t.RecordIngest sw
+ let mergeStreams batchStreams = let ts = ts () in streams.Merge batchStreams; t.RecordMerge ts
+ let ingestBatch batch = let ts = ts () in ingestBatch batch; t.RecordIngest ts
let struct (ingested, filled) = ingestBatches mergeStreams ingestBatch maxBatches
if ingested then s.waitForPending <- not filled // no need to wait if we ingested as many as needed
elif slipstreamingEnabled then s.dispatcherState <- Slipstreaming; s.waitForPending <- true // try some slip-streaming, but wait for proper items too
@@ -911,17 +906,17 @@
if s.remaining = 0 && hasCapacity then s.waitForPending <- true
if s.remaining = 0 && not hasCapacity && not wakeForResults then s.waitForCapacity <- true
// While the loop can take a long time, we don't attempt logging of stats per iteration on the basis that the maxCycles should be low
- let sw = ssw () in dispatcher.RecordStats(pendingCount()); t.RecordStats sw
+ let ts = ts () in dispatcher.RecordStats(pendingCount()); t.RecordStats ts
// 4. Do a minimal sleep so we don't run completely hot when empty (unless we did something non-trivial)
if s.idle then
- let sleepSw = ssw ()
+ let sleepTs = Stopwatch.timestamp ()
let wakeConditions : Task array = [|
if wakeForResults then awaitResults ct
elif s.waitForCapacity then dispatcher.AwaitCapacity()
if s.waitForPending then awaitPending ct
Task.Delay(int sleepIntervalMs) |]
do! Task.WhenAny(wakeConditions) :> Task
- t.RecordSleep sleepSw
+ t.RecordSleep sleepTs
// 3. Record completion state once per full iteration; dumping streams is expensive so needs to be done infrequently
if dispatcher.RecordState(s.dispatcherState, streams, totalPurged) && purgeDue () then
purge () }
@@ -1222,11 +1217,11 @@ module Sync =

let attemptWrite struct (stream, span : FsCodec.ITimelineEvent<'F> array) ct = task {
let struct (met, span') = StreamSpan.slice<'F> sliceSize (maxEvents, maxBytes) span
- let prepareSw = Stopwatch.start ()
+ let prepareTs = Stopwatch.timestamp ()
try let req = struct (stream, span')
let! res, outcome = Async.StartImmediateAsTask(handle req, cancellationToken = ct)
let index' = SpanResult.toIndex span' res
- return struct (index' > span[0].Index, Choice1Of2 struct (index', struct (met, prepareSw.Elapsed), outcome))
+ return struct (index' > span[0].Index, Choice1Of2 struct (index', struct (met, Stopwatch.elapsed prepareTs), outcome))
with e -> return struct (false, Choice2Of2 struct (met, e)) }

let interpretWriteResultProgress _streams (stream : FsCodec.StreamName) = function
2 changes: 0 additions & 2 deletions tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs
@@ -202,8 +202,6 @@ module Helpers =
do! Async.Parallel [for i in 1 .. numConsumers -> mkConsumer i] |> Async.Ignore
}
#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
type BatchesConsumer(testOutputHelper) =
inherit ConsumerIntegration(testOutputHelper, false)