diff --git a/Cargo.lock b/Cargo.lock
index f3afd2043..8519d621f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2833,6 +2833,7 @@ dependencies = [
  "crossterm",
  "dns-resolver",
  "futures",
+ "futures-util",
  "human_bytes",
  "humantime",
  "indoc",
diff --git a/crates/kcli/Cargo.toml b/crates/kcli/Cargo.toml
index 2be3e480f..99ac14547 100644
--- a/crates/kcli/Cargo.toml
+++ b/crates/kcli/Cargo.toml
@@ -14,6 +14,7 @@ clap = {version="4.5", features=["derive", "wrap_help"]}
 clap-markdown = "0.1"
 dns-resolver = {path="../dns-resolver"}
 futures = "0.3"
+futures-util = "0.3"
 humantime = "2.1"
 lexicmp = "0.1"
 message = {path="../message", default-features=false}
diff --git a/crates/kcli/src/main.rs b/crates/kcli/src/main.rs
index 26d48f16c..1a5ab9ddd 100644
--- a/crates/kcli/src/main.rs
+++ b/crates/kcli/src/main.rs
@@ -9,6 +9,7 @@ mod bounce_cancel;
 mod bounce_list;
 mod inspect_message;
 mod logfilter;
+mod provider_summary;
 mod queue_summary;
 mod rebind;
 mod suspend;
@@ -64,6 +65,7 @@ enum SubCommand {
     SuspendReadyQCancel(suspend_ready_q_cancel::SuspendReadyQCancelCommand),
     SetLogFilter(logfilter::SetLogFilterCommand),
     InspectMessage(inspect_message::InspectMessageCommand),
+    ProviderSummary(provider_summary::ProviderSummaryCommand),
     QueueSummary(queue_summary::QueueSummaryCommand),
     TraceSmtpClient(trace_smtp_client::TraceSmtpClientCommand),
     TraceSmtpServer(trace_smtp_server::TraceSmtpServerCommand),
@@ -123,6 +125,7 @@ impl SubCommand {
             Self::SuspendReadyQList(cmd) => cmd.run(endpoint).await,
             Self::SetLogFilter(cmd) => cmd.run(endpoint).await,
             Self::InspectMessage(cmd) => cmd.run(endpoint).await,
+            Self::ProviderSummary(cmd) => cmd.run(endpoint).await,
             Self::QueueSummary(cmd) => cmd.run(endpoint).await,
             Self::TraceSmtpClient(cmd) => cmd.run(endpoint).await,
             Self::TraceSmtpServer(cmd) => cmd.run(endpoint).await,
diff --git a/crates/kcli/src/provider_summary.rs b/crates/kcli/src/provider_summary.rs
new file mode 100644
index 000000000..74038b83a
--- /dev/null
+++ b/crates/kcli/src/provider_summary.rs
@@ -0,0 +1,375 @@
+use crate::queue_summary::get_metrics;
+use anyhow::Context;
+use clap::Parser;
+use dns_resolver::MailExchanger;
+use futures_util::stream::FuturesUnordered;
+use futures_util::StreamExt;
+use lexicmp::natural_lexical_cmp;
+use message::message::QueueNameComponents;
+use reqwest::Url;
+use std::cmp::Ordering;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tabout::{Alignment, Column};
+
+/// Prints a summary of the aggregate state of the queues from the perspective
+/// of the provider or destination site.
+///
+/// Note that this output format is subject to change and is not suitable
+/// for a machine to parse. It is expressly unstable and you must not
+/// depend upon it in automation.
+///
+/// The data behind this output is pulled from the metrics endpoint,
+/// which is machine readable.
+///
+/// The default output mode is to show the total volume of traffic
+/// grouped by the provider, or, if no applicable provider matching
+/// rules were defined on the server, the site name that is derived
+/// from the MX records for a domain.
+///
+/// The data is shown ordered by descending volume, where volume is the sum
+/// of the delivered, failed, transiently failed and queued message counts.
+///
+/// The --by-pool flag will further sub-divide the display by the egress pool.
+///
+/// The column labels have the following meanings:
+///
+/// PROVIDER - either the provider (if explicitly set through the config on the server),
+/// or the site name for the underlying domain.
+///
+/// POOL - (when --by-pool is used) the name of the egress pool
+///
+/// D - the total number of delivered messages
+///
+/// T - the total number of transiently failed messages
+///
+/// F - the total number of failed/bounced messages
+///
+/// Q - the total number of ready and scheduled messages in queue
+///
+/// C - the current number of open connections
+///
+/// DOMAINS - (when --show-domains is used) a list of domains that correspond to
+/// rows that do not have an explicitly configured provider.
+#[derive(Debug, Parser)]
+pub struct ProviderSummaryCommand {
+    /// Include a POOL column in the output, and break down the volume on
+    /// a per-pool basis.
+    #[arg(long)]
+    by_pool: bool,
+
+    /// For rows that were not matched on the server by provider rules we will
+    /// normally show a site-name in the place of the PROVIDER column.
+    ///
+    /// When --show-domains is enabled, an additional DOMAINS column will be
+    /// added to the output to hold a list of domains which correspond to
+    /// that site-name.
+    ///
+    /// This option is only capable of filling in the list
+    /// of domains if any of those domains have delayed messages residing
+    /// in their respective scheduled queues at the time that this command
+    /// was invoked.
+    #[arg(long)]
+    show_domains: bool,
+
+    /// Limit results to LIMIT results
+    #[arg(long)]
+    limit: Option<usize>,
+}
+
+#[derive(Default)]
+struct ProviderMetrics {
+    name: String,
+    delivered: usize,
+    transfail: usize,
+    fail: usize,
+    queue_size: usize,
+    pool: Option<String>,
+    connections: usize,
+}
+
+impl ProviderMetrics {
+    fn volume(&self) -> usize {
+        self.queue_size + self.delivered + self.transfail + self.fail
+    }
+}
+
+impl ProviderSummaryCommand {
+    pub async fn run(&self, endpoint: &Url) -> anyhow::Result<()> {
+        let mut provider_metrics = HashMap::new();
+        let mut provider_by_pool = HashMap::new();
+        let mut domain_resolution = FuturesUnordered::new();
+        let limiter = Arc::new(tokio::sync::Semaphore::new(128));
+        let _: Vec<()> = get_metrics(endpoint, |m| {
+            let name = m.name().as_str();
+            match name {
+                "scheduled_count" if self.show_domains => {
+                    let queue = m.labels().get("queue").unwrap();
+                    let components = QueueNameComponents::parse(queue);
+                    let domain = components
+                        .routing_domain
+                        .unwrap_or(components.domain)
+                        .to_string();
+                    let limiter = limiter.clone();
+                    domain_resolution.push(tokio::spawn(async move {
+                        match limiter.acquire().await {
+                            Ok(permit) => {
+                                let mx_result = MailExchanger::resolve(&domain).await;
+                                drop(permit);
+                                (domain, mx_result)
+                            }
+                            Err(err) => (domain, Err(err).context("failed to acquire permit")),
+                        }
+                    }));
+                }
+
+                "total_messages_delivered_by_provider"
+                | "connection_count_by_provider"
+                | "total_messages_fail_by_provider"
+                | "total_messages_transfail_by_provider"
+                | "queued_count_by_provider" => {
+                    if !self.by_pool {
+                        let provider = m.labels().get("provider").unwrap();
+                        let value = m.value() as usize;
+
+                        let entry =
+                            provider_metrics
+                                .entry(provider.to_string())
+                                .or_insert_with(|| ProviderMetrics {
+                                    name: provider.to_string(),
+                                    ..Default::default()
+                                });
+
+                        match name {
+                            "total_messages_delivered_by_provider" => {
+                                entry.delivered += value;
+                            }
+                            "total_messages_fail_by_provider" => {
+                                entry.fail += value;
+                            }
+                            "total_messages_transfail_by_provider" => {
+                                entry.transfail += value;
+                            }
+                            "queued_count_by_provider" => {
+                                entry.queue_size += value;
+                            }
+                            "connection_count_by_provider" => {
+                                entry.connections += value;
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+
+                "total_messages_delivered_by_provider_and_source"
+                | "connection_count_by_provider_and_pool"
+                | "total_messages_fail_by_provider_and_source"
+                | "total_messages_transfail_by_provider_and_source"
+                | "queued_count_by_provider_and_pool" => {
+                    if self.by_pool {
+                        let provider = m.labels().get("provider").unwrap();
+                        let pool = m.labels().get("pool").unwrap();
+                        let value = m.value() as usize;
+
+                        let entry = provider_by_pool
+                            .entry((provider.to_string(), pool.to_string()))
+                            .or_insert_with(|| ProviderMetrics {
+                                name: provider.to_string(),
+                                pool: Some(pool.to_string()),
+                                ..Default::default()
+                            });
+
+                        match name {
+                            "total_messages_delivered_by_provider_and_source" => {
+                                entry.delivered += value;
+                            }
+                            "total_messages_fail_by_provider_and_source" => {
+                                entry.fail += value;
+                            }
+                            "total_messages_transfail_by_provider_and_source" => {
+                                entry.transfail += value;
+                            }
+                            "queued_count_by_provider_and_pool" => {
+                                entry.queue_size += value;
+                            }
+                            "connection_count_by_provider_and_pool" => {
+                                entry.connections += value;
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+                _ => {}
+            }
+            None
+        })
+        .await?;
+
+        let mut site_to_domains: HashMap<String, Vec<String>> = HashMap::new();
+        while let Some(Ok((domain, result))) = domain_resolution.next().await {
+            if let Ok(mx) = result {
+                site_to_domains
+                    .entry(mx.site_name.to_string())
+                    .or_default()
+                    .push(domain);
+            }
+        }
+
+        if self.by_pool {
+            let mut provider_by_pool: Vec<_> =
+                provider_by_pool.into_iter().map(|(_k, v)| v).collect();
+
+            provider_by_pool.sort_by(|a, b| match b.volume().cmp(&a.volume()) {
+                Ordering::Equal => match natural_lexical_cmp(&a.name, &b.name) {
+                    Ordering::Equal => {
+                        natural_lexical_cmp(&a.pool.as_ref().unwrap(), &b.pool.as_ref().unwrap())
+                    }
+                    ordering => ordering,
+                },
+                ordering => ordering,
+            });
+            let mut columns = vec![
+                Column {
+                    name: "PROVIDER".to_string(),
+                    alignment: Alignment::Left,
+                },
+                Column {
+                    name: "POOL".to_string(),
+                    alignment: Alignment::Left,
+                },
+                Column {
+                    name: "D".to_string(),
+                    alignment: Alignment::Right,
+                },
+                Column {
+                    name: "T".to_string(),
+                    alignment: Alignment::Right,
+                },
+                Column {
+                    name: "F".to_string(),
+                    alignment: Alignment::Right,
+                },
+                Column {
+                    name: "Q".to_string(),
+                    alignment: Alignment::Right,
+                },
+                Column {
+                    name: "C".to_string(),
+                    alignment: Alignment::Right,
+                },
+            ];
+
+            if !site_to_domains.is_empty() {
+                columns.push(Column {
+                    name: "DOMAINS".to_string(),
+                    alignment: Alignment::Left,
+                });
+            }
+
+            let mut rows = vec![];
+            for m in &provider_by_pool {
+                if let Some(limit) = self.limit {
+                    if rows.len() >= limit {
+                        break;
+                    }
+                }
+                let mut row = vec![
+                    m.name.to_string(),
+                    m.pool.as_ref().unwrap().to_string(),
+                    m.delivered.to_string(),
+                    m.transfail.to_string(),
+                    m.fail.to_string(),
+                    m.queue_size.to_string(),
+                    m.connections.to_string(),
+                ];
+
+                if let Some(domains) = resolve_domains(&mut site_to_domains, &m.name) {
+                    row.push(domains);
+                }
+
+                rows.push(row);
+            }
+
+            tabout::tabulate_output(&columns, &rows, &mut std::io::stdout())?;
+        } else {
+            let mut provider_metrics: Vec<_> =
+                provider_metrics.into_iter().map(|(_k, v)| v).collect();
+            // Order by queue size DESC, then name
+            provider_metrics.sort_by(|a, b| match b.volume().cmp(&a.volume()) {
+                Ordering::Equal => natural_lexical_cmp(&a.name, &b.name),
+                ordering => ordering,
+            });
+            let mut columns = vec![
+                Column {
+                    name: "PROVIDER".to_string(),
+                    alignment: Alignment::Left,
+                },
+                Column {
+                    name: "D".to_string(),
+                    alignment: Alignment::Right,
+                },
+                Column {
+                    name: "T".to_string(),
+                    alignment: Alignment::Right,
+                },
+                Column {
+                    name: "F".to_string(),
+                    alignment: Alignment::Right,
+                },
+                Column {
+                    name: "Q".to_string(),
+                    alignment: Alignment::Right,
+                },
+                Column {
+                    name: "C".to_string(),
+                    alignment: Alignment::Right,
+                },
+            ];
+
+            if !site_to_domains.is_empty() {
+                columns.push(Column {
+                    name: "DOMAINS".to_string(),
+                    alignment: Alignment::Left,
+                });
+            }
+
+            let mut rows = vec![];
+            for m in &provider_metrics {
+                if let Some(limit) = self.limit {
+                    if rows.len() >= limit {
+                        break;
+                    }
+                }
+                let mut row = vec![
+                    m.name.to_string(),
+                    m.delivered.to_string(),
+                    m.transfail.to_string(),
+                    m.fail.to_string(),
+                    m.queue_size.to_string(),
+                    m.connections.to_string(),
+                ];
+
+                if let Some(domains) = resolve_domains(&mut site_to_domains, &m.name) {
+                    row.push(domains);
+                }
+
+                rows.push(row);
+            }
+
+            tabout::tabulate_output(&columns, &rows, &mut std::io::stdout())?;
+        }
+
+        Ok(())
+    }
+}
+
+fn resolve_domains(
+    site_to_domains: &mut HashMap<String, Vec<String>>,
+    site: &str,
+) -> Option<String> {
+    if let Some(domains) = site_to_domains.get_mut(site) {
+        domains.sort();
+        Some(domains.join(", "))
+    } else {
+        None
+    }
+}
diff --git a/crates/kumo-prometheus/src/counter_bundle.rs b/crates/kumo-prometheus/src/counter_bundle.rs
new file mode 100644
index 000000000..7d8909a40
--- /dev/null
+++ b/crates/kumo-prometheus/src/counter_bundle.rs
@@ -0,0 +1,42 @@
+/// counter_bundle declares a struct holding a bundle of counters
+/// that should be incremented or decremented together. This
+/// is used to facilitate computing rolled up metrics.
+#[macro_export]
+macro_rules! counter_bundle {
+    (pub struct $name:ident {
+        $(
+            pub $fieldname:ident: AtomicCounter,
+        )*
+    }
+    ) => {
+        #[derive(Clone)]
+        pub struct $name {
+            $(
+                pub $fieldname: AtomicCounter,
+            )*
+        }
+
+        impl $name {
+            pub fn inc(&self) {
+                $(
+                    self.$fieldname.inc();
+                )*
+            }
+            pub fn dec(&self) {
+                $(
+                    self.$fieldname.dec();
+                )*
+            }
+            pub fn sub(&self, n: usize) {
+                $(
+                    self.$fieldname.sub(n);
+                )*
+            }
+            pub fn inc_by(&self, n: usize) {
+                $(
+                    self.$fieldname.inc_by(n);
+                )*
+            }
+        }
+    };
+}
diff --git a/crates/kumo-prometheus/src/lib.rs b/crates/kumo-prometheus/src/lib.rs
index ecdddcd96..3ec47dfc8 100644
--- a/crates/kumo-prometheus/src/lib.rs
+++ b/crates/kumo-prometheus/src/lib.rs
@@ -12,6 +12,7 @@ use std::hash::Hash;
 use std::sync::Arc;
 
 mod counter;
+pub mod counter_bundle;
 #[macro_use]
 pub mod labels;
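For orientation, the counter_bundle! macro above simply fans every operation out to each counter in the bundle. A rough sketch of what it produces, using a hypothetical two-field bundle (ExampleBundle and its fields are illustrative, not part of this change):

    counter_bundle! {
        pub struct ExampleBundle {
            pub by_service: AtomicCounter,
            pub global: AtomicCounter,
        }
    }

    // ...expands to approximately:
    #[derive(Clone)]
    pub struct ExampleBundle {
        pub by_service: AtomicCounter,
        pub global: AtomicCounter,
    }

    impl ExampleBundle {
        pub fn inc(&self) {
            // every counter in the bundle moves together
            self.by_service.inc();
            self.global.inc();
        }
        // dec(), sub(n) and inc_by(n) forward to every field in the same way
    }

This is what lets the delivery code below update the per-service counter, the global rollup, and the new per-provider rollups with a single call.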
diff --git a/crates/kumod/src/delivery_metrics.rs b/crates/kumod/src/delivery_metrics.rs
index a55570b0c..a472fdd1c 100644
--- a/crates/kumod/src/delivery_metrics.rs
+++ b/crates/kumod/src/delivery_metrics.rs
@@ -1,29 +1,55 @@
-use kumo_prometheus::AtomicCounter;
+use crate::metrics_helper::{
+    BorrowedProviderAndPoolKey, BorrowedProviderAndSourceKey, BorrowedProviderKey,
+    ProviderAndPoolKeyTrait, ProviderAndSourceKeyTrait, ProviderKeyTrait,
+    QUEUED_COUNT_GAUGE_BY_PROVIDER, QUEUED_COUNT_GAUGE_BY_PROVIDER_AND_POOL,
+    TOTAL_MSGS_DELIVERED_BY_PROVIDER, TOTAL_MSGS_DELIVERED_BY_PROVIDER_AND_SOURCE,
+    TOTAL_MSGS_FAIL_BY_PROVIDER, TOTAL_MSGS_FAIL_BY_PROVIDER_AND_SOURCE,
+    TOTAL_MSGS_TRANSFAIL_BY_PROVIDER, TOTAL_MSGS_TRANSFAIL_BY_PROVIDER_AND_SOURCE,
+};
+use kumo_prometheus::{counter_bundle, AtomicCounter};
 use once_cell::sync::Lazy;
 use parking_lot::Mutex;
 use prometheus::Histogram;
 use std::collections::HashMap;
 use std::sync::Arc;
 
+counter_bundle! {
+    pub struct ReadyCountBundle {
+        pub ready_count_by_service: AtomicCounter,
+        pub global_ready_count: AtomicCounter,
+        pub queued_by_provider: AtomicCounter,
+        pub queued_by_provider_and_pool: AtomicCounter,
+    }
+}
+counter_bundle! {
+    pub struct DispositionBundle {
+        pub msgs: AtomicCounter,
+        pub provider: AtomicCounter,
+        pub source_provider: AtomicCounter,
+        pub global: AtomicCounter,
+    }
+}
+counter_bundle! {
+    pub struct ConnectionGaugeBundle {
+        pub connections: AtomicCounter,
+        pub global: AtomicCounter,
+        pub provider: AtomicCounter,
+        pub provider_and_pool: AtomicCounter,
+    }
+}
+
 #[derive(Clone)]
 pub struct DeliveryMetrics {
-    connection_gauge: AtomicCounter,
-    global_connection_gauge: AtomicCounter,
+    connection_gauge: ConnectionGaugeBundle,
     connection_total: AtomicCounter,
     global_connection_total: AtomicCounter,
 
-    pub ready_count: AtomicCounter,
-    pub global_ready_count: AtomicCounter,
+    pub ready_count: ReadyCountBundle,
     pub ready_full: AtomicCounter,
 
-    msgs_delivered: AtomicCounter,
-    global_msgs_delivered: AtomicCounter,
-
-    msgs_transfail: AtomicCounter,
-    global_msgs_transfail: AtomicCounter,
-
-    msgs_fail: AtomicCounter,
-    global_msgs_fail: AtomicCounter,
+    delivered: DispositionBundle,
+    transfail: DispositionBundle,
+    fail: DispositionBundle,
 
     pub deliver_message_rollup: Histogram,
 }
@@ -37,7 +63,6 @@ impl std::fmt::Debug for DeliveryMetrics {
 impl DeliveryMetrics {
     pub fn wrap_connection<T>(&self, client: T) -> MetricsWrappedConnection<T> {
         self.connection_gauge.inc();
-        self.global_connection_gauge.inc();
         self.connection_total.inc();
         self.global_connection_total.inc();
         MetricsWrappedConnection {
@@ -47,7 +72,14 @@ impl DeliveryMetrics {
         }
     }
 
-    pub fn new(service: &str, service_type: &str) -> Self {
+    pub fn new(
+        service: &str,
+        service_type: &str,
+        pool: &str,
+        source: &str,
+        provider_name: &Option<String>,
+        site_name: &str,
+    ) -> Self {
         // Since these metrics live in a pruning registry, we want to take an extra
         // step to pin these global counters into the register. We do that by holding
         // on to them for the life of the program.
@@ -93,49 +125,105 @@ impl DeliveryMetrics {
             }
         };
 
+        let provider = provider_name
+            .as_ref()
+            .map(|s| s.as_str())
+            .unwrap_or(site_name);
+
+        let provider_pool_key = BorrowedProviderAndPoolKey { provider, pool };
+        let provider_source_key = BorrowedProviderAndSourceKey {
+            provider,
+            source,
+            pool,
+        };
+        let provider_key = BorrowedProviderKey { provider };
+
+        let source_provider_msgs_fail = TOTAL_MSGS_FAIL_BY_PROVIDER_AND_SOURCE
+            .get_or_create(&provider_source_key as &dyn ProviderAndSourceKeyTrait);
+        let source_provider_msgs_delivered = TOTAL_MSGS_DELIVERED_BY_PROVIDER_AND_SOURCE
+            .get_or_create(&provider_source_key as &dyn ProviderAndSourceKeyTrait);
+        let source_provider_msgs_transfail = TOTAL_MSGS_TRANSFAIL_BY_PROVIDER_AND_SOURCE
+            .get_or_create(&provider_source_key as &dyn ProviderAndSourceKeyTrait);
+
+        let provider_msgs_fail =
+            TOTAL_MSGS_FAIL_BY_PROVIDER.get_or_create(&provider_key as &dyn ProviderKeyTrait);
+        let provider_msgs_delivered =
+            TOTAL_MSGS_DELIVERED_BY_PROVIDER.get_or_create(&provider_key as &dyn ProviderKeyTrait);
+        let provider_msgs_transfail =
+            TOTAL_MSGS_TRANSFAIL_BY_PROVIDER.get_or_create(&provider_key as &dyn ProviderKeyTrait);
+
+        let ready_count = ReadyCountBundle {
+            ready_count_by_service: crate::metrics_helper::ready_count_gauge_for_service(&service),
+            global_ready_count: globals.global_ready_count.clone(),
+            queued_by_provider: QUEUED_COUNT_GAUGE_BY_PROVIDER
+                .get_or_create(&provider_key as &dyn ProviderKeyTrait),
+            queued_by_provider_and_pool: QUEUED_COUNT_GAUGE_BY_PROVIDER_AND_POOL
+                .get_or_create(&provider_pool_key as &dyn ProviderAndPoolKeyTrait),
+        };
+
+        let delivered = DispositionBundle {
+            msgs: crate::metrics_helper::total_msgs_delivered_for_service(&service),
+            global: globals.global_msgs_delivered.clone(),
+            provider: provider_msgs_delivered,
+            source_provider: source_provider_msgs_delivered,
+        };
+
+        let transfail = DispositionBundle {
+            msgs: crate::metrics_helper::total_msgs_transfail_for_service(&service),
+            global: globals.global_msgs_transfail.clone(),
+            provider: provider_msgs_transfail,
+            source_provider: source_provider_msgs_transfail,
+        };
+
+        let fail = DispositionBundle {
+            msgs: crate::metrics_helper::total_msgs_fail_for_service(&service),
+            global: globals.global_msgs_fail.clone(),
+            provider: provider_msgs_fail,
+            source_provider: source_provider_msgs_fail,
+        };
+
+        let connection_gauge = ConnectionGaugeBundle {
+            connections: crate::metrics_helper::connection_gauge_for_service(&service),
+            global: globals.global_connection_gauge.clone(),
+            provider: crate::metrics_helper::CONN_GAUGE_BY_PROVIDER
+                .get_or_create(&provider_key as &dyn ProviderKeyTrait),
+            provider_and_pool: crate::metrics_helper::CONN_GAUGE_BY_PROVIDER_AND_POOL
+                .get_or_create(&provider_pool_key as &dyn ProviderAndPoolKeyTrait),
+        };
+
         DeliveryMetrics {
-            connection_gauge: crate::metrics_helper::connection_gauge_for_service(&service),
-            global_connection_gauge: globals.global_connection_gauge.clone(),
+            connection_gauge,
             connection_total: crate::metrics_helper::connection_total_for_service(&service),
             global_connection_total: globals.global_connection_total.clone(),
             ready_full: crate::metrics_helper::ready_full_counter_for_service(&service),
-            ready_count: crate::metrics_helper::ready_count_gauge_for_service(&service),
-            global_ready_count: globals.global_ready_count.clone(),
-            msgs_delivered: crate::metrics_helper::total_msgs_delivered_for_service(&service),
-            global_msgs_delivered: globals.global_msgs_delivered.clone(),
-            msgs_transfail: crate::metrics_helper::total_msgs_transfail_for_service(&service),
-            global_msgs_transfail: globals.global_msgs_transfail.clone(),
-            msgs_fail: crate::metrics_helper::total_msgs_fail_for_service(&service),
-            global_msgs_fail: globals.global_msgs_fail.clone(),
+            ready_count,
             deliver_message_rollup: crate::metrics_helper::deliver_message_rollup_for_service(
                 service_type,
             ),
+            delivered,
+            transfail,
+            fail,
         }
     }
 
     pub fn inc_transfail(&self) {
-        self.msgs_transfail.inc();
-        self.global_msgs_transfail.inc();
+        self.transfail.inc();
     }
 
     pub fn inc_transfail_by(&self, amount: usize) {
-        self.msgs_transfail.inc_by(amount);
-        self.global_msgs_transfail.inc_by(amount);
+        self.transfail.inc_by(amount);
     }
 
     pub fn inc_fail(&self) {
-        self.msgs_fail.inc();
-        self.global_msgs_fail.inc();
+        self.fail.inc();
     }
 
     pub fn inc_fail_by(&self, amount: usize) {
-        self.msgs_fail.inc_by(amount);
-        self.global_msgs_fail.inc_by(amount);
+        self.fail.inc_by(amount);
     }
 
     pub fn inc_delivered(&self) {
-        self.msgs_delivered.inc();
-        self.global_msgs_delivered.inc();
+        self.delivered.inc();
     }
 }
 
@@ -163,7 +251,6 @@ impl<T> MetricsWrappedConnection<T> {
     pub fn take(mut self) -> T {
         if self.armed {
             self.metrics.connection_gauge.dec();
-            self.metrics.global_connection_gauge.dec();
             self.armed = false;
         }
         self.client.take().expect("to take only once")
     }
@@ -174,7 +261,6 @@ impl<T> Drop for MetricsWrappedConnection<T> {
     fn drop(&mut self) {
         if self.armed {
             self.metrics.connection_gauge.dec();
-            self.metrics.global_connection_gauge.dec();
         }
     }
 }
diff --git a/crates/kumod/src/metrics_helper.rs b/crates/kumod/src/metrics_helper.rs
index 14f407125..accb1c33e 100644
--- a/crates/kumod/src/metrics_helper.rs
+++ b/crates/kumod/src/metrics_helper.rs
@@ -7,10 +7,41 @@ label_key! {
         pub service: String,
     }
 }
+label_key! {
+    pub struct ProviderKey {
+        pub provider: String,
+    }
+}
+label_key! {
+    pub struct ProviderAndSourceKey {
+        pub provider: String,
+        pub source: String,
+        pub pool: String,
+    }
+}
+label_key! {
+    pub struct ProviderAndPoolKey {
+        pub provider: String,
+        pub pool: String,
+    }
+}
 
 pub static CONN_GAUGE: Lazy<PruningCounterRegistry<ServiceKey>> = Lazy::new(|| {
     PruningCounterRegistry::register_gauge("connection_count", "number of active connections")
 });
+pub static CONN_GAUGE_BY_PROVIDER: Lazy<PruningCounterRegistry<ProviderKey>> = Lazy::new(|| {
+    PruningCounterRegistry::register_gauge(
+        "connection_count_by_provider",
+        "number of active connections",
+    )
+});
+pub static CONN_GAUGE_BY_PROVIDER_AND_POOL: Lazy<PruningCounterRegistry<ProviderAndPoolKey>> =
+    Lazy::new(|| {
+        PruningCounterRegistry::register_gauge(
+            "connection_count_by_provider_and_pool",
+            "number of active connections",
+        )
+    });
 pub static CONN_DENIED: Lazy<PruningCounterRegistry<ServiceKey>> = Lazy::new(|| {
     PruningCounterRegistry::register(
         "total_connections_denied",
@@ -43,9 +74,74 @@ pub static TOTAL_MSGS_FAIL: Lazy<PruningCounterRegistry<ServiceKey>> = Lazy::new
         "total number of message delivery attempts that permanently failed",
     )
 });
+
+pub static TOTAL_MSGS_DELIVERED_BY_PROVIDER: Lazy<PruningCounterRegistry<ProviderKey>> =
+    Lazy::new(|| {
+        PruningCounterRegistry::register(
+            "total_messages_delivered_by_provider",
+            "total number of messages ever delivered",
+        )
+    });
+pub static TOTAL_MSGS_TRANSFAIL_BY_PROVIDER: Lazy<PruningCounterRegistry<ProviderKey>> =
+    Lazy::new(|| {
+        PruningCounterRegistry::register(
+            "total_messages_transfail_by_provider",
+            "total number of message delivery attempts that transiently failed",
+        )
+    });
+pub static TOTAL_MSGS_FAIL_BY_PROVIDER: Lazy<PruningCounterRegistry<ProviderKey>> =
+    Lazy::new(|| {
+        PruningCounterRegistry::register(
+            "total_messages_fail_by_provider",
+            "total number of message delivery attempts that permanently failed",
+        )
+    });
+
+pub static TOTAL_MSGS_DELIVERED_BY_PROVIDER_AND_SOURCE: Lazy<
+    PruningCounterRegistry<ProviderAndSourceKey>,
+> = Lazy::new(|| {
+    PruningCounterRegistry::register(
+        "total_messages_delivered_by_provider_and_source",
+        "total number of messages ever delivered",
+    )
+});
+pub static TOTAL_MSGS_TRANSFAIL_BY_PROVIDER_AND_SOURCE: Lazy<
+    PruningCounterRegistry<ProviderAndSourceKey>,
+> = Lazy::new(|| {
+    PruningCounterRegistry::register(
+        "total_messages_transfail_by_provider_and_source",
+        "total number of message delivery attempts that transiently failed",
+    )
+});
+pub static TOTAL_MSGS_FAIL_BY_PROVIDER_AND_SOURCE: Lazy<
+    PruningCounterRegistry<ProviderAndSourceKey>,
+> = Lazy::new(|| {
+    PruningCounterRegistry::register(
+        "total_messages_fail_by_provider_and_source",
+        "total number of message delivery attempts that permanently failed",
+    )
+});
+
 pub static READY_COUNT_GAUGE: Lazy<PruningCounterRegistry<ServiceKey>> = Lazy::new(|| {
-    PruningCounterRegistry::register("ready_count", "number of messages in the ready queue")
+    PruningCounterRegistry::register_gauge("ready_count", "number of messages in the ready queue")
 });
+
+pub static QUEUED_COUNT_GAUGE_BY_PROVIDER: Lazy<PruningCounterRegistry<ProviderKey>> =
+    Lazy::new(|| {
+        PruningCounterRegistry::register_gauge(
+            "queued_count_by_provider",
+            "number of messages in the scheduled and ready queue",
+        )
+    });
+pub static QUEUED_COUNT_GAUGE_BY_PROVIDER_AND_POOL: Lazy<
+    PruningCounterRegistry<ProviderAndPoolKey>,
+> = Lazy::new(|| {
+    PruningCounterRegistry::register_gauge(
+        "queued_count_by_provider_and_pool",
+        "number of messages in the scheduled and ready queue",
+    )
+});
+
 pub static TOTAL_MSGS_RECVD: Lazy<CounterRegistry<ServiceKey>> = Lazy::new(|| {
     CounterRegistry::register(
         "total_messages_received",
diff --git a/crates/kumod/src/queue.rs b/crates/kumod/src/queue.rs
index 2ba3d73f4..a132fcc13 100644
--- a/crates/kumod/src/queue.rs
+++ b/crates/kumod/src/queue.rs
@@ -5,6 +5,10 @@ use crate::http_server::admin_suspend_v1::AdminSuspendEntry;
 use crate::http_server::inject_v1::{make_generate_queue_config, GENERATOR_QUEUE_NAME};
 use crate::logging::disposition::{log_disposition, LogDisposition, RecordType};
 use crate::lua_deliver::LuaDeliveryProtocol;
+use crate::metrics_helper::{
+    BorrowedProviderAndPoolKey, BorrowedProviderKey, ProviderAndPoolKeyTrait, ProviderKeyTrait,
+    QUEUED_COUNT_GAUGE_BY_PROVIDER, QUEUED_COUNT_GAUGE_BY_PROVIDER_AND_POOL,
+};
 use crate::ready_queue::ReadyQueueManager;
 use crate::smtp_dispatcher::SmtpProtocol;
 use crate::spool::SpoolManager;
@@ -14,7 +18,7 @@ use config::epoch::{get_current_epoch, ConfigEpoch};
 use config::{load_config, CallbackSignature, LuaConfig};
 use crossbeam_skiplist::SkipSet;
 use kumo_api_types::egress_path::ConfigRefreshStrategy;
-use kumo_prometheus::{label_key, AtomicCounter, PruningCounterRegistry};
+use kumo_prometheus::{counter_bundle, label_key, AtomicCounter, PruningCounterRegistry};
 use kumo_server_common::config_handle::ConfigHandle;
 use kumo_server_lifecycle::{is_shutting_down, Activity, ShutdownSubcription};
 use kumo_server_runtime::{get_main_runtime, spawn, spawn_blocking_on, Runtime};
@@ -170,10 +174,18 @@ const ONE_DAY: Duration = Duration::from_secs(86400);
 const ONE_MINUTE: Duration = Duration::from_secs(60);
 const TEN_MINUTES: Duration = Duration::from_secs(10 * 60);
 
+counter_bundle! {
+    pub struct ScheduledCountBundle {
+        pub delay_gauge: AtomicCounter,
+        pub queued_by_provider: AtomicCounter,
+        pub queued_by_provider_and_pool: AtomicCounter,
+        pub by_domain: AtomicCounter,
+    }
+}
+
 struct ScheduledMetrics {
     name: Arc<String>,
-    scheduled: AtomicCounter,
-    by_domain: AtomicCounter,
+    scheduled: ScheduledCountBundle,
     by_tenant: Option<AtomicCounter>,
     by_tenant_campaign: Option<AtomicCounter>,
     delay_due_to_message_rate_throttle: OnceCell<AtomicCounter>,
 }
 
 impl ScheduledMetrics {
-    pub fn new(name: Arc<String>) -> Self {
+    pub fn new(name: Arc<String>, pool: &str, site: &str, provider_name: &Option<String>) -> Self {
         let components = QueueNameComponents::parse(&name);
 
         let queue_key = BorrowedQueueKey {
             domain: components.domain,
         };
 
-        let scheduled = DELAY_GAUGE.get_or_create(&queue_key as &dyn QueueKeyTrait);
-        let by_domain = DOMAIN_GAUGE.get_or_create(&domain_key as &dyn DomainKeyTrait);
         let by_tenant = components.tenant.map(|tenant| {
             None => None,
         };
 
+        let provider_key = match provider_name {
+            Some(provider) => BorrowedProviderKey { provider },
+            None => BorrowedProviderKey { provider: site },
+        };
+        let provider_pool_key = match provider_name {
+            Some(provider) => BorrowedProviderAndPoolKey { provider, pool },
+            None => BorrowedProviderAndPoolKey {
+                provider: site,
+                pool,
+            },
+        };
+
+        let scheduled = ScheduledCountBundle {
+            delay_gauge: DELAY_GAUGE.get_or_create(&queue_key as &dyn QueueKeyTrait),
+            queued_by_provider: QUEUED_COUNT_GAUGE_BY_PROVIDER
+                .get_or_create(&provider_key as &dyn ProviderKeyTrait),
+            queued_by_provider_and_pool: QUEUED_COUNT_GAUGE_BY_PROVIDER_AND_POOL
+                .get_or_create(&provider_pool_key as &dyn ProviderAndPoolKeyTrait),
+            by_domain,
+        };
+
         Self {
             name,
-            by_domain,
             by_tenant,
             by_tenant_campaign,
             scheduled,
@@ -252,7 +282,6 @@ impl ScheduledMetrics {
     pub fn inc(&self) {
         TOTAL_DELAY_GAUGE.inc();
         self.scheduled.inc();
-        self.by_domain.inc();
         self.by_tenant.as_ref().map(|m| m.inc());
         self.by_tenant_campaign.as_ref().map(|m| m.inc());
     }
@@ -260,7 +289,6 @@ impl ScheduledMetrics {
     pub fn sub(&self, amount: usize) {
         TOTAL_DELAY_GAUGE.sub(amount as i64);
         self.scheduled.sub(amount);
-        self.by_domain.sub(amount);
         self.by_tenant.as_ref().map(|m| m.sub(amount));
         self.by_tenant_campaign.as_ref().map(|m| m.sub(amount));
     }
@@ -878,6 +906,7 @@ pub struct Queue {
     next_config_refresh: StdMutex<Instant>,
     warned_strategy_change: AtomicBool,
     config_epoch: StdMutex<ConfigEpoch>,
+    site_name: String,
 }
 
 impl Queue {
@@ -917,13 +946,26 @@ impl Queue {
         let activity = Activity::get(format!("Queue {name}"))?;
         let strategy = queue_config.strategy;
         let next_config_refresh = StdMutex::new(Instant::now() + queue_config.refresh_interval);
+
+        let queue_config = ConfigHandle::new(queue_config);
+        let site_name = match ReadyQueueManager::compute_queue_name(
+            &name,
+            &queue_config,
+            "unspecified",
+        )
+        .await
+        {
+            Ok(ready_name) => ready_name.site_name,
+            Err(_) => name.clone(),
+        };
+
         let name = Arc::new(name);
 
         let handle = Arc::new(Queue {
             name: name.clone(),
             queue: QueueStructure::new(strategy),
             last_change: StdMutex::new(Instant::now()),
-            queue_config: ConfigHandle::new(queue_config),
+            queue_config,
             notify_maintainer: Arc::new(Notify::new()),
             metrics: OnceCell::new(),
             activity,
@@ -931,6 +973,7 @@ impl Queue {
             next_config_refresh,
             warned_strategy_change: AtomicBool::new(false),
             config_epoch: StdMutex::new(epoch),
+            site_name,
         });
 
         if !matches!(strategy, QueueStrategy::SingletonTimerWheel) {
@@ -1433,8 +1476,15 @@ impl Queue {
     }
 
     fn metrics(&self) -> &ScheduledMetrics {
-        self.metrics
-            .get_or_init(|| ScheduledMetrics::new(self.name.clone()))
+        self.metrics.get_or_init(|| {
+            let queue_config = self.queue_config.borrow();
+            ScheduledMetrics::new(
+                self.name.clone(),
+                queue_config.egress_pool.as_deref().unwrap_or("unspecified"),
+                &self.site_name,
+                &queue_config.provider_name,
+            )
+        })
     }
 
     #[instrument(skip(self, msg))]
diff --git a/crates/kumod/src/ready_queue.rs b/crates/kumod/src/ready_queue.rs
index 1e43e1645..ea0393c01 100644
--- a/crates/kumod/src/ready_queue.rs
+++ b/crates/kumod/src/ready_queue.rs
@@ -1,4 +1,4 @@
-use crate::delivery_metrics::DeliveryMetrics;
+use crate::delivery_metrics::{DeliveryMetrics, ReadyCountBundle};
 use crate::egress_source::EgressSource;
 use crate::http_server::admin_bounce_v1::AdminBounceEntry;
 use crate::http_server::admin_suspend_ready_q_v1::{
@@ -19,7 +19,6 @@ use config::{load_config, CallbackSignature};
 use crossbeam_queue::ArrayQueue;
 use dns_resolver::MailExchanger;
 use kumo_api_types::egress_path::{ConfigRefreshStrategy, EgressPathConfig};
-use kumo_prometheus::AtomicCounter;
 use kumo_server_common::config_handle::ConfigHandle;
 use kumo_server_lifecycle::{is_shutting_down, Activity, ShutdownSubcription};
 use kumo_server_memory::{get_headroom, low_memory, subscribe_to_memory_status_changes};
@@ -59,23 +58,20 @@ pub fn set_readyq_threads(n: usize) {
 
 pub struct Fifo {
     queue: ArcSwap<ArrayQueue<Message>>,
-    count: AtomicCounter,
-    global_count: AtomicCounter,
+    count: ReadyCountBundle,
 }
 
 impl Fifo {
-    pub fn new(capacity: usize, count: AtomicCounter, global_count: AtomicCounter) -> Self {
+    pub fn new(capacity: usize, count: ReadyCountBundle) -> Self {
         Self {
             queue: Arc::new(ArrayQueue::new(capacity)).into(),
             count,
-            global_count,
         }
     }
 
     pub fn push(&self, msg: Message) -> Result<(), Message> {
         self.queue.load().push(msg)?;
         self.count.inc();
-        self.global_count.inc();
         Ok(())
     }
 
@@ -83,7 +79,6 @@ impl Fifo {
     pub fn pop(&self) -> Option<Message> {
         let msg = self.queue.load().pop()?;
         self.count.dec();
-        self.global_count.dec();
         Some(msg)
     }
 
@@ -95,7 +90,6 @@ impl Fifo {
             messages.push(msg);
         }
         self.count.sub(messages.len());
-        self.global_count.sub(messages.len());
         messages
     }
 
@@ -128,7 +122,6 @@ impl Fifo {
             }
         }
         self.count.sub(messages.len());
-        self.global_count.sub(messages.len());
         messages
     }
 
@@ -333,11 +326,19 @@ impl ReadyQueueManager {
             .expect("failed to spawn maintainer");
maintainer"); let proto = queue_config.borrow().protocol.metrics_protocol_name(); let service = format!("{proto}:{name}"); - let metrics = DeliveryMetrics::new(&service, &proto); + let metrics = DeliveryMetrics::new( + &service, + &proto, + egress_pool, + &egress_source.name, + &path_config.provider_name, + mx.as_ref() + .map(|m| m.site_name.as_str()) + .unwrap_or(queue_name), + ); let ready = Arc::new(Fifo::new( path_config.max_ready, metrics.ready_count.clone(), - metrics.global_ready_count.clone(), )); let notify_dispatcher = Arc::new(Notify::new()); let next_config_refresh = Instant::now() + path_config.refresh_interval; @@ -656,7 +657,7 @@ impl ReadyQueue { self.name, suspend.is_some(), self.ready_count(), - self.metrics.ready_count.get(), + self.metrics.ready_count.ready_count_by_service.get(), ); if self.activity.is_shutting_down() {