diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go
index e9b7868ab41..4f919e32043 100644
--- a/api/v1/group_routes_test.go
+++ b/api/v1/group_routes_test.go
@@ -46,7 +46,7 @@ func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurfa
 	execScheduler, err := execution.NewScheduler(testState)
 	require.NoError(tb, err)
 
-	me, err := engine.NewMetricsEngine(execScheduler.GetState())
+	me, err := engine.NewMetricsEngine(testState)
 	require.NoError(tb, err)
 
 	ctx, cancel := context.WithCancel(context.Background())
diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go
index 449e39696a5..790b8e14647 100644
--- a/api/v1/setup_teardown_routes_test.go
+++ b/api/v1/setup_teardown_routes_test.go
@@ -140,7 +140,7 @@ func TestSetupData(t *testing.T) {
 			execScheduler, err := execution.NewScheduler(testState)
 			require.NoError(t, err)
 
-			metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
+			metricsEngine, err := engine.NewMetricsEngine(testState)
 			require.NoError(t, err)
 
 			globalCtx, globalCancel := context.WithCancel(context.Background())
diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go
index ba2f174e391..db15b0d47e0 100644
--- a/api/v1/status_routes_test.go
+++ b/api/v1/status_routes_test.go
@@ -114,7 +114,7 @@ func TestPatchStatus(t *testing.T) {
 			execScheduler, err := execution.NewScheduler(testState)
 			require.NoError(t, err)
 
-			metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
+			metricsEngine, err := engine.NewMetricsEngine(testState)
 			require.NoError(t, err)
 
 			globalCtx, globalCancel := context.WithCancel(context.Background())
diff --git a/cmd/run.go b/cmd/run.go
index d1338ba1d94..0dcd8d8e85d 100644
--- a/cmd/run.go
+++ b/cmd/run.go
@@ -120,7 +120,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
 	}
 
 	executionState := execScheduler.GetState()
-	metricsEngine, err := engine.NewMetricsEngine(executionState)
+	metricsEngine, err := engine.NewMetricsEngine(executionState.Test)
 	if err != nil {
 		return err
 	}
@@ -178,7 +178,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
 	}()
 
 	if !testRunState.RuntimeOptions.NoThresholds.Bool { //nolint:nestif
-		finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort)
+		finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, executionState.GetCurrentTestRunDuration)
 		defer func() {
 			if finalizeThresholds == nil {
 				return
diff --git a/js/runner_test.go b/js/runner_test.go
index ec9ab13bdd6..8ce8e92e440 100644
--- a/js/runner_test.go
+++ b/js/runner_test.go
@@ -387,7 +387,7 @@ func TestDataIsolation(t *testing.T) {
 	execScheduler, err := execution.NewScheduler(testRunState)
 	require.NoError(t, err)
 
-	metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
+	metricsEngine, err := engine.NewMetricsEngine(testRunState)
 	require.NoError(t, err)
 
 	globalCtx, globalCancel := context.WithCancel(context.Background())
@@ -401,7 +401,7 @@ func TestDataIsolation(t *testing.T) {
 	require.NoError(t, err)
 	defer stopOutputs(nil)
 
-	finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort)
+	finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, execScheduler.GetState().GetCurrentTestRunDuration)
 	require.Nil(t, finalizeThresholds)
 
 	require.Empty(t, runner.defaultGroup.Groups)
diff --git a/metrics/engine/engine.go b/metrics/engine/engine.go
index 3fa561409d1..4c4107de1a1 100644
--- a/metrics/engine/engine.go
+++ b/metrics/engine/engine.go
@@ -25,9 +25,8 @@ const thresholdsRate = 2 * time.Second
 // aggregated metric sample values. They are used to generate the end-of-test
 // summary and to evaluate the test thresholds.
 type MetricsEngine struct {
-	es     *lib.ExecutionState
-	logger logrus.FieldLogger
-
+	logger logrus.FieldLogger
+	test   *lib.TestRunState
 	outputIngester *outputIngester
 
 	// These can be both top-level metrics or sub-metrics
@@ -45,15 +44,14 @@ type MetricsEngine struct {
 }
 
 // NewMetricsEngine creates a new metrics Engine with the given parameters.
-func NewMetricsEngine(es *lib.ExecutionState) (*MetricsEngine, error) {
+func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) {
 	me := &MetricsEngine{
-		es:     es,
-		logger: es.Test.Logger.WithField("component", "metrics-engine"),
-
+		test:   runState,
+		logger: runState.Logger.WithField("component", "metrics-engine"),
 		ObservedMetrics: make(map[string]*metrics.Metric),
 	}
 
-	if !(me.es.Test.RuntimeOptions.NoSummary.Bool && me.es.Test.RuntimeOptions.NoThresholds.Bool) {
+	if !(me.test.RuntimeOptions.NoSummary.Bool && me.test.RuntimeOptions.NoThresholds.Bool) {
 		err := me.initSubMetricsAndThresholds()
 		if err != nil {
 			return nil, err
@@ -77,10 +75,11 @@ func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*metrics.Me
 	// TODO: replace with strings.Cut after Go 1.18
 	nameParts := strings.SplitN(name, "{", 2)
 
-	metric := me.es.Test.Registry.Get(nameParts[0])
+	metric := me.test.Registry.Get(nameParts[0])
 	if metric == nil {
 		return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0])
 	}
+
 	if len(nameParts) == 1 { // no sub-metric
 		return metric, nil
 	}
@@ -126,10 +125,10 @@ func (me *MetricsEngine) markObserved(metric *metrics.Metric) {
 }
 
 func (me *MetricsEngine) initSubMetricsAndThresholds() error {
-	for metricName, thresholds := range me.es.Test.Options.Thresholds {
+	for metricName, thresholds := range me.test.Options.Thresholds {
 		metric, err := me.getThresholdMetricOrSubmetric(metricName)
 
-		if me.es.Test.RuntimeOptions.NoThresholds.Bool {
+		if me.test.RuntimeOptions.NoThresholds.Bool {
 			if err != nil {
 				me.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName)
 			}
@@ -154,7 +153,7 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
 
 	// TODO: refactor out of here when https://github.com/grafana/k6/issues/1321
 	// lands and there is a better way to enable a metric with tag
-	if me.es.Test.Options.SystemTags.Has(metrics.TagExpectedResponse) {
+	if me.test.Options.SystemTags.Has(metrics.TagExpectedResponse) {
 		_, err := me.getThresholdMetricOrSubmetric("http_req_duration{expected_response:true}")
 		if err != nil {
 			return err // shouldn't happen, but ¯\_(ツ)_/¯
@@ -166,8 +165,10 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
 
 // StartThresholdCalculations spins up a new goroutine to crunch thresholds and
 // returns a callback that will stop the goroutine and finalizes calculations.
-func (me *MetricsEngine) StartThresholdCalculations(abortRun func(error)) (
-	finalize func() (breached []string),
+func (me *MetricsEngine) StartThresholdCalculations(
+	abortRun func(error),
+	getCurrentTestRunDuration func() time.Duration,
+) (finalize func() (breached []string),
 ) {
 	if len(me.metricsWithThresholds) == 0 {
 		return nil // no thresholds were defined
@@ -184,7 +185,7 @@ func (me *MetricsEngine) StartThresholdCalculations(abortRun func(error)) (
 		for {
 			select {
 			case <-ticker.C:
-				breached, shouldAbort := me.evaluateThresholds(true)
+				breached, shouldAbort := me.evaluateThresholds(true, getCurrentTestRunDuration)
 				if shouldAbort {
 					err := fmt.Errorf(
 						"thresholds on metrics '%s' were breached; at least one has abortOnFail enabled, stopping test prematurely",
@@ -213,7 +214,7 @@ func (me *MetricsEngine) StartThresholdCalculations(abortRun func(error)) (
 		close(stop)
 		<-done
 
-		breached, _ := me.evaluateThresholds(false)
+		breached, _ := me.evaluateThresholds(false, getCurrentTestRunDuration)
 		return breached
 	}
 }
@@ -221,11 +222,14 @@ func (me *MetricsEngine) StartThresholdCalculations(abortRun func(error)) (
 // evaluateThresholds processes all of the thresholds.
 //
 // TODO: refactor, optimize
-func (me *MetricsEngine) evaluateThresholds(ignoreEmptySinks bool) (breachedThresholds []string, shouldAbort bool) {
+func (me *MetricsEngine) evaluateThresholds(
+	ignoreEmptySinks bool,
+	getCurrentTestRunDuration func() time.Duration,
+) (breachedThresholds []string, shouldAbort bool) {
 	me.MetricsLock.Lock()
 	defer me.MetricsLock.Unlock()
 
-	t := me.es.GetCurrentTestRunDuration()
+	t := getCurrentTestRunDuration()
 
 	me.logger.Debugf("Running thresholds on %d metrics...", len(me.metricsWithThresholds))
 	for _, m := range me.metricsWithThresholds {
diff --git a/metrics/engine/engine_test.go b/metrics/engine/engine_test.go
index a46e409f39d..06267cfa3c8 100644
--- a/metrics/engine/engine_test.go
+++ b/metrics/engine/engine_test.go
@@ -1,459 +1,186 @@
 package engine
 
-// TODO: refactor and move the tests that still make sense in other packages
-
-/*
-
-const isWindows = runtime.GOOS == "windows"
-
-func TestEngineRun(t *testing.T) {
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.k6.io/k6/lib"
+	"go.k6.io/k6/lib/testutils"
+	"go.k6.io/k6/metrics"
+)
+
+func TestNewMetricsEngineWithThresholds(t *testing.T) {
 	t.Parallel()
-	logrus.SetLevel(logrus.DebugLevel)
-	t.Run("exits with context", func(t *testing.T) {
-		t.Parallel()
-		done := make(chan struct{})
-		runner := &minirunner.MiniRunner{
-			Fn: func(ctx context.Context, _ *lib.State, _ chan<- metrics.SampleContainer) error {
-				<-ctx.Done()
-				close(done)
-				return nil
-			},
-		}
-		duration := 100 * time.Millisecond
-		test := newTestEngine(t, &duration, runner, nil, lib.Options{})
-		defer test.wait()
-
-		startTime := time.Now()
-		assert.ErrorContains(t, test.run(), "context deadline exceeded")
-		assert.WithinDuration(t, startTime.Add(duration), time.Now(), 100*time.Millisecond)
-		<-done
-	})
-	t.Run("exits with executor", func(t *testing.T) {
-		t.Parallel()
-		test := newTestEngine(t, nil, nil, nil, lib.Options{
-			VUs:        null.IntFrom(10),
-			Iterations: null.IntFrom(100),
-		})
-		defer test.wait()
-		assert.NoError(t, test.run())
-		assert.Equal(t, uint64(100), test.engine.ExecutionScheduler.GetState().GetFullIterationCount())
-	})
-	// Make sure samples are discarded after context close (using "cutoff" timestamp in local.go)
-	t.Run("collects samples", func(t *testing.T) {
-		t.Parallel()
-
-		piState := getTestPreInitState(t)
-		testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Trend)
-		require.NoError(t, err)
-
-		signalChan := make(chan interface{})
-
-		runner := &minirunner.MiniRunner{
-			Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-				metrics.PushIfNotDone(ctx, out, metrics.Sample{
-					TimeSeries: metrics.TimeSeries{Metric: testMetric, Tags: piState.Registry.RootTagSet()},
-					Time:       time.Now(),
-					Value:      1,
-				})
-				close(signalChan)
-				<-ctx.Done()
-				metrics.PushIfNotDone(ctx, out, metrics.Sample{
-					TimeSeries: metrics.TimeSeries{Metric: testMetric, Tags: piState.Registry.RootTagSet()},
-					Time:       time.Now(), Value: 1,
-				})
-				return nil
+	trs := &lib.TestRunState{
+		TestPreInitState: &lib.TestPreInitState{
+			Logger:   testutils.NewLogger(t),
+			Registry: metrics.NewRegistry(),
+		},
+		Options: lib.Options{
+			Thresholds: map[string]metrics.Thresholds{
+				"metric1": {Thresholds: []*metrics.Threshold{}},
+				"metric2": {Thresholds: []*metrics.Threshold{}},
 			},
-		}
-
-		mockOutput := mockoutput.New()
-		test := newTestEngineWithTestPreInitState(t, nil, runner, []output.Output{mockOutput}, lib.Options{
-			VUs:        null.IntFrom(1),
-			Iterations: null.IntFrom(1),
-		}, piState)
-
-		errC := make(chan error)
-		go func() { errC <- test.run() }()
-		<-signalChan
-		test.runAbort(fmt.Errorf("custom error"))
-		assert.ErrorContains(t, <-errC, "custom error")
-		test.wait()
-
-		found := 0
-		for _, s := range mockOutput.Samples {
-			if s.Metric != testMetric {
-				continue
-			}
-			found++
-			assert.Equal(t, 1.0, s.Value, "wrong value")
-		}
-		assert.Equal(t, 1, found, "wrong number of samples")
-	})
-}
-
-func TestEngineOutput(t *testing.T) {
-	t.Parallel()
+		},
+	}
+	_, err := trs.Registry.NewMetric("metric1", metrics.Counter)
+	require.NoError(t, err)
 
-	piState := getTestPreInitState(t)
-	testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Trend)
+	_, err = trs.Registry.NewMetric("metric2", metrics.Counter)
 	require.NoError(t, err)
 
-	runner := &minirunner.MiniRunner{
-		Fn: func(_ context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-			out <- metrics.Sample{TimeSeries: metrics.TimeSeries{Metric: testMetric}}
-			return nil
-		},
-	}
+	me, err := NewMetricsEngine(trs)
+	require.NoError(t, err)
+	require.NotNil(t, me)
 
-	mockOutput := mockoutput.New()
-	test := newTestEngineWithTestPreInitState(t, nil, runner, []output.Output{mockOutput}, lib.Options{
-		VUs:        null.IntFrom(1),
-		Iterations: null.IntFrom(1),
-	}, piState)
-
-	assert.NoError(t, test.run())
-	test.wait()
-
-	cSamples := []metrics.Sample{}
-	for _, sample := range mockOutput.Samples {
-		if sample.Metric == testMetric {
-			cSamples = append(cSamples, sample)
-		}
-	}
-	metric := test.engine.MetricsEngine.ObservedMetrics["test_metric"]
-	if assert.NotNil(t, metric) {
-		sink := metric.Sink.(*metrics.TrendSink) //nolint:forcetypeassert
-		if assert.NotNil(t, sink) {
-			numOutputSamples := len(cSamples)
-			numEngineSamples := len(sink.Values)
-			assert.Equal(t, numEngineSamples, numOutputSamples)
-		}
-	}
+	assert.Len(t, me.metricsWithThresholds, 2)
 }
 
-func TestEngine_processSamples(t *testing.T) {
+func TestMetricsEngineGetThresholdMetricOrSubmetricError(t *testing.T) {
 	t.Parallel()
 
-	t.Run("metric", func(t *testing.T) {
-		t.Parallel()
-
-		piState := getTestPreInitState(t)
-		metric, err := piState.Registry.NewMetric("my_metric", metrics.Gauge)
-		require.NoError(t, err)
-
-		done := make(chan struct{})
-		runner := &minirunner.MiniRunner{
-			Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-				out <- metrics.Sample{
-					TimeSeries: metrics.TimeSeries{
-						Metric: metric,
-						Tags:   piState.Registry.RootTagSet().WithTagsFromMap(map[string]string{"a": "1"}),
-					},
-					Value: 1.25,
-					Time:  time.Now(),
-				}
-				close(done)
-				return nil
-			},
-		}
-		test := newTestEngineWithTestPreInitState(t, nil, runner, nil, lib.Options{}, piState)
-
-		go func() {
-			assert.NoError(t, test.run())
-		}()
-
-		select {
-		case <-done:
-			return
-		case <-time.After(10 * time.Second):
-			assert.Fail(t, "Test should have completed within 10 seconds")
-		}
-
-		test.wait()
-
-		assert.IsType(t, &metrics.GaugeSink{}, test.engine.MetricsEngine.ObservedMetrics["my_metric"].Sink)
-	})
-	t.Run("submetric", func(t *testing.T) {
-		t.Parallel()
-
-		piState := getTestPreInitState(t)
-		metric, err := piState.Registry.NewMetric("my_metric", metrics.Gauge)
-		require.NoError(t, err)
-
-		ths := metrics.NewThresholds([]string{`value<2`})
-		gotParseErr := ths.Parse()
-		require.NoError(t, gotParseErr)
-
-		done := make(chan struct{})
-		runner := &minirunner.MiniRunner{
-			Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-				out <- metrics.Sample{
-					TimeSeries: metrics.TimeSeries{
-						Metric: metric,
-						Tags:   piState.Registry.RootTagSet().WithTagsFromMap(map[string]string{"a": "1", "b": "2"}),
-					},
-					Value: 1.25,
-					Time:  time.Now(),
-				}
-				close(done)
-				return nil
-			},
-		}
-		test := newTestEngineWithTestPreInitState(t, nil, runner, nil, lib.Options{
-			Thresholds: map[string]metrics.Thresholds{
-				"my_metric{a:1}": ths,
-			},
-		}, piState)
-
-		go func() {
-			assert.NoError(t, test.run())
-		}()
+	cases := []struct {
+		metricDefinition string
+		expErr           string
+	}{
+		{metricDefinition: "metric1{test:a", expErr: "missing ending bracket"},
+		{metricDefinition: "metric2", expErr: "'metric2' does not exist in the script"},
+		{metricDefinition: "metric1{}", expErr: "submetric criteria for metric 'metric1' cannot be empty"},
+	}
 
-		select {
-		case <-done:
-			return
-		case <-time.After(10 * time.Second):
-			assert.Fail(t, "Test should have completed within 10 seconds")
-		}
-		test.wait()
+	for _, tc := range cases {
+		tc := tc
+		t.Run("", func(t *testing.T) {
+			t.Parallel()
 
-		assert.Len(t, test.engine.MetricsEngine.ObservedMetrics, 2)
-		sms := test.engine.MetricsEngine.ObservedMetrics["my_metric{a:1}"]
-		assert.EqualValues(t, map[string]string{"a": "1"}, sms.Sub.Tags.Map())
+			me := newTestMetricsEngine(t)
+			_, err := me.test.Registry.NewMetric("metric1", metrics.Counter)
+			require.NoError(t, err)
 
-		assert.IsType(t, &metrics.GaugeSink{}, test.engine.MetricsEngine.ObservedMetrics["my_metric"].Sink)
-		assert.IsType(t, &metrics.GaugeSink{}, test.engine.MetricsEngine.ObservedMetrics["my_metric{a:1}"].Sink)
-	})
+			_, err = me.getThresholdMetricOrSubmetric(tc.metricDefinition)
+			assert.ErrorContains(t, err, tc.expErr)
+		})
+	}
 }
 
-func TestEngineThresholdsWillAbort(t *testing.T) {
+func TestNewMetricsEngineNoThresholds(t *testing.T) {
 	t.Parallel()
 
-	piState := getTestPreInitState(t)
-	metric, err := piState.Registry.NewMetric("my_metric", metrics.Gauge)
-	require.NoError(t, err)
-
-	// The incoming samples for the metric set it to 1.25. Considering
-	// the metric is of type Gauge, value > 1.25 should always fail, and
-	// trigger an abort.
-	ths := metrics.NewThresholds([]string{"value>1.25"})
-	gotParseErr := ths.Parse()
-	require.NoError(t, gotParseErr)
-	ths.Thresholds[0].AbortOnFail = true
-
-	thresholds := map[string]metrics.Thresholds{metric.Name: ths}
-
-	done := make(chan struct{})
-	runner := &minirunner.MiniRunner{
-		Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-			out <- metrics.Sample{
-				TimeSeries: metrics.TimeSeries{
-					Metric: metric,
-					Tags:   piState.Registry.RootTagSet().WithTagsFromMap(map[string]string{"a": "1"}),
-				},
-				Time:  time.Now(),
-				Value: 1.25,
-			}
-			close(done)
-			return nil
+	trs := &lib.TestRunState{
+		TestPreInitState: &lib.TestPreInitState{
+			Logger: testutils.NewLogger(t),
 		},
 	}
-	test := newTestEngineWithTestPreInitState(t, nil, runner, nil, lib.Options{Thresholds: thresholds}, piState)
 
-	go func() {
-		assert.NoError(t, test.run())
-	}()
+	me, err := NewMetricsEngine(trs)
+	require.NoError(t, err)
+	require.NotNil(t, me)
 
-	select {
-	case <-done:
-		return
-	case <-time.After(10 * time.Second):
-		assert.Fail(t, "Test should have completed within 10 seconds")
-	}
-	test.wait()
-	assert.True(t, test.engine.IsTainted())
+	assert.Empty(t, me.metricsWithThresholds)
 }
 
-func TestEngineAbortedByThresholds(t *testing.T) {
+func TestMetricsEngineCreateIngester(t *testing.T) {
 	t.Parallel()
 
-	piState := getTestPreInitState(t)
-	metric, err := piState.Registry.NewMetric("my_metric", metrics.Gauge)
-	require.NoError(t, err)
-
-	// The MiniRunner sets the value of the metric to 1.25. Considering
-	// the metric is of type Gauge, value > 1.25 should always fail, and
-	// trigger an abort.
-	// **N.B**: a threshold returning an error, won't trigger an abort.
-	ths := metrics.NewThresholds([]string{"value>1.25"})
-	gotParseErr := ths.Parse()
-	require.NoError(t, gotParseErr)
-	ths.Thresholds[0].AbortOnFail = true
-
-	thresholds := map[string]metrics.Thresholds{metric.Name: ths}
-
-	doneIter := make(chan struct{})
-	doneRun := make(chan struct{})
-	runner := &minirunner.MiniRunner{
-		Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
-			out <- metrics.Sample{
-				TimeSeries: metrics.TimeSeries{
-					Metric: metric,
-					Tags:   piState.Registry.RootTagSet().WithTagsFromMap(map[string]string{"a": "1"}),
-				},
-				Time:  time.Now(),
-				Value: 1.25,
-			}
-			<-ctx.Done()
-			close(doneIter)
-			return nil
-		},
-	}
-
-	test := newTestEngineWithTestPreInitState(t, nil, runner, nil, lib.Options{Thresholds: thresholds}, piState)
-	defer test.wait()
-
-	go func() {
-		defer close(doneRun)
-		t.Logf("test run done with err '%s'", err)
-		assert.ErrorContains(t, test.run(), "thresholds on metrics 'my_metric' were breached")
-	}()
-
-	select {
-	case <-doneIter:
-	case <-time.After(10 * time.Second):
-		assert.Fail(t, "Iteration should have completed within 10 seconds")
-	}
-	select {
-	case <-doneRun:
-	case <-time.After(10 * time.Second):
-		assert.Fail(t, "Test should have completed within 10 seconds")
+	me := MetricsEngine{
+		logger: testutils.NewLogger(t),
 	}
+	ingester := me.CreateIngester()
+	assert.NotNil(t, ingester)
+	require.NoError(t, ingester.Start())
+	require.NoError(t, ingester.Stop())
 }
 
-func TestEngine_processThresholds(t *testing.T) {
+func TestMetricsEngineEvaluateThresholdNoAbort(t *testing.T) {
 	t.Parallel()
 
-	testdata := map[string]struct {
-		pass bool
-		ths  map[string][]string
+	cases := []struct {
+		threshold   string
+		abortOnFail bool
+		expBreached []string
 	}{
-		"passing":  {true, map[string][]string{"my_metric": {"value<2"}}},
-		"failing":  {false, map[string][]string{"my_metric": {"value>1.25"}}},
-
-		"submetric,match,passing":   {true, map[string][]string{"my_metric{a:1}": {"value<2"}}},
-		"submetric,match,failing":   {false, map[string][]string{"my_metric{a:1}": {"value>1.25"}}},
-		"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"value<2"}}},
-		"submetric,nomatch,failing": {false, map[string][]string{"my_metric{a:2}": {"value>1.25"}}},
-
-		"unused,passing":      {true, map[string][]string{"unused_counter": {"count==0"}}},
-		"unused,failing":      {false, map[string][]string{"unused_counter": {"count>1"}}},
-		"unused,subm,passing": {true, map[string][]string{"unused_counter{a:2}": {"count<1"}}},
-		"unused,subm,failing": {false, map[string][]string{"unused_counter{a:2}": {"count>1"}}},
-
-		"used,passing":               {true, map[string][]string{"used_counter": {"count==2"}}},
-		"used,failing":               {false, map[string][]string{"used_counter": {"count<1"}}},
-		"used,subm,passing":          {true, map[string][]string{"used_counter{b:1}": {"count==2"}}},
-		"used,not-subm,passing":      {true, map[string][]string{"used_counter{b:2}": {"count==0"}}},
-		"used,invalid-subm,passing1": {true, map[string][]string{"used_counter{c:''}": {"count==0"}}},
-		"used,invalid-subm,failing1": {false, map[string][]string{"used_counter{c:''}": {"count>0"}}},
-		"used,invalid-subm,passing2": {true, map[string][]string{"used_counter{c:}": {"count==0"}}},
-		"used,invalid-subm,failing2": {false, map[string][]string{"used_counter{c:}": {"count>0"}}},
+		{threshold: "count>5", expBreached: nil},
+		{threshold: "count<5", expBreached: []string{"m1"}},
+		{threshold: "count<5", expBreached: []string{"m1"}, abortOnFail: true},
 	}
 
-	for name, data := range testdata {
-		name, data := name, data
-		t.Run(name, func(t *testing.T) {
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.threshold, func(t *testing.T) {
 			t.Parallel()
+			me := newTestMetricsEngine(t)
 
-			piState := getTestPreInitState(t)
-			gaugeMetric, err := piState.Registry.NewMetric("my_metric", metrics.Gauge)
-			require.NoError(t, err)
-			counterMetric, err := piState.Registry.NewMetric("used_counter", metrics.Counter)
+			m1, err := me.test.Registry.NewMetric("m1", metrics.Counter)
 			require.NoError(t, err)
-			_, err = piState.Registry.NewMetric("unused_counter", metrics.Counter)
+			m2, err := me.test.Registry.NewMetric("m2", metrics.Counter)
 			require.NoError(t, err)
 
-			thresholds := make(map[string]metrics.Thresholds, len(data.ths))
-			for m, srcs := range data.ths {
-				ths := metrics.NewThresholds(srcs)
-				gotParseErr := ths.Parse()
-				require.NoError(t, gotParseErr)
-				thresholds[m] = ths
-			}
-
-			test := newTestEngineWithTestPreInitState(
-				t, nil, nil, nil, lib.Options{Thresholds: thresholds}, piState,
-			)
+			ths := metrics.NewThresholds([]string{tc.threshold})
+			require.NoError(t, ths.Parse())
+			m1.Thresholds = ths
+			m1.Thresholds.Thresholds[0].AbortOnFail = tc.abortOnFail
 
-			tag1 := piState.Registry.RootTagSet().With("a", "1")
-			tag2 := piState.Registry.RootTagSet().With("b", "1")
+			me.metricsWithThresholds = []*metrics.Metric{m1, m2}
+			m1.Sink.Add(metrics.Sample{Value: 6.0})
 
-			test.engine.OutputManager.AddMetricSamples(
-				[]metrics.SampleContainer{
-					metrics.Sample{
-						TimeSeries: metrics.TimeSeries{
-							Metric: gaugeMetric,
-							Tags:   tag1,
-						},
-						Time:  time.Now(),
-						Value: 1.25,
-					},
-					metrics.Sample{
-						TimeSeries: metrics.TimeSeries{
-							Metric: counterMetric,
-							Tags:   tag2,
-						},
-						Time:  time.Now(),
-						Value: 2,
-					},
-				},
-			)
-
-			require.NoError(t, test.run())
-			test.wait()
-
-			assert.Equal(t, data.pass, !test.engine.IsTainted())
+			breached, abort := me.evaluateThresholds(false, zeroTestRunDuration)
+			require.Equal(t, tc.abortOnFail, abort)
+			assert.Equal(t, tc.expBreached, breached)
 		})
 	}
 }
 
-func getMetricSum(mo *mockoutput.MockOutput, name string) (result float64) {
-	for _, sc := range mo.SampleContainers {
-		for _, s := range sc.GetSamples() {
-			if s.Metric.Name == name {
-				result += s.Value
-			}
-		}
-	}
-	return
+func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) {
+	t.Parallel()
+
+	me := newTestMetricsEngine(t)
+
+	m1, err := me.test.Registry.NewMetric("m1", metrics.Counter)
+	require.NoError(t, err)
+	m2, err := me.test.Registry.NewMetric("m2", metrics.Counter)
+	require.NoError(t, err)
+
+	ths := metrics.NewThresholds([]string{"count>5"})
+	require.NoError(t, ths.Parse())
+	m1.Thresholds = ths
+	m1.Thresholds.Thresholds[0].AbortOnFail = true
+
+	me.metricsWithThresholds = []*metrics.Metric{m1, m2}
+
+	breached, abort := me.evaluateThresholds(false, zeroTestRunDuration)
+	require.True(t, abort)
+	require.Equal(t, []string{"m1"}, breached)
+
+	breached, abort = me.evaluateThresholds(true, zeroTestRunDuration)
+	require.False(t, abort)
+	assert.Empty(t, breached)
 }
 
-func getMetricCount(mo *mockoutput.MockOutput, name string) (result uint) {
-	for _, sc := range mo.SampleContainers {
-		for _, s := range sc.GetSamples() {
-			if s.Metric.Name == name {
-				result++
-			}
-		}
+func newTestMetricsEngine(t *testing.T) MetricsEngine {
+	trs := &lib.TestRunState{
+		TestPreInitState: &lib.TestPreInitState{
+			Logger:   testutils.NewLogger(t),
+			Registry: metrics.NewRegistry(),
+		},
 	}
-	return
-}
 
-func getMetricMax(mo *mockoutput.MockOutput, name string) (result float64) {
-	for _, sc := range mo.SampleContainers {
-		for _, s := range sc.GetSamples() {
-			if s.Metric.Name == name && s.Value > result {
-				result = s.Value
-			}
-		}
+	return MetricsEngine{
+		logger: trs.Logger,
+		test:   trs,
 	}
-	return
 }
 
-const expectedHeaderMaxLength = 550
+func zeroTestRunDuration() time.Duration {
+	return 0
+}
 
-// FIXME: This test is too brittle, consider simplifying.
+/*
+// FIXME: This test is too brittle,
+// move them as e2e tests and consider to simplify.
+//
 func TestSentReceivedMetrics(t *testing.T) {
 	t.Parallel()
 	tb := httpmultibin.NewHTTPMultiBin(t)
diff --git a/metrics/engine/ingester_test.go b/metrics/engine/ingester_test.go
new file mode 100644
index 00000000000..3bb155243bd
--- /dev/null
+++ b/metrics/engine/ingester_test.go
@@ -0,0 +1,109 @@
+package engine
+
+import (
+	"testing"
+
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.k6.io/k6/lib"
+	"go.k6.io/k6/lib/testutils"
+	"go.k6.io/k6/metrics"
+)
+
+func TestIngesterOutputFlushMetrics(t *testing.T) {
+	t.Parallel()
+
+	piState := newTestPreInitState(t)
+	testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Trend)
+	require.NoError(t, err)
+
+	ingester := outputIngester{
+		logger: piState.Logger,
+		metricsEngine: &MetricsEngine{
+			ObservedMetrics: make(map[string]*metrics.Metric),
+		},
+	}
+	require.NoError(t, ingester.Start())
+	ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
+		TimeSeries: metrics.TimeSeries{Metric: testMetric},
+		Value:      21,
+	}})
+	ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
+		TimeSeries: metrics.TimeSeries{Metric: testMetric},
+		Value:      21,
+	}})
+	require.NoError(t, ingester.Stop())
+
+	require.Len(t, ingester.metricsEngine.ObservedMetrics, 1)
+	metric := ingester.metricsEngine.ObservedMetrics["test_metric"]
+	require.NotNil(t, metric)
+	require.NotNil(t, metric.Sink)
+	assert.Equal(t, testMetric, metric)
+
+	sink := metric.Sink.(*metrics.TrendSink) //nolint:forcetypeassert
+	assert.Equal(t, 42.0, sink.Sum)
+}
+
+func TestIngesterOutputFlushSubmetrics(t *testing.T) {
+	t.Parallel()
+
+	piState := newTestPreInitState(t)
+	testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Gauge)
+	require.NoError(t, err)
+
+	me := &MetricsEngine{
+		test: &lib.TestRunState{
+			TestPreInitState: piState,
+		},
+		ObservedMetrics: make(map[string]*metrics.Metric),
+	}
+	_, err = me.getThresholdMetricOrSubmetric("test_metric{a:1}")
+	require.NoError(t, err)
+
+	// assert that observed metrics is empty before to start
+	require.Empty(t, me.ObservedMetrics)
+
+	ingester := outputIngester{
+		logger:        piState.Logger,
+		metricsEngine: me,
+	}
+	require.NoError(t, ingester.Start())
+	ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
+		TimeSeries: metrics.TimeSeries{
+			Metric: testMetric,
+			Tags: piState.Registry.RootTagSet().WithTagsFromMap(
+				map[string]string{"a": "1", "b": "2"}),
+		},
+		Value: 21,
+	}})
+	require.NoError(t, ingester.Stop())
+
+	require.Len(t, ingester.metricsEngine.ObservedMetrics, 2)
+
+	// assert the parent has been observed
+	metric := ingester.metricsEngine.ObservedMetrics["test_metric"]
+	require.NotNil(t, metric)
+	require.NotNil(t, metric.Sink)
+	assert.IsType(t, &metrics.GaugeSink{}, metric.Sink)
+
+	// assert the submetric has been observed
+	metric = ingester.metricsEngine.ObservedMetrics["test_metric{a:1}"]
+	require.NotNil(t, metric)
+	require.NotNil(t, metric.Sink)
+	require.NotNil(t, metric.Sub)
+	assert.EqualValues(t, map[string]string{"a": "1"}, metric.Sub.Tags.Map())
+	assert.IsType(t, &metrics.GaugeSink{}, metric.Sink)
+}
+
+func newTestPreInitState(tb testing.TB) *lib.TestPreInitState {
+	reg := metrics.NewRegistry()
+	logger := testutils.NewLogger(tb)
+	logger.SetLevel(logrus.DebugLevel)
+	return &lib.TestPreInitState{
+		Logger:         logger,
+		RuntimeOptions: lib.RuntimeOptions{},
+		Registry:       reg,
+		BuiltinMetrics: metrics.RegisterBuiltinMetrics(reg),
+	}
+}
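Usage sketch (not part of the patch): after this change the metrics engine is built from the *lib.TestRunState alone, and the elapsed-run-duration lookup is injected as a callback rather than read from a stored *lib.ExecutionState. The names executionState and runAbort below follow the cmd/run.go hunk above; error handling is abbreviated.

	metricsEngine, err := engine.NewMetricsEngine(executionState.Test)
	if err != nil {
		return err
	}

	// The threshold loop now obtains the elapsed test time through the
	// injected getter instead of from an execution-state field on the engine.
	finalizeThresholds := metricsEngine.StartThresholdCalculations(
		runAbort, // func(error) that aborts the run when an abortOnFail threshold is crossed
		executionState.GetCurrentTestRunDuration, // func() time.Duration
	)

	// finalizeThresholds is nil when the script defines no thresholds.
	if finalizeThresholds != nil {
		defer func() {
			breached := finalizeThresholds() // names of the breached thresholds
			_ = breached
		}()
	}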