Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscriber: use Span::record for attribute updates #264

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions console-subscriber/src/attribute_new.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
use crate::ToProto;
use console_api as proto;
use proto::field::Value as UpdateValue;
use proto::{field::Name, MetaId};
use std::collections::HashMap;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicU8, Ordering::*};
use tracing::field::FieldSet;

#[derive(Debug)]
pub(crate) struct Attributes {
attributes: HashMap<proto::field::Name, Attribute>,
}

#[derive(Debug)]
pub(crate) struct Attribute {
name: proto::field::Name,
meta_id: MetaId,
value: Value,
unit: Option<String>,
}

#[derive(Debug)]
pub(crate) struct Value {
str_val: AtomicPtr<String>,
other_val: AtomicU64,
val_type: AtomicU8,
}

const EMPTY: u8 = 0;
const BOOL: u8 = 1;
const U64: u8 = 2;
const I64: u8 = 3;
const STR: u8 = 4;
const DEBUG: u8 = 5;

#[derive(Debug, Clone)]
pub(crate) struct Update {
name: proto::field::Name,
is_delta: bool,
value: proto::field::Value,
}

// // === impl Attributes ===

impl Attributes {
pub(crate) const STATE_PREFIX: &'static str = "state.";

pub(crate) fn new(meta_id: MetaId, fields: &FieldSet) -> Self {
let attributes = fields
.iter()
.filter_map(|field| {
if field.name().starts_with(Attributes::STATE_PREFIX) {
let mut parts = field.name().split('.');
parts.next();
if let Some(name) = parts.next() {
return Some((name.into(), parts.next()));
}
}
None
})
.map(|(name, unit): (Name, Option<&str>)| {
let value = Value {
str_val: AtomicPtr::new(ptr::null_mut()),
other_val: AtomicU64::new(0),
val_type: AtomicU8::new(0),
};
let unit = unit.map(Into::into);

let attr = Attribute {
name: name.clone(),
meta_id: meta_id.clone(),
unit,
value,
};
(name, attr)
})
.collect();

Self { attributes }
}

pub(crate) fn update(&self, update: &Update) {
if let Some(attr) = self.attributes.get(&update.name) {
let is_delta = update.is_delta;
let perv_type = attr.value.val_type.swap(update.update_type(), AcqRel);
match (perv_type, &update.value) {
(BOOL | EMPTY, UpdateValue::BoolVal(upd)) => {
attr.value.other_val.store(*upd as u64, Release);
}

(STR, UpdateValue::StrVal(upd)) => {
attr.value
.str_val
.store(Box::into_raw(Box::new(upd.clone())), Release);
}

(DEBUG, UpdateValue::DebugVal(upd)) => {
attr.value
.str_val
.store(Box::into_raw(Box::new(upd.clone())), Release);
}

(U64 | EMPTY, UpdateValue::U64Val(upd)) => {
if is_delta && perv_type != EMPTY {
attr.value.other_val.fetch_add(*upd, Release);
} else {
attr.value.other_val.store(*upd, Release);
}
}
(I64 | EMPTY, UpdateValue::I64Val(upd)) => {
if is_delta && perv_type != EMPTY {
attr.value
.other_val
.fetch_update(AcqRel, Acquire, |v| {
Some(((v as i64) + (*upd as i64)) as u64)
})
.unwrap();
} else {
attr.value.other_val.store(*upd as u64, Release);
}
}
(val, update) => {
tracing::warn!(
"attribute {:?} cannot be updated by update {:?}",
val,
update
);
}
}
}
}

pub(crate) fn values(&self) -> impl Iterator<Item = &Attribute> {
self.attributes.values()
}
}

// // === impl Update ===

impl Update {
pub(crate) fn new(
name: proto::field::Name,
value: proto::field::Value,
is_delta: bool,
) -> Self {
Self {
name,
is_delta,
value,
}
}
fn update_type(&self) -> u8 {
match self.value {
UpdateValue::BoolVal(_) => BOOL,
UpdateValue::StrVal(_) => STR,
UpdateValue::DebugVal(_) => DEBUG,
UpdateValue::U64Val(_) => U64,
UpdateValue::I64Val(_) => I64,
}
}
}

impl ToProto for Attribute {
type Output = Option<proto::Attribute>;

fn to_proto(&self) -> Self::Output {
if let Some(value) = self.value.to_proto() {
return Some(proto::Attribute {
field: Some(proto::Field {
metadata_id: Some(self.meta_id.clone()),
name: Some(self.name.clone()),
value: Some(value),
}),
unit: self.unit.clone(),
});
}
None
}
}

impl ToProto for Value {
type Output = Option<proto::field::Value>;

fn to_proto(&self) -> Self::Output {
use proto::field::Value as ProtoVal;
match self.val_type.load(Acquire) {
BOOL => Some(ProtoVal::BoolVal(self.other_val.load(Acquire) != 0)),
U64 => Some(ProtoVal::U64Val(self.other_val.load(Acquire) as u64)),
I64 => Some(ProtoVal::I64Val(self.other_val.load(Acquire) as i64)),
DEBUG => Some(ProtoVal::StrVal("HAHA".to_string())),
STR => Some(ProtoVal::StrVal("HAHA".to_string())),
_ => None,
}
}
}
83 changes: 80 additions & 3 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tracing_subscriber::{

mod aggregator;
mod attribute;
mod attribute_new;
mod builder;
mod callsites;
mod record;
Expand All @@ -40,7 +41,10 @@ pub use builder::Builder;
use callsites::Callsites;
use record::Recorder;
use stack::SpanStack;
use visitors::{AsyncOpVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor, WakerVisitor};
use visitors::{
AsyncOpVisitor, NewStateUpdateVisitor, ResourceVisitor, ResourceVisitorResult, TaskVisitor,
WakerVisitor,
};

pub use builder::{init, spawn};

Expand Down Expand Up @@ -479,6 +483,55 @@ impl ConsoleLayer {
}
}
}

fn record_updates<S>(
&self,
span: SpanRef<'_, S>,
updates: Vec<attribute_new::Update>,
ctx: &Context<'_, S>,
) where
S: Subscriber + for<'a> LookupSpan<'a>,
{
if self.is_resource(span.metadata()) {
self.state_update_new(span, updates, ctx, |exts| {
exts.get::<Arc<stats::ResourceStats>>()
.map(<Arc<stats::ResourceStats> as std::ops::Deref>::deref)
})
} else if self.is_async_op(span.metadata()) {
self.state_update_new(span, updates, ctx, |exts| {
let async_op = exts.get::<Arc<stats::AsyncOpStats>>()?;
Some(&async_op.stats)
})
}
}

fn state_update_new<S>(
&self,
span: SpanRef<'_, S>,
updates: Vec<attribute_new::Update>,
ctx: &Context<'_, S>,
get_stats: impl for<'a> Fn(&'a Extensions) -> Option<&'a stats::ResourceStats>,
) where
S: Subscriber + for<'a> LookupSpan<'a>,
{
let exts = span.extensions();
let stats = match get_stats(&exts) {
Some(stats) => stats,
None => return,
};

for upd in updates.iter() {
stats.update_attribute_new(upd);
if let Some(parent) = stats.parent_id.as_ref().and_then(|parent| ctx.span(parent)) {
let exts = parent.extensions();
if let Some(stats) = get_stats(&exts) {
if stats.inherit_child_attributes {
stats.update_attribute_new(upd);
}
}
}
}
}
}

impl<S> Layer<S> for ConsoleLayer
Expand Down Expand Up @@ -570,10 +623,12 @@ where
self.first_entered(&stack.borrow(), |id| self.is_id_resource(id, &ctx))
});
if let Some(stats) = self.send_stats(&self.shared.dropped_resources, move || {
let meta_id = metadata.into();
let stats = Arc::new(stats::ResourceStats::new(
at,
inherit_child_attrs,
parent_id.clone(),
attribute_new::Attributes::new(meta_id, attrs.fields()),
));
let event = Event::Resource {
id: id.clone(),
Expand All @@ -587,7 +642,13 @@ where
};
(event, stats)
}) {
ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
let span = ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!");
span.extensions_mut().insert(stats);

// now record initial attrs
let mut attr_visitor = NewStateUpdateVisitor::default();
attrs.record(&mut attr_visitor);
self.record_updates(span, attr_visitor.updates, &ctx)
}
}
return;
Expand All @@ -609,10 +670,12 @@ where
if let Some(resource_id) = resource_id {
if let Some(stats) =
self.send_stats(&self.shared.dropped_async_ops, move || {
let meta_id = metadata.into();
let stats = Arc::new(stats::AsyncOpStats::new(
at,
inherit_child_attrs,
parent_id.clone(),
attribute_new::Attributes::new(meta_id, attrs.fields()),
));
let event = Event::AsyncResourceOp {
id: id.clone(),
Expand All @@ -625,7 +688,13 @@ where
(event, stats)
})
{
ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!").extensions_mut().insert(stats);
let span = ctx.span(id).expect("if `on_new_span` was called, the span must exist; this is a `tracing` bug!");
span.extensions_mut().insert(stats);

// now record initial attrs
let mut attr_visitor = NewStateUpdateVisitor::default();
attrs.record(&mut attr_visitor);
self.record_updates(span, attr_visitor.updates, &ctx)
}
}
}
Expand Down Expand Up @@ -842,6 +911,14 @@ where
});
}
}

fn on_record(&self, id: &span::Id, values: &span::Record<'_>, cx: Context<'_, S>) {
if let Some(span) = cx.span(id) {
let mut attr_visitor = NewStateUpdateVisitor::default();
values.record(&mut attr_visitor);
self.record_updates(span, attr_visitor.updates, &cx)
}
}
}

impl fmt::Debug for ConsoleLayer {
Expand Down
Loading