diff --git a/src/Propulsion.CosmosStore/CosmosStoreSource.fs b/src/Propulsion.CosmosStore/CosmosStoreSource.fs index 083e2015..99beb512 100644 --- a/src/Propulsion.CosmosStore/CosmosStoreSource.fs +++ b/src/Propulsion.CosmosStore/CosmosStoreSource.fs @@ -88,7 +88,7 @@ type CosmosStoreSource = token = ctx.epoch; latency = readElapsed; rc = ctx.requestCharge; age = age; docs = docs.Count ingestLatency = pt.Elapsed; ingestQueued = cur } (log |> Log.withMetric m).Information("Reader {partitionId} {token,9} age {age:dd\.hh\:mm\:ss} {count,4} docs {requestCharge,6:f1}RU {l,5:f1}s Wait {pausedS:f3}s Ahead {cur}/{max}", - ctx.rangeId, ctx.epoch, age, docs.Count, ctx.requestCharge, readElapsed.TotalSeconds, pt.ElapsedMilliseconds, cur, max) + ctx.rangeId, ctx.epoch, age, docs.Count, ctx.requestCharge, readElapsed.TotalSeconds, pt.ElapsedSeconds, cur, max) sw.Restart() // restart the clock as we handoff back to the ChangeFeedProcessor } diff --git a/src/Propulsion.DynamoStore/DynamoStoreIndex.fs b/src/Propulsion.DynamoStore/DynamoStoreIndex.fs index 5ac4ed2e..ae376694 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreIndex.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreIndex.fs @@ -96,9 +96,9 @@ module Reader = // Returns flattened list of all spans, and flag indicating whether tail reached let private loadIndexEpoch (log : Serilog.ILogger) (epochs : AppendsEpoch.Reader.Service) trancheId epochId : Async = async { - let sw = Stopwatch.start () + let ts = Stopwatch.timestamp () let! maybeStreamBytes, _version, state = epochs.Read(trancheId, epochId, 0) - let sizeB, loadS = defaultValueArg maybeStreamBytes 0L, sw.ElapsedSeconds + let sizeB, loadS = defaultValueArg maybeStreamBytes 0L, Stopwatch.elapsedSeconds ts let spans = state.changes |> Array.collect (fun struct (_i, spans) -> spans) let totalEvents = spans |> Array.sumBy (fun x -> x.c.Length) let totalStreams = spans |> AppendsEpoch.flatten |> Seq.length diff --git a/src/Propulsion.EventStore/EventStoreReader.fs b/src/Propulsion.EventStore/EventStoreReader.fs index e9062a34..fc48040f 100755 --- a/src/Propulsion.EventStore/EventStoreReader.fs +++ b/src/Propulsion.EventStore/EventStoreReader.fs @@ -150,7 +150,7 @@ let pullAll (slicesStats : SliceStatsBuffer, overallStats : OverallStats) (conn let rec aux () = async { let! currentSlice = conn.ReadAllEventsForwardAsync(range.Current, batchSize, resolveLinkTos=false) |> Async.AwaitTaskCorrect sw.Stop() // Stop the clock after the read call completes; transition to measuring time to traverse / filter / submit - let postSw = Stopwatch.start () + let postTs = Stopwatch.timestamp () let batchEvents, batchBytes = slicesStats.Ingest currentSlice in overallStats.Ingest(int64 batchEvents, batchBytes) let events = currentSlice.Events |> Seq.choose tryMapEvent |> Array.ofSeq streams.Clear(); cats.Clear() @@ -160,7 +160,7 @@ let pullAll (slicesStats : SliceStatsBuffer, overallStats : OverallStats) (conn let! cur, max = postBatch currentSlice.NextPosition events Log.Information("Read {pos,10} {pct:p1} {ft:n3}s {mb:n1}MB {count,4} {categories,4}c {streams,4}s {events,4}e Post {pt:n3}s {cur}/{max}", range.Current.CommitPosition, range.PositionAsRangePercentage, (let e = sw.Elapsed in e.TotalSeconds), Log.miB batchBytes, - batchEvents, cats.Count, streams.Count, events.Length, (let e = postSw.Elapsed in e.TotalSeconds), cur, max) + batchEvents, cats.Count, streams.Count, events.Length, Stopwatch.elapsedSeconds postTs, cur, max) if not (range.TryNext currentSlice.NextPosition && not once && not currentSlice.IsEndOfStream) then if currentSlice.IsEndOfStream then return Eof else return EndOfTranche diff --git a/src/Propulsion.Feed/FeedSource.fs b/src/Propulsion.Feed/FeedSource.fs index ccbe3090..21c74af5 100644 --- a/src/Propulsion.Feed/FeedSource.fs +++ b/src/Propulsion.Feed/FeedSource.fs @@ -278,10 +278,10 @@ type FeedSource fun (wasLast, pos) -> asyncSeq { if wasLast then do! Async.Sleep(TimeSpan.toMs tailSleepInterval) - let sw = Stopwatch.start () + let readTs = Stopwatch.timestamp () let! page = readPage (trancheId, pos) let items' = page.items |> Array.map (fun x -> struct (streamName, x)) - yield struct (sw.Elapsed, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>)) + yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>)) } /// Drives the continual loop of reading and checkpointing each tranche until a fault occurs.
diff --git a/src/Propulsion/Internal.fs b/src/Propulsion/Internal.fs index 15d2c156..50562437 100644 --- a/src/Propulsion/Internal.fs +++ b/src/Propulsion/Internal.fs @@ -9,6 +9,13 @@ module TimeSpan = module Stopwatch = let inline start () = System.Diagnostics.Stopwatch.StartNew() + let inline timestamp () = System.Diagnostics.Stopwatch.GetTimestamp() + let inline ticksToSeconds ticks = double ticks / double System.Diagnostics.Stopwatch.Frequency + let inline ticksToTimeSpan ticks = ticksToSeconds ticks |> TimeSpan.FromSeconds + + let inline elapsedTicks (ts : int64) = timestamp () - ts + let inline elapsedSeconds (ts : int64) = elapsedTicks ts |> ticksToSeconds + let inline elapsed (ts : int64) = elapsedTicks ts |> ticksToTimeSpan // equivalent to .NET 7 System.Diagnostics.Stopwatch.GetElapsedTime() type System.Diagnostics.Stopwatch with diff --git a/src/Propulsion/Parallel.fs b/src/Propulsion/Parallel.fs index 5af97fd9..67b5fb24 100755 --- a/src/Propulsion/Parallel.fs +++ b/src/Propulsion/Parallel.fs @@ -62,12 +62,12 @@ module Scheduling = let x = { elapsedMs = 0L; remaining = batch.messages.Length; faults = ConcurrentStack(); batch = batch } x, seq { for item in batch.messages -> async { - let sw = Stopwatch.start () + let ts = Stopwatch.timestamp () try match! handle item with - | Choice1Of2 () -> x.RecordOk sw.Elapsed - | Choice2Of2 exn -> x.RecordExn(sw.Elapsed, exn) + | Choice1Of2 () -> x.RecordOk(Stopwatch.elapsed ts) + | Choice2Of2 exn -> x.RecordExn(Stopwatch.elapsed ts, exn) // This exception guard _should_ technically not be necessary per the interface contract, but cannot risk an orphaned batch - with exn -> x.RecordExn(sw.Elapsed, exn) } } + with exn -> x.RecordExn(Stopwatch.elapsed ts, exn) } } /// Infers whether a WipBatch is in a terminal state let (|Busy|Completed|Faulted|) = function diff --git a/src/Propulsion/Streams.fs b/src/Propulsion/Streams.fs index 05f055ab..59105858 100755 --- a/src/Propulsion/Streams.fs +++ b/src/Propulsion/Streams.fs @@ -277,14 +277,14 @@ module Dispatch = // On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first let tryFillDispatcher (potential : seq>) markStarted project markBusy = let xs = potential.GetEnumerator() - let startTimestamp = System.Diagnostics.Stopwatch.GetTimestamp() + let startTs = Stopwatch.timestamp () let mutable hasCapacity, dispatched = true, false while xs.MoveNext() && hasCapacity do let item = xs.Current - let succeeded = inner.TryAdd(project struct (startTimestamp, item)) + let succeeded = inner.TryAdd(project struct (startTs, item)) if succeeded then markBusy item.stream - markStarted (item.stream, startTimestamp) + markStarted (item.stream, startTs) hasCapacity <- succeeded dispatched <- dispatched || succeeded // if we added any request, we'll skip sleeping struct (dispatched, hasCapacity) @@ -434,11 +434,6 @@ module Scheduling = type [] BufferState = Idle | Active | Full | Slipstreaming - module StopwatchTicks = - - let inline elapsed (sw : System.Diagnostics.Stopwatch) = sw.ElapsedTicks - let inline toTimeSpan ticks = TimeSpan.FromSeconds(double ticks / double System.Diagnostics.Stopwatch.Frequency) - module Stats = /// Manages state used to generate metrics (and summary logs) regarding streams currently being processed by a Handler @@ -447,8 +442,8 @@ module Scheduling = type private StreamState = { ts : int64; mutable count : int } let private walkAges (state : Dictionary<_, _>) = if state.Count = 0 then Seq.empty else - let currentTimestamp = System.Diagnostics.Stopwatch.GetTimestamp() - seq { for x in state.Values -> struct (currentTimestamp - x.ts, x.count) } + let currentTs = Stopwatch.timestamp () + seq { for x in state.Values -> struct (currentTs - x.ts, x.count) } let private renderState agesAndCounts = let mutable oldest, newest, streams, attempts = Int64.MinValue, Int64.MaxValue, 0, 0 for struct (diff, count) in agesAndCounts do @@ -457,7 +452,7 @@ module Scheduling = streams <- streams + 1 attempts <- attempts + count if streams = 0 then oldest <- 0L; newest <- 0L - struct (streams, attempts), struct (StopwatchTicks.toTimeSpan oldest, StopwatchTicks.toTimeSpan newest) + struct (streams, attempts), struct (Stopwatch.ticksToTimeSpan oldest, Stopwatch.ticksToTimeSpan newest) /// Manages the list of currently dispatched Handlers /// NOTE we are guaranteed we'll hear about a Start before a Finish (or another Start) per stream by the design of the Dispatcher type private Active() = @@ -507,17 +502,17 @@ module Scheduling = type [] Timers() = let mutable results, dispatch, merge, ingest, stats, sleep = 0L, 0L, 0L, 0L, 0L, 0L let sw = Stopwatch.start() - member _.RecordResults sw = results <- results + StopwatchTicks.elapsed sw - member _.RecordDispatch sw = dispatch <- dispatch + StopwatchTicks.elapsed sw - member _.RecordMerge sw = merge <- merge + StopwatchTicks.elapsed sw - member _.RecordIngest sw = ingest <- ingest + StopwatchTicks.elapsed sw - member _.RecordStats sw = stats <- stats + StopwatchTicks.elapsed sw - member _.RecordSleep sw = sleep <- sleep + StopwatchTicks.elapsed sw + member _.RecordResults ts = results <- results + Stopwatch.elapsedTicks ts + member _.RecordDispatch ts = dispatch <- dispatch + Stopwatch.elapsedTicks ts + member _.RecordMerge ts = merge <- merge + Stopwatch.elapsedTicks ts + member _.RecordIngest ts = ingest <- ingest + Stopwatch.elapsedTicks ts + member _.RecordStats ts = stats <- stats + Stopwatch.elapsedTicks ts + member _.RecordSleep ts = sleep <- sleep + Stopwatch.elapsedTicks ts member _.Dump(log : ILogger) = - let dt, ft, mt = StopwatchTicks.toTimeSpan results, StopwatchTicks.toTimeSpan dispatch, StopwatchTicks.toTimeSpan merge - let it, st, zt = StopwatchTicks.toTimeSpan ingest, StopwatchTicks.toTimeSpan stats, StopwatchTicks.toTimeSpan sleep + let dt, ft, mt = Stopwatch.ticksToTimeSpan results, Stopwatch.ticksToTimeSpan dispatch, Stopwatch.ticksToTimeSpan merge + let it, st, zt = Stopwatch.ticksToTimeSpan ingest, Stopwatch.ticksToTimeSpan stats, Stopwatch.ticksToTimeSpan sleep let m = Log.Metric.SchedulerCpu (mt, it, ft, dt, st) - let tot = StopwatchTicks.toTimeSpan (results + dispatch + merge + ingest + stats + sleep) + let tot = Stopwatch.ticksToTimeSpan (results + dispatch + merge + ingest + stats + sleep) (log |> Log.withMetric m).Information(" Cpu Streams {mt:n1}s Batches {it:n1}s Dispatch {ft:n1}s Results {dt:n1}s Stats {st:n1}s Sleep {zt:n1}s Total {tot:n1}s Interval {int:n1}s", mt.TotalSeconds, it.TotalSeconds, ft.TotalSeconds, dt.TotalSeconds, st.TotalSeconds, zt.TotalSeconds, tot.TotalSeconds, sw.ElapsedSeconds) results <- 0L; dispatch <- 0L; merge <- 0L; ingest <- 0L; stats <- 0L; sleep <- 0L @@ -632,9 +627,9 @@ module Scheduling = static member Create(inner, project : struct (FsCodec.StreamName * StreamSpan<'F>) -> CancellationToken -> Task)>, interpretProgress, stats, dumpStreams) = - let project struct (startTicks, item : Dispatch.Item<'F>) (ct : CancellationToken) = task { + let project struct (startTs, item : Dispatch.Item<'F>) (ct : CancellationToken) = task { let! progressed, res = project (item.stream, item.span) ct - return struct (System.Diagnostics.Stopwatch.GetTimestamp() - startTicks |> StopwatchTicks.toTimeSpan, item.stream, progressed, res) } + return struct (Stopwatch.elapsed startTs, item.stream, progressed, res) } MultiDispatcher<_, _, _, _>(inner, project, interpretProgress, stats, dumpStreams) static member Create(inner, handle, interpret, toIndex, stats, dumpStreams) = let project item ct = task { @@ -877,17 +872,17 @@ module Scheduling = member _.Pump(ct : CancellationToken) = task { use _ = dispatcher.Result.Subscribe(fun struct (t, s, pr, r) -> writeResult (Result (t, s, pr, r))) - let inline ssw () = Stopwatch.start () + let inline ts () = Stopwatch.timestamp () while not ct.IsCancellationRequested do let mutable s = { idle = true; dispatcherState = Idle; remaining = maxCycles; waitForPending = false; waitForCapacity = false } let t = dispatcher.Timers while s.remaining <> 0 do s.remaining <- s.remaining - 1 // 1. propagate write write outcomes to buffer (can mark batches completed etc) - let processedResults = let sw = ssw () in let r = tryHandleResults () in t.RecordResults sw; r + let processedResults = let ts = ts () in let r = tryHandleResults () in t.RecordResults ts; r // 2. top up provisioning of writers queue // On each iteration, we try to fill the in-flight queue, taking the oldest and/or heaviest streams first - let struct (dispatched, hasCapacity) = let sw = ssw () in let r = tryDispatch s.IsSlipStreaming in t.RecordDispatch sw; r + let struct (dispatched, hasCapacity) = let ts = ts () in let r = tryDispatch s.IsSlipStreaming in t.RecordDispatch ts; r s.idle <- s.idle && not processedResults && not dispatched match s.dispatcherState with | Idle when not hasCapacity -> @@ -899,8 +894,8 @@ module Scheduling = | Idle -> // need to bring more work into the pool as we can't fill the work queue from what we have // If we're going to fill the write queue with random work, we should bring all read events into the state first // Hence we potentially take more than one batch at a time based on maxBatches (but less buffered work is more optimal) - let mergeStreams batchStreams = let sw = ssw () in streams.Merge batchStreams; t.RecordMerge sw - let ingestBatch batch = let sw = ssw () in ingestBatch batch; t.RecordIngest sw + let mergeStreams batchStreams = let ts = ts () in streams.Merge batchStreams; t.RecordMerge ts + let ingestBatch batch = let ts = ts () in ingestBatch batch; t.RecordIngest ts let struct (ingested, filled) = ingestBatches mergeStreams ingestBatch maxBatches if ingested then s.waitForPending <- not filled // no need to wait if we ingested as many as needed elif slipstreamingEnabled then s.dispatcherState <- Slipstreaming; s.waitForPending <- true // try some slip-streaming, but wait for proper items too @@ -911,17 +906,17 @@ module Scheduling = if s.remaining = 0 && hasCapacity then s.waitForPending <- true if s.remaining = 0 && not hasCapacity && not wakeForResults then s.waitForCapacity <- true // While the loop can take a long time, we don't attempt logging of stats per iteration on the basis that the maxCycles should be low - let sw = ssw () in dispatcher.RecordStats(pendingCount()); t.RecordStats sw + let ts = ts () in dispatcher.RecordStats(pendingCount()); t.RecordStats ts // 4. Do a minimal sleep so we don't run completely hot when empty (unless we did something non-trivial) if s.idle then - let sleepSw = ssw () + let sleepTs = Stopwatch.timestamp () let wakeConditions : Task array = [| if wakeForResults then awaitResults ct elif s.waitForCapacity then dispatcher.AwaitCapacity() if s.waitForPending then awaitPending ct Task.Delay(int sleepIntervalMs) |] do! Task.WhenAny(wakeConditions) :> Task - t.RecordSleep sleepSw + t.RecordSleep sleepTs // 3. Record completion state once per full iteration; dumping streams is expensive so needs to be done infrequently if dispatcher.RecordState(s.dispatcherState, streams, totalPurged) && purgeDue () then purge () } @@ -1222,11 +1217,11 @@ module Sync = let attemptWrite struct (stream, span : FsCodec.ITimelineEvent<'F> array) ct = task { let struct (met, span') = StreamSpan.slice<'F> sliceSize (maxEvents, maxBytes) span - let prepareSw = Stopwatch.start () + let prepareTs = Stopwatch.timestamp () try let req = struct (stream, span') let! res, outcome = Async.StartImmediateAsTask(handle req, cancellationToken = ct) let index' = SpanResult.toIndex span' res - return struct (index' > span[0].Index, Choice1Of2 struct (index', struct (met, prepareSw.Elapsed), outcome)) + return struct (index' > span[0].Index, Choice1Of2 struct (index', struct (met, Stopwatch.elapsed prepareTs), outcome)) with e -> return struct (false, Choice2Of2 struct (met, e)) } let interpretWriteResultProgress _streams (stream : FsCodec.StreamName) = function diff --git a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs index 4c7f6c72..4b3950c7 100644 --- a/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs +++ b/tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs @@ -202,8 +202,6 @@ module Helpers = do! Async.Parallel [for i in 1 .. numConsumers -> mkConsumer i] |> Async.Ignore } -#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) - type BatchesConsumer(testOutputHelper) = inherit ConsumerIntegration(testOutputHelper, false)