Skip to content

Commit

Permalink
Refactor exemplars to not use generic argument (open-telemetry#5285)
Browse files Browse the repository at this point in the history
* Refactor exemplars to not use generic argument

* Update internal/aggregate

* Update metric SDK

* Test exemplar value type

* Add TestCollectExemplars

* Fix lint

---------

Co-authored-by: Sam Xie <[email protected]>
  • Loading branch information
MrAlias and XSAM authored May 7, 2024
1 parent f8b9fe3 commit 2f662db
Show file tree
Hide file tree
Showing 24 changed files with 334 additions and 126 deletions.
16 changes: 8 additions & 8 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ import (
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir[N] {
func reservoirFunc(agg Aggregation) func() exemplar.Reservoir {
if !x.Exemplars.Enabled() {
return nil
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
resF := func() func() exemplar.Reservoir[N] {
resF := func() func() exemplar.Reservoir {
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.Reservoir[N] {
return func() exemplar.Reservoir {
bounds := cp
return exemplar.Histogram[N](bounds)
return exemplar.Histogram(bounds)
}
}

Expand Down Expand Up @@ -61,8 +61,8 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir
}
}

return func() exemplar.Reservoir[N] {
return exemplar.FixedSize[N](n)
return func() exemplar.Reservoir {
return exemplar.FixedSize(n)
}
}

Expand All @@ -73,12 +73,12 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.Reservoir
case "always_on":
return resF()
case "always_off":
return exemplar.Drop[N]
return exemplar.Drop
case "trace_based":
fallthrough
default:
newR := resF()
return func() exemplar.Reservoir[N] {
return func() exemplar.Reservoir {
return exemplar.SampledFilter(newR())
}
}
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Builder[N int64 | float64] struct {
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.Reservoir[N]
ReservoirFunc func() exemplar.Reservoir
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -50,12 +50,12 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
func (b Builder[N]) resFunc() func() exemplar.Reservoir {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}

return exemplar.Drop[N]
return exemplar.Drop
}

type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ var (
}
)

func dropExemplars[N int64 | float64]() exemplar.Reservoir[N] {
return exemplar.Drop[N]()
func dropExemplars[N int64 | float64]() exemplar.Reservoir {
return exemplar.Drop()
}

func TestBuilderFilter(t *testing.T) {
Expand Down
42 changes: 42 additions & 0 deletions sdk/metric/internal/aggregate/exemplar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"sync"

"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

var exemplarPool = sync.Pool{
New: func() any { return new([]exemplar.Exemplar) },
}

func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) {
dest := exemplarPool.Get().(*[]exemplar.Exemplar)
defer func() {
*dest = (*dest)[:0]
exemplarPool.Put(dest)
}()

*dest = reset(*dest, len(*out), cap(*out))

f(dest)

*out = reset(*out, len(*dest), cap(*dest))
for i, e := range *dest {
(*out)[i].FilteredAttributes = e.FilteredAttributes
(*out)[i].Time = e.Time
(*out)[i].SpanID = e.SpanID
(*out)[i].TraceID = e.TraceID

switch e.Value.Type() {
case exemplar.Int64ValueType:
(*out)[i].Value = N(e.Value.Int64())
case exemplar.Float64ValueType:
(*out)[i].Value = N(e.Value.Float64())
}
}
}
50 changes: 50 additions & 0 deletions sdk/metric/internal/aggregate/exemplar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestCollectExemplars(t *testing.T) {
t.Run("Int64", testCollectExemplars[int64]())
t.Run("Float64", testCollectExemplars[float64]())
}

func testCollectExemplars[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
now := time.Now()
alice := attribute.String("user", "Alice")
value := N(1)
spanID := [8]byte{0x1}
traceID := [16]byte{0x1}

out := new([]metricdata.Exemplar[N])
collectExemplars(out, func(in *[]exemplar.Exemplar) {
*in = reset(*in, 1, 1)
(*in)[0] = exemplar.Exemplar{
FilteredAttributes: []attribute.KeyValue{alice},
Time: now,
Value: exemplar.NewValue(value),
SpanID: spanID[:],
TraceID: traceID[:],
}
})

assert.Equal(t, []metricdata.Exemplar[N]{{
FilteredAttributes: []attribute.KeyValue{alice},
Time: now,
Value: value,
SpanID: spanID[:],
TraceID: traceID[:],
}}, *out)
}
}
12 changes: 6 additions & 6 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res exemplar.Reservoir[N]
res exemplar.Reservoir

count uint64
min N
Expand Down Expand Up @@ -282,7 +282,7 @@ func (b *expoBuckets) downscale(delta int) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
Expand All @@ -305,7 +305,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int

newRes func() exemplar.Reservoir[N]
newRes func() exemplar.Reservoir
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
Expand Down Expand Up @@ -333,7 +333,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
e.values[attr.Equivalent()] = v
}
v.record(value)
v.res.Offer(ctx, t, value, droppedAttr)
v.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)
}

func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
Expand Down Expand Up @@ -376,7 +376,7 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

val.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

i++
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

val.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

i++
// TODO (#3006): This will use an unbounded amount of memory if there
Expand Down
14 changes: 7 additions & 7 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

type buckets[N int64 | float64] struct {
attrs attribute.Set
res exemplar.Reservoir[N]
res exemplar.Reservoir

counts []uint64
count uint64
Expand Down Expand Up @@ -48,13 +48,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

newRes func() exemplar.Reservoir[N]
newRes func() exemplar.Reservoir
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
Expand Down Expand Up @@ -106,12 +106,12 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
if !s.noSum {
b.sum(value)
}
b.res.Offer(ctx, t, value, droppedAttr)
b.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)
}

// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,
Expand Down Expand Up @@ -163,7 +163,7 @@ func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

val.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

i++
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
hDPts[i].Max = metricdata.NewExtrema(val.max)
}

val.res.Collect(&hDPts[i].Exemplars)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

i++
// TODO (#3006): This will use an unbounded amount of memory if there
Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ type datapoint[N int64 | float64] struct {
attrs attribute.Set
timestamp time.Time
value N
res exemplar.Reservoir[N]
res exemplar.Reservoir
}

func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *lastValue[N] {
func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
Expand All @@ -33,7 +33,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N])
type lastValue[N int64 | float64] struct {
sync.Mutex

newRes func() exemplar.Reservoir[N]
newRes func() exemplar.Reservoir
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
}
Expand All @@ -53,7 +53,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
d.attrs = attr
d.timestamp = t
d.value = value
d.res.Offer(ctx, t, value, droppedAttr)
d.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)

s.values[attr.Equivalent()] = d
}
Expand All @@ -72,7 +72,7 @@ func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
// ignored.
(*dest)[i].Time = v.timestamp
(*dest)[i].Value = v.value
v.res.Collect(&(*dest)[i].Exemplars)
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
i++
}
// Do not report stale values.
Expand Down
Loading

0 comments on commit 2f662db

Please sign in to comment.