Stuck Streams metrics #126

Merged 10 commits on Dec 6, 2021
CHANGELOG.md (3 additions, 0 deletions)
@@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- `Streams`: Added `propulsion_scheduler_busy` metrics: `count` and `seconds` [#126](https://github.com/jet/propulsion/pull/126)

### Changed
### Removed

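For orientation on how the new gauges get exported: the `LogSink` added in `PropulsionPrometheus.fs` (further down in this diff) implements Serilog's `ILogEventSink` and registers prometheus-net gauges, so the `propulsion_scheduler_busy_count` / `propulsion_scheduler_busy_seconds` series appear once that sink is attached to the logger the scheduler writes to and an HTTP endpoint is exposed for scraping. A minimal wiring sketch; the `Propulsion.Prometheus` namespace, the port, and the use of prometheus-net's default `MetricServer` are assumptions, only the `LogSink(customTags, group)` constructor is taken from this PR:

```fsharp
open Serilog

/// Sketch: expose a scrape endpoint and route Propulsion scheduler log events
/// through the Prometheus sink so the busy-streams gauges get populated.
let configureSchedulerMetrics () =
    // prometheus-net HTTP endpoint (default registry) for the Prometheus server to scrape
    let metricServer = Prometheus.MetricServer(port = 9091)
    metricServer.Start() |> ignore
    // customTags are appended to every series; group identifies this projector instance
    let metricsSink = Propulsion.Prometheus.LogSink(customTags = [ "app", "my-app" ], group = "my-projector")
    LoggerConfiguration()
        .WriteTo.Sink(metricsSink)
        .CreateLogger()
```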
README.md (1 addition, 0 deletions)
@@ -262,6 +262,7 @@ The order in which the need for various components arose (as a side effect of bu

The things Propulsion in general accomplishes in the projections space:
- Uniform dashboards for throughput, successes vs failures, and latency distributions over CosmosDB, EventStoreDB, Kafka and generic Feeds
- Metrics to support trustworthy alerting and detailed analysis of busy, failing and stuck projections
- make reading, checkpointing, parsing and running independent asynchronous activities (all big perf boosts with Cosmos, less relevant for EventStoreDB)
- allow handlers to handle backlog of accumulated items for a stream as a batch if desired
- concurrency across streams
src/Propulsion.Cosmos/CosmosPruner.fs (1 addition, 1 deletion)
@@ -56,7 +56,7 @@ module Pruner =
let inline adds x (set:HashSet<_>) = set.Add x |> ignore
base.Handle message
match message with
| Scheduling.InternalMessage.Result (_duration, stream, Choice2Of2 (_, exn)) ->
| Scheduling.InternalMessage.Result (_duration, stream, _progressed, Choice2Of2 (_, exn)) ->
match classify exn with
| ExceptionKind.RateLimited ->
adds stream rlStreams; incr rateLimited
src/Propulsion.Cosmos/CosmosSink.fs (4 additions, 4 deletions)
@@ -105,7 +105,7 @@ module Internal =
base.Handle message
match message with
| Scheduling.InternalMessage.Added _ -> () // Processed by standard logging already; we have nothing to add
| Scheduling.InternalMessage.Result (_duration, stream, Choice1Of2 ((es, bs), res)) ->
| Scheduling.InternalMessage.Result (_duration, stream, _progressed, Choice1Of2 ((es, bs), res)) ->
adds stream okStreams
okEvents <- okEvents + es
okBytes <- okBytes + int64 bs
@@ -115,7 +115,7 @@
| Writer.Result.PartialDuplicate _ -> incr resultPartialDup
| Writer.Result.PrefixMissing _ -> incr resultPrefix
this.HandleOk res
| Scheduling.InternalMessage.Result (_duration, stream, Choice2Of2 ((es, bs), exn)) ->
| Scheduling.InternalMessage.Result (_duration, stream, _progressed, Choice2Of2 ((es, bs), exn)) ->
adds stream failStreams
exnEvents <- exnEvents + es
exnBytes <- exnBytes + int64 bs
@@ -143,8 +143,8 @@
let selectedConnection = cosmosContexts.[index]
let met, span' = Buffering.StreamSpan.slice (maxEvents, maxBytes) span
try let! res = Writer.write log selectedConnection (StreamName.toString stream) span'
return Choice1Of2 (met, res)
with e -> return Choice2Of2 (met, e) }
return span'.events.Length > 0, Choice1Of2 (met, res)
with e -> return false, Choice2Of2 (met, e) }
let interpretWriteResultProgress (streams: Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Choice1Of2 (_stats, Writer.Ok pos) -> streams.InternalUpdate stream pos null, false
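The recurring change in the sink files (here and in `CosmosStoreSink.fs`, `EventStoreSink.fs` and `Consumers.fs` below) is that each dispatch result now carries a `bool` alongside the usual `Choice`: whether the attempt could have moved the stream forward (on success, whether the sliced span contained any events; `false` whenever the write throws). A rough sketch of that tuple shape, with type names invented here for illustration (the real definitions live in Propulsion's `Streams.Scheduling` code and use the `FsCodec.StreamName` type the project already references):

```fsharp
open System

/// Metrics for the attempt plus either the writer's result or the exception raised
type WriteOutcome<'Metrics, 'Result> = Choice<'Metrics * 'Result, 'Metrics * exn>

/// What Scheduling.InternalMessage.Result now carries per attempt: elapsed time,
/// the stream, whether the attempt progressed the stream, and the outcome.
/// The scheduler can use the flag to tell busy-but-progressing streams from stuck ones.
type WriteAttempt<'Metrics, 'Result> =
    TimeSpan * FsCodec.StreamName * (* progressed *) bool * WriteOutcome<'Metrics, 'Result>
```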
src/Propulsion.CosmosStore/CosmosStorePruner.fs (1 addition, 1 deletion)
@@ -54,7 +54,7 @@ module Pruner =
let inline adds x (set:HashSet<_>) = set.Add x |> ignore
base.Handle message
match message with
| Propulsion.Streams.Scheduling.InternalMessage.Result (_duration, stream, Choice2Of2 (_, exn)) ->
| Propulsion.Streams.Scheduling.InternalMessage.Result (_duration, stream, _progressed, Choice2Of2 (_, exn)) ->
match classify exn with
| ExceptionKind.RateLimited ->
adds stream rlStreams; incr rateLimited
src/Propulsion.CosmosStore/CosmosStoreSink.fs (4 additions, 4 deletions)
@@ -102,7 +102,7 @@ module Internal =
base.Handle message
match message with
| Scheduling.InternalMessage.Added _ -> () // Processed by standard logging already; we have nothing to add
| Scheduling.InternalMessage.Result (_duration, stream, Choice1Of2 ((es, bs), res)) ->
| Scheduling.InternalMessage.Result (_duration, stream, _progressed, Choice1Of2 ((es, bs), res)) ->
adds stream okStreams
okEvents <- okEvents + es
okBytes <- okBytes + int64 bs
@@ -112,7 +112,7 @@
| Writer.Result.PartialDuplicate _ -> incr resultPartialDup
| Writer.Result.PrefixMissing _ -> incr resultPrefix
this.HandleOk res
| Scheduling.InternalMessage.Result (_duration, stream, Choice2Of2 ((es, bs), exn)) ->
| Scheduling.InternalMessage.Result (_duration, stream, _progressed, Choice2Of2 ((es, bs), exn)) ->
adds stream failStreams
exnEvents <- exnEvents + es
exnBytes <- exnBytes + int64 bs
@@ -137,8 +137,8 @@
let attemptWrite (stream, span) = async {
let met, span' = Buffering.StreamSpan.slice (maxEvents, maxBytes) span
try let! res = Writer.write log eventsContext (StreamName.toString stream) span'
return Choice1Of2 (met, res)
with e -> return Choice2Of2 (met, e) }
return span'.events.Length > 0, Choice1Of2 (met, res)
with e -> return false, Choice2Of2 (met, e) }
let interpretWriteResultProgress (streams: Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
| Choice1Of2 (_stats, Writer.Ok pos) -> streams.InternalUpdate stream pos null, false
src/Propulsion.EventStore/EventStoreSink.fs (4 additions, 4 deletions)
@@ -89,7 +89,7 @@ module Internal =
base.Handle message
match message with
| Scheduling.InternalMessage.Added _ -> () // Processed by standard logging already; we have nothing to add
| Scheduling.InternalMessage.Result (_duration, stream, Choice1Of2 ((es, bs), res)) ->
| Scheduling.InternalMessage.Result (_duration, stream, _progressed, Choice1Of2 ((es, bs), res)) ->
adds stream okStreams
okEvents <- okEvents + es
okBytes <- okBytes + int64 bs
@@ -99,7 +99,7 @@
| Writer.Result.PartialDuplicate _ -> incr resultPartialDup
| Writer.Result.PrefixMissing _ -> incr resultPrefix
this.HandleOk res
| Scheduling.InternalMessage.Result (_duration, stream, Choice2Of2 ((es, bs), exn)) ->
| Scheduling.InternalMessage.Result (_duration, stream, _progressed, Choice2Of2 ((es, bs), exn)) ->
adds stream failStreams
exnEvents <- exnEvents + es
exnBytes <- exnBytes + int64 bs
@@ -124,8 +124,8 @@
let maxEvents, maxBytes = 65536, 4 * 1024 * 1024 - (*fudge*)4096
let met, span' = Buffering.StreamSpan.slice (maxEvents, maxBytes) span
try let! res = Writer.write storeLog selectedConnection (FsCodec.StreamName.toString stream) span'
return Choice1Of2 (met, res)
with e -> return Choice2Of2 (met, e) }
return span'.events.Length > 0, Choice1Of2 (met, res)
with e -> return false, Choice2Of2 (met, e) }

let interpretWriteResultProgress (streams : Scheduling.StreamStates<_>) stream res =
let applyResultToStreamState = function
src/Propulsion.Kafka/Consumers.fs (4 additions, 4 deletions)
@@ -480,7 +480,7 @@ type BatchesConsumer =
logExternalState |> Option.iter (fun f -> f log)
streams.Dump(log, Streams.Buffering.StreamState.eventsSize)
let handle (items : Streams.Scheduling.DispatchItem<byte[]>[])
: Async<(TimeSpan * StreamName * Choice<int64 * (EventMetrics * unit), EventMetrics * exn>)[]> = async {
: Async<(TimeSpan * StreamName * bool * Choice<int64 * (EventMetrics * unit), EventMetrics * exn>)[]> = async {
let sw = Stopwatch.StartNew()
let avgElapsed () =
let tot = let e = sw.Elapsed in e.TotalMilliseconds
@@ -493,16 +493,16 @@
| item, Choice1Of2 index' ->
let used : Streams.StreamSpan<_> = { item.span with events = item.span.events |> Seq.takeWhile (fun e -> e.Index <> index' ) |> Array.ofSeq }
let s = Streams.Buffering.StreamSpan.stats used
ae, item.stream, Choice1Of2 (index', (s, ()))
ae, item.stream, true, Choice1Of2 (index', (s, ()))
| item, Choice2Of2 exn ->
let s = Streams.Buffering.StreamSpan.stats item.span
ae, item.stream, Choice2Of2 (s, exn) |]
ae, item.stream, false, Choice2Of2 (s, exn) |]
with e ->
let ae = avgElapsed ()
return
[| for x in items ->
let s = Streams.Buffering.StreamSpan.stats x.span
ae, x.stream, Choice2Of2 (s, e) |] }
ae, x.stream, false, Choice2Of2 (s, e) |] }
let dispatcher = Streams.Scheduling.BatchedDispatcher(select, handle, stats, dumpStreams)
let streamsScheduler = Streams.Scheduling.StreamSchedulingEngine.Create(dispatcher, ?idleDelay=idleDelay, maxBatches=maxBatches)
let mapConsumedMessagesToStreamsBatch onCompletion (x : Submission.SubmissionBatch<TopicPartition, 'Info>) : Streams.Scheduling.StreamsBatch<_> =
src/Propulsion/PropulsionPrometheus.fs (19 additions, 6 deletions)
@@ -7,6 +7,7 @@ module private Impl =
let baseName stat = "propulsion_scheduler_" + stat
let baseDesc desc = "Propulsion Scheduler " + desc
let groupLabels = [| "group"; "state" |]
let groupWithKindLabels = [| "kind"; "group"; "state" |]
let activityLabels = [| "group"; "activity" |]
let latencyLabels = [| "group"; "kind" |]

@@ -26,6 +27,9 @@ module private Gauge =
let create (tagNames, tagValues) stat desc =
let config = Prometheus.GaugeConfiguration(LabelNames = append tagNames groupLabels)
make config (baseName stat) (baseDesc desc) tagValues
let createWithKind (tagNames, tagValues) kind stat desc =
let config = Prometheus.GaugeConfiguration(LabelNames = append tagNames groupWithKindLabels)
make config (baseName stat) (baseDesc desc) (Array.append tagValues [| kind |])

module private Counter =

@@ -89,6 +93,10 @@ type LogSink(customTags: seq<string * string>, group: string) =
let observeEvents = Gauge.create tags "events" "Current events"
let observeBytes = Gauge.create tags "bytes" "Current bytes"

let observeBusyCount = Gauge.create tags "busy_count" "Current Busy Streams count"
let observeBusyOldest = Gauge.createWithKind tags "oldest" "busy_seconds" "Busy Streams age, seconds"
let observeBusyNewest = Gauge.createWithKind tags "newest" "busy_seconds" "Busy Streams age, seconds"

let observeCpu = Counter.create tags "cpu" "Processing Time Breakdown"

let observeLatSum = Summary.latency tags "handler_summary" "Handler action"
@@ -102,10 +110,13 @@

let observeState = observeState group
let observeCpu = observeCpu group
let observeLatency kind latenciesS =
for v in latenciesS do
observeLatSum (group, kind) v
observeLatHis (group, kind) v
let observeLatency kind latency =
observeLatSum (group, kind) latency
observeLatHis (group, kind) latency
let observeBusy kind count oldest newest =
observeBusyCount group kind (float count)
observeBusyOldest group kind oldest
observeBusyNewest group kind newest

interface Serilog.Core.ILogEventSink with
member _.Emit logEvent = logEvent |> function
@@ -124,6 +135,8 @@ type LogSink(customTags: seq<string * string>, group: string) =
observeCpu "dispatch" dispatch.TotalSeconds
observeCpu "results" results.TotalSeconds
observeCpu "stats" stats.TotalSeconds
| Metric.AttemptLatencies (kind, latenciesS) ->
observeLatency kind latenciesS
| Metric.HandlerResult (kind, latency) ->
observeLatency kind latency
| Metric.StreamsBusy (kind, count, oldest, newest) ->
observeBusy kind count oldest newest
| _ -> ()
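To make the new Prometheus wiring concrete: the sink matches `Metric` events emitted by the scheduler and forwards them to the gauges declared above. Below is a sketch of the case shapes as inferred from the pattern matches in this file (the union itself lives elsewhere in Propulsion, and its exact field names and types are assumptions here), followed by the series the busy gauges imply given `baseName` and the label sets at the top of the file:

```fsharp
// Inferred sketch only, not the library's actual definition
type MetricSketch =
    | AttemptLatencies of kind : string * latenciesS : seq<float>
    | HandlerResult of kind : string * latency : float
    | StreamsBusy of kind : string * count : int * oldest : float * newest : float

// Roughly, a hypothetical StreamsBusy ("stuck", 3, 1200.0, 45.0) event for group "my-projector"
// would surface as (custom tags omitted):
//   propulsion_scheduler_busy_count{group="my-projector", state="stuck"}                  3
//   propulsion_scheduler_busy_seconds{kind="oldest", group="my-projector", state="stuck"} 1200
//   propulsion_scheduler_busy_seconds{kind="newest", group="my-projector", state="stuck"} 45
```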