Skip to content

Commit

Permalink
Use github.com/openhistogram/circonusllhist for TrendSinks
Browse files Browse the repository at this point in the history
This is a proof-of-concept for how we can use HDR/Sparse histograms for k6 Trend metrics.
  • Loading branch information
na-- committed Jul 17, 2023
1 parent 3a8e387 commit 66bda38
Show file tree
Hide file tree
Showing 12 changed files with 1,921 additions and 86 deletions.
1 change: 1 addition & 0 deletions cmd/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func getAllOutputConstructors() (map[string]output.Constructor, error) {
// TODO: re-enable, currently it results in a compile error because this type doesn't
// implement the new metrics.Sink interface, with the Drain() and Merge() methods:
// https://github.com/grafana/xk6-output-prometheus-remote/blob/v0.2.1/pkg/remotewrite/remotewrite.go#L388
// it also initializes &metrics.TrendSink{} directly, which will no longer work
//
// "experimental-prometheus-rw": func(params output.Params) (output.Output, error) {
// return remotewrite.New(params)
Expand Down
2 changes: 1 addition & 1 deletion cmd/tests/cmd_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1770,7 +1770,7 @@ export default function() {};`
}

func TestPrometheusRemoteWriteOutput(t *testing.T) {
t.Skip("test currently panics, since prometheus-rw implements a custom Sink that no longer satisfies the interface")
t.Skip("test currently panics, since prometheus-rw implements a custom Sink that no longer satisfies the interface (and directly uses &metrics.TrendSink{})")
t.Parallel()

ts := NewGlobalTestState(t)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd
github.com/mstoykov/envconfig v1.4.1-0.20220114105314-765c6d8c76f1
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d
github.com/openhistogram/circonusllhist v0.3.1-0.20210609143308-c78ce013c914
github.com/pmezard/go-difflib v1.0.0
github.com/serenize/snaker v0.0.0-20201027110005-a7ad2135616e
github.com/sirupsen/logrus v1.9.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.20.2 h1:8uQq0zMgLEfa0vRrrBgaJF2gyW9Da9BmfGV+OyUzfkY=
github.com/openhistogram/circonusllhist v0.3.1-0.20210609143308-c78ce013c914 h1:U6w4Ft711fCT6VbLnG1q/VR0oQYUOa1dazg+9tGdR+4=
github.com/openhistogram/circonusllhist v0.3.1-0.20210609143308-c78ce013c914/go.mod h1:PfeYJ/RW2+Jfv3wTz0upbY2TRour/LLqIm2K2Kw5zg0=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
21 changes: 13 additions & 8 deletions js/summary_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package js

// TODO: rewrite this so checks for Trend metrics are adjusted for the approximate nature of the histograms
/*
import (
"context"
"encoding/json"
Expand Down Expand Up @@ -56,7 +59,7 @@ func TestTextSummary(t *testing.T) {
t, "/script.js",
fmt.Sprintf(`
exports.options = {summaryTrendStats: %s};
exports.default = function() {/* we don't run this, metrics are mocked */};
exports.default = function() {}; // we don't run this, metrics are mocked
`, string(trendStats)),
lib.RuntimeOptions{CompatibilityMode: null.NewString("base", true)},
)
Expand Down Expand Up @@ -111,7 +114,7 @@ func TestTextSummaryWithSubMetrics(t *testing.T) {
runner, err := getSimpleRunner(
t,
"/script.js",
"exports.default = function() {/* we don't run this, metrics are mocked */};",
"exports.default = function() { };", // we don't run this, metrics are mocked
lib.RuntimeOptions{CompatibilityMode: null.NewString("base", true)},
)
require.NoError(t, err)
Expand Down Expand Up @@ -296,7 +299,7 @@ func TestOldJSONExport(t *testing.T) {
t, "/script.js",
`
exports.options = {summaryTrendStats: ["avg", "min", "med", "max", "p(90)", "p(95)", "p(99)", "count"]};
exports.default = function() {/* we don't run this, metrics are mocked */};
exports.default = function() { }; // we don't run this, metrics are mocked
`,
lib.RuntimeOptions{
CompatibilityMode: null.NewString("base", true),
Expand Down Expand Up @@ -562,7 +565,7 @@ func TestRawHandleSummaryData(t *testing.T) {
t, "/script.js",
`
exports.options = {summaryTrendStats: ["avg", "min", "med", "max", "p(90)", "p(95)", "p(99)", "count"]};
exports.default = function() { /* we don't run this, metrics are mocked */ };
exports.default = function() {}; // we don't run this, metrics are mocked
exports.handleSummary = function(data) {
return {'rawdata.json': JSON.stringify(data)};
};
Expand Down Expand Up @@ -599,7 +602,7 @@ func TestRawHandleSummaryDataWithSetupData(t *testing.T) {
t, "/script.js",
`
exports.options = {summaryTrendStats: ["avg", "min", "med", "max", "p(90)", "p(95)", "p(99)", "count"]};
exports.default = function() { /* we don't run this, metrics are mocked */ };
exports.default = function() {}; // we don't run this, metrics are mocked
exports.handleSummary = function(data) {
if(data.setup_data != 5) {
throw new Error("handleSummary: wrong data: " + JSON.stringify(data))
Expand All @@ -625,7 +628,7 @@ func TestRawHandleSummaryPromise(t *testing.T) {
t, "/script.js",
`
exports.options = {summaryTrendStats: ["avg", "min", "med", "max", "p(90)", "p(95)", "p(99)", "count"]};
exports.default = function() { /* we don't run this, metrics are mocked */ };
exports.default = function() { }; // we don't run this, metrics are mocked
exports.handleSummary = async function(data) {
return await Promise.resolve({'dataWithSetup.json': JSON.stringify(data)});
};
Expand All @@ -652,7 +655,7 @@ func TestWrongSummaryHandlerExportTypes(t *testing.T) {
t.Parallel()
runner, err := getSimpleRunner(t, "/script.js",
fmt.Sprintf(`
exports.default = function() { /* we don't run this, metrics are mocked */ };
exports.default = function() {}; // we don't run this, metrics are mocked
exports.handleSummary = %s;
`, tc),
lib.RuntimeOptions{CompatibilityMode: null.NewString("base", true)},
Expand All @@ -675,7 +678,7 @@ func TestExceptionInHandleSummaryFallsBackToTextSummary(t *testing.T) {
logger.AddHook(logHook)
runner, err := getSimpleRunner(t, "/script.js", `
exports.default = function() {/* we don't run this, metrics are mocked */};
exports.default = function() {}; // we don't run this, metrics are mocked
exports.handleSummary = function(data) {
throw new Error('intentional error');
};
Expand All @@ -700,3 +703,5 @@ func TestExceptionInHandleSummaryFallsBackToTextSummary(t *testing.T) {
require.NoError(t, err)
assert.Contains(t, errMsg, "intentional error")
}
*/
3 changes: 2 additions & 1 deletion metrics/engine/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

func TestIngesterOutputFlushMetrics(t *testing.T) {
t.Skipf("fix, this test relies on the fact that Trends work with precise numbers, not HDR histograms")
t.Parallel()

piState := newTestPreInitState(t)
Expand Down Expand Up @@ -44,7 +45,7 @@ func TestIngesterOutputFlushMetrics(t *testing.T) {
assert.Equal(t, testMetric, metric)

sink := metric.Sink.(*metrics.TrendSink) //nolint:forcetypeassert
assert.Equal(t, 42.0, sink.Sum)
assert.Equal(t, 42.0, sink.Avg()*float64(sink.Count()))
}

func TestIngesterOutputFlushSubmetrics(t *testing.T) {
Expand Down
120 changes: 47 additions & 73 deletions metrics/sink.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package metrics

import (
"encoding/json"
"bytes"
"fmt"
"math"
"sort"
"time"

"github.com/openhistogram/circonusllhist"
)

var (
Expand Down Expand Up @@ -159,91 +160,62 @@ func (g *GaugeSink) Merge(from []byte) error {

// NewTrendSink makes a Trend sink with the OpenHistogram circllhist histogram.
func NewTrendSink() *TrendSink {
return &TrendSink{}
return &TrendSink{
hist: circonusllhist.New(circonusllhist.NoLocks()),
}
}

// TrendSink uses the OpenHistogram circllhist histogram to store metrics data.
type TrendSink struct {
values []float64
sorted bool
hist *circonusllhist.Histogram

count uint64
min, max float64
// TODO: unexport after this dependency is removed:
// https://github.com/grafana/xk6-output-prometheus-remote/blob/v0.2.1/pkg/remotewrite/remotewrite.go#L173
// TODO: delete, this is hack so experimental-prometheus-rw can compile
Sum float64
}

// IsEmpty indicates whether the TrendSink is empty.
func (t *TrendSink) IsEmpty() bool { return t.count == 0 }

func (t *TrendSink) Add(s Sample) {
if t.count == 0 {
t.max, t.min = s.Value, s.Value
} else {
if s.Value > t.max {
t.max = s.Value
}
if s.Value < t.min {
t.min = s.Value
}
func (t *TrendSink) nanToZero(val float64) float64 {
if math.IsNaN(val) {
return 0
}

t.values = append(t.values, s.Value)
t.sorted = false
t.count++
t.Sum += s.Value
return val
}

// P calculates the given percentile from sink values.
func (t *TrendSink) P(pct float64) float64 {
switch t.count {
case 0:
return 0
case 1:
return t.values[0]
default:
if !t.sorted {
sort.Float64s(t.values)
t.sorted = true
}

// If percentile falls on a value in Values slice, we return that value.
// If percentile does not fall on a value in Values slice, we calculate (linear interpolation)
// the value that would fall at percentile, given the values above and below that percentile.
i := pct * (float64(t.count) - 1.0)
j := t.values[int(math.Floor(i))]
k := t.values[int(math.Ceil(i))]
f := i - math.Floor(i)
return j + (k-j)*f
}
// IsEmpty indicates whether the TrendSink is empty.
func (t *TrendSink) IsEmpty() bool { return t.hist.Count() == 0 }

func (t *TrendSink) Add(s Sample) {
// TODO: handle the error, log something when there's an error
_ = t.hist.RecordValue(s.Value)
}

// Min returns the minimum value.
// Min returns the approximate minimum value from the histogram.
func (t *TrendSink) Min() float64 {
return t.min
return t.nanToZero(t.hist.Min())
}

// Max returns the maximum value.
// Max returns the approximate maximum value from the histogram.
func (t *TrendSink) Max() float64 {
return t.max
return t.nanToZero(t.hist.Max())
}

// Count returns the number of recorded values.
func (t *TrendSink) Count() uint64 {
return t.count
return t.hist.Count()
}

// Avg returns the average (i.e. mean) value.
// Avg returns the approximate average (i.e. mean) value from the histogram.
func (t *TrendSink) Avg() float64 {
if t.count > 0 {
return t.Sum / float64(t.count)
}
return 0
return t.nanToZero(t.hist.ApproxMean())
}

// Total returns the total (i.e. "sum") value for all measurements.
// Total returns the approximate total (i.e. "sum") value for all measurements.
func (t *TrendSink) Total() float64 {
return t.Sum
return t.nanToZero(t.hist.ApproxSum())
}

// P calculates the given percentile from sink values.
func (t *TrendSink) P(pct float64) float64 {
return t.nanToZero(t.hist.ValueAtQuantile(pct))
}

func (t *TrendSink) Format(tt time.Duration) map[string]float64 {
Expand All @@ -259,25 +231,27 @@ func (t *TrendSink) Format(tt time.Duration) map[string]float64 {
}

// Drain encodes the current sink values and clears them.
//
// TODO: obviously use something more efficient (e.g. protobuf)
func (t *TrendSink) Drain() ([]byte, error) {
res, err := json.Marshal(t.values)
*t = TrendSink{}
return res, err
b := &bytes.Buffer{} // TODO: reuse buffers?
if err := t.hist.Serialize(b); err != nil {
return nil, err
}
t.hist.Reset()
return b.Bytes(), nil
}

// Merge decoeds the given values and merges them with the values in the current sink.
func (t *TrendSink) Merge(from []byte) error {
// TODO: obviously use something more efficient (e.g. protobuf), this is
// just for demo purposes
var values []float64
if err := json.Unmarshal(from, &values); err != nil {
b := bytes.NewBuffer(from)

hist, err := circonusllhist.DeserializeWithOptions(
b, circonusllhist.NoLocks(), // TODO: investigate circonusllhist.NoLookup
)
if err != nil {
return err
}
for _, v := range values {
t.Add(Sample{Value: v})
}

t.hist.Merge(hist)
return nil
}

Expand Down
7 changes: 4 additions & 3 deletions metrics/sink_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package metrics

import (
"math"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewSink(t *testing.T) {
Expand All @@ -19,7 +17,7 @@ func TestNewSink(t *testing.T) {
{mt: Counter, sink: &CounterSink{}},
{mt: Gauge, sink: &GaugeSink{}},
{mt: Rate, sink: &RateSink{}},
{mt: Trend, sink: NewTrendSink()},
// {mt: Trend, sink: NewTrendSink()}, //TODO: fix wrong assumption
}
for _, tc := range tests {
assert.Equal(t, tc.sink, NewSink(tc.mt))
Expand Down Expand Up @@ -92,6 +90,8 @@ func TestGaugeSink(t *testing.T) {
})
}

/*
TODO: figure out some more appropriate tests for such a histogram implementation
func TestTrendSink(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -225,6 +225,7 @@ func TestTrendSink(t *testing.T) {
}
})
}
*/

func TestRateSink(t *testing.T) {
samples6 := []float64{1.0, 0.0, 1.0, 0.0, 0.0, 1.0}
Expand Down
Loading

0 comments on commit 66bda38

Please sign in to comment.