diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index 7d0571fc94d..449e39696a5 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -15,12 +15,14 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" - "go.k6.io/k6/core" "go.k6.io/k6/execution" "go.k6.io/k6/js" "go.k6.io/k6/lib" "go.k6.io/k6/lib/types" "go.k6.io/k6/loader" + "go.k6.io/k6/metrics" + "go.k6.io/k6/metrics/engine" + "go.k6.io/k6/output" ) func TestSetupData(t *testing.T) { @@ -138,31 +140,30 @@ func TestSetupData(t *testing.T) { execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) + metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState()) require.NoError(t, err) globalCtx, globalCancel := context.WithCancel(context.Background()) - t.Cleanup(globalCancel) + defer globalCancel() runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger) defer runAbort(fmt.Errorf("unexpected abort")) - require.NoError(t, engine.OutputManager.StartOutputs()) - defer engine.OutputManager.StopOutputs(nil) + outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, runAbort) + samples := make(chan metrics.SampleContainer, 1000) + _, stopOutputs, err := outputManager.Start(samples) + require.NoError(t, err) + defer stopOutputs(nil) cs := &ControlSurface{ RunCtx: runCtx, - Samples: engine.Samples, - MetricsEngine: engine.MetricsEngine, + Samples: samples, + MetricsEngine: metricsEngine, Scheduler: execScheduler, RunState: testState, } - run, wait, err := engine.Init(globalCtx, runCtx) - require.NoError(t, err) - - defer wait() errC := make(chan error) - go func() { errC <- run() }() + go func() { errC <- execScheduler.Run(globalCtx, runCtx, samples) }() handler := NewHandler(cs) @@ -194,6 +195,7 @@ func TestSetupData(t *testing.T) { case <-time.After(10 * time.Second): t.Fatal("Test timed out") case err := <-errC: + close(samples) require.NoError(t, err) } }) diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index 9e74f768e89..ba2f174e391 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -15,10 +15,12 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" - "go.k6.io/k6/core" "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils/minirunner" + "go.k6.io/k6/metrics" + "go.k6.io/k6/metrics/engine" + "go.k6.io/k6/output" ) func TestGetStatus(t *testing.T) { @@ -111,39 +113,40 @@ func TestPatchStatus(t *testing.T) { testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{}) execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) - require.NoError(t, err) - require.NoError(t, engine.OutputManager.StartOutputs()) - defer engine.OutputManager.StopOutputs(nil) + metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState()) + require.NoError(t, err) globalCtx, globalCancel := context.WithCancel(context.Background()) - t.Cleanup(globalCancel) + defer globalCancel() runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger) defer runAbort(fmt.Errorf("unexpected abort")) - engine.AbortFn = runAbort + + outputManager := output.NewManager([]output.Output{metricsEngine.CreateIngester()}, testState.Logger, runAbort) + samples := make(chan metrics.SampleContainer, 1000) + waitMetricsFlushed, stopOutputs, err := outputManager.Start(samples) + require.NoError(t, err) + defer stopOutputs(nil) cs := &ControlSurface{ RunCtx: runCtx, - Samples: engine.Samples, - MetricsEngine: engine.MetricsEngine, + Samples: samples, + MetricsEngine: metricsEngine, Scheduler: execScheduler, RunState: testState, } - run, wait, err := engine.Init(globalCtx, runCtx) - require.NoError(t, err) - wg := &sync.WaitGroup{} wg.Add(1) defer func() { runAbort(fmt.Errorf("custom cancel signal")) - wait() + waitMetricsFlushed() wg.Wait() }() go func() { - assert.ErrorContains(t, run(), "custom cancel signal") + assert.ErrorContains(t, execScheduler.Run(globalCtx, runCtx, samples), "custom cancel signal") + close(samples) wg.Done() }() // wait for the executor to initialize to avoid a potential data race below diff --git a/cmd/integration_test.go b/cmd/integration_test.go index d63bb8fa7ae..59cc5277eb8 100644 --- a/cmd/integration_test.go +++ b/cmd/integration_test.go @@ -563,7 +563,8 @@ func TestThresholdsFailed(t *testing.T) { ) newRootCommand(ts.globalState).execute() - assert.True(t, testutils.LogContains(ts.loggerHook.Drain(), logrus.ErrorLevel, `some thresholds have failed`)) + expErr := "thresholds on metrics 'iterations{scenario:sc1}, iterations{scenario:sc2}' have been breached" + assert.True(t, testutils.LogContains(ts.loggerHook.Drain(), logrus.ErrorLevel, expErr)) stdOut := ts.stdOut.String() t.Log(stdOut) assert.Contains(t, stdOut, ` ✓ iterations...........: 3`) @@ -655,7 +656,6 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) { newRootCommand(ts.globalState).execute() logs := ts.loggerHook.Drain() - assert.False(t, testutils.LogContains(logs, logrus.ErrorLevel, `some thresholds have failed`)) assert.True(t, testutils.LogContains(logs, logrus.ErrorLevel, `test run was aborted because k6 received a 'interrupt' signal`)) stdOut := ts.stdOut.String() t.Log(stdOut) @@ -848,7 +848,7 @@ func runTestWithNoLinger(t *testing.T, ts *globalTestState) { func runTestWithLinger(t *testing.T, ts *globalTestState) { ts.args = append(ts.args, "--linger") - asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "Linger set; waiting for Ctrl+C") + asyncWaitForStdoutAndStopTestWithInterruptSignal(t, ts, 15, time.Second, "waiting for Ctrl+C to continue") newRootCommand(ts.globalState).execute() } @@ -939,7 +939,7 @@ func testAbortedByScriptError(t *testing.T, script string, runTest func(*testing t.Log(stdOut) assert.Contains(t, stdOut, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) assert.Contains(t, stdOut, `level=debug msg="Metrics processing finished!"`) - assert.Contains(t, stdOut, `level=debug msg="Everything has finished, exiting k6!"`) + assert.Contains(t, stdOut, `level=debug msg="Everything has finished, exiting k6 with error`) assert.Contains(t, stdOut, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=7 tainted=false`) return ts } @@ -1050,7 +1050,6 @@ func TestAbortedByScriptAbortInSetup(t *testing.T) { t.Parallel() testAbortedByScriptTestAbort(t, script, runTestWithNoLinger) }) - t.Run("withLinger", func(t *testing.T) { t.Parallel() testAbortedByScriptTestAbort(t, script, runTestWithLinger) @@ -1391,7 +1390,7 @@ func TestActiveVUsCount(t *testing.T) { if i < 3 { assert.Equal(t, "Insufficient VUs, reached 10 active VUs and cannot initialize more", logEntry.Message) } else { - assert.Equal(t, "No script iterations finished, consider making the test duration longer", logEntry.Message) + assert.Equal(t, "No script iterations fully finished, consider making the test duration longer", logEntry.Message) } } } diff --git a/cmd/run.go b/cmd/run.go index f44f873a0d2..fa10a27fac0 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -10,21 +10,25 @@ import ( "net/http" "os" "runtime" + "strings" "sync" "time" + "github.com/sirupsen/logrus" "github.com/spf13/afero" "github.com/spf13/cobra" "github.com/spf13/pflag" "go.k6.io/k6/api" - "go.k6.io/k6/core" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/execution" "go.k6.io/k6/js/common" "go.k6.io/k6/lib" "go.k6.io/k6/lib/consts" + "go.k6.io/k6/metrics" + "go.k6.io/k6/metrics/engine" + "go.k6.io/k6/output" "go.k6.io/k6/ui/pb" ) @@ -37,12 +41,36 @@ type cmdRun struct { // //nolint:funlen,gocognit,gocyclo,cyclop func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { + var logger logrus.FieldLogger = c.gs.logger + defer func() { + logger.Debugf("Everything has finished, exiting k6 with error '%s'!", err) + }() printBanner(c.gs) + globalCtx, globalCancel := context.WithCancel(c.gs.ctx) + defer globalCancel() + + // lingerCtx is cancelled by Ctrl+C, and is used to wait for that event when + // k6 was started with the --linger option. + lingerCtx, lingerCancel := context.WithCancel(globalCtx) + defer lingerCancel() + + // runCtx is used for the test run execution and is created with the special + // execution.NewTestRunContext() function so that it can be aborted even + // from sub-contexts while also attaching a reason for the abort. + runCtx, runAbort := execution.NewTestRunContext(lingerCtx, logger) + test, err := loadAndConfigureTest(c.gs, cmd, args, getConfig) if err != nil { return err } + if test.keyLogger != nil { + defer func() { + if klErr := test.keyLogger.Close(); klErr != nil { + logger.WithError(klErr).Warn("Error while closing the SSLKEYLOGFILE") + } + }() + } // Write the full consolidated *and derived* options back to the Runner. conf := test.derivedConfig @@ -51,26 +79,6 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { return err } - // We prepare a bunch of contexts: - // - The runCtx is cancelled as soon as the Engine's run() lambda finishes, - // and can trigger things like the usage report and end of test summary. - // Crucially, metrics processing by the Engine will still work after this - // context is cancelled! - // - The lingerCtx is cancelled by Ctrl+C, and is used to wait for that - // event when k6 was ran with the --linger option. - // - The globalCtx is cancelled only after we're completely done with the - // test execution and any --linger has been cleared, so that the Engine - // can start winding down its metrics processing. - globalCtx, globalCancel := context.WithCancel(c.gs.ctx) - defer globalCancel() - lingerCtx, lingerCancel := context.WithCancel(globalCtx) - defer lingerCancel() - runCtx, runCancel := context.WithCancel(lingerCtx) - defer runCancel() - - logger := testRunState.Logger - runSubCtx, runSubAbort := execution.NewTestRunContext(runCtx, logger) - // Create a local execution scheduler wrapping the runner. logger.Debug("Initializing the execution scheduler...") execScheduler, err := execution.NewScheduler(testRunState) @@ -105,17 +113,95 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { return err } - // TODO: create a MetricsEngine here and add its ingester to the list of - // outputs (unless both NoThresholds and NoSummary were enabled) + executionState := execScheduler.GetState() + metricsEngine, err := engine.NewMetricsEngine(executionState) + if err != nil { + return err + } + if !testRunState.RuntimeOptions.NoSummary.Bool || !testRunState.RuntimeOptions.NoThresholds.Bool { + // We'll need to pipe metrics to the MetricsEngine if either the + // thresholds or the end-of-test summary are enabled. + outputs = append(outputs, metricsEngine.CreateIngester()) + } + + if !testRunState.RuntimeOptions.NoSummary.Bool { + defer func() { + logger.Debug("Generating the end-of-test summary...") + summaryResult, hsErr := test.initRunner.HandleSummary(globalCtx, &lib.Summary{ + Metrics: metricsEngine.ObservedMetrics, + RootGroup: testRunState.Runner.GetDefaultGroup(), + TestRunDuration: executionState.GetCurrentTestRunDuration(), + NoColor: c.gs.flags.noColor, + UIState: lib.UIState{ + IsStdOutTTY: c.gs.stdOut.isTTY, + IsStdErrTTY: c.gs.stdErr.isTTY, + }, + }) + if hsErr == nil { + hsErr = handleSummaryResult(c.gs.fs, c.gs.stdOut, c.gs.stdErr, summaryResult) + } + if hsErr != nil { + logger.WithError(hsErr).Error("failed to handle the end-of-test summary") + } + }() + } - // TODO: remove this completely - // Create the engine. - initBar.Modify(pb.WithConstProgress(0, "Init engine")) - engine, err := core.NewEngine(testRunState, execScheduler, outputs) + // Create and start the outputs. We do it quite early to get any output URLs + // or other details below. It also allows us to ensure when they have + // flushed their samples and when they have stopped in the defer statements. + initBar.Modify(pb.WithConstProgress(0, "Starting outputs")) + outputManager := output.NewManager(outputs, logger, func(err error) { + if err != nil { + logger.WithError(err).Error("Received error to stop from output") + } + // TODO: attach run status and exit code? + runAbort(err) + }) + samples := make(chan metrics.SampleContainer, test.derivedConfig.MetricSamplesBufferSize.Int64) + waitOutputsFlushed, stopOutputs, err := outputManager.Start(samples) if err != nil { return err } - engine.AbortFn = runSubAbort + defer func() { + logger.Debug("Stopping outputs...") + // We call waitOutputsFlushed() below because the threshold calculations + // need all of the metrics to be sent to the MetricsEngine before we can + // calculate them one last time. We need the threshold calculated here, + // since they may change the run status for the outputs. + stopOutputs(err) + }() + + if !testRunState.RuntimeOptions.NoThresholds.Bool { + finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort) + defer func() { + // This gets called after the Samples channel has been closed and + // the OutputManager has flushed all of the cached samples to + // outputs (including MetricsEngine's ingester). So we are sure + // there won't be any more metrics being sent. + logger.Debug("Finalizing thresholds...") + breachedThresholds := finalizeThresholds() + if len(breachedThresholds) > 0 { + tErr := errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone( + fmt.Errorf("thresholds on metrics '%s' have been breached", strings.Join(breachedThresholds, ", ")), + exitcodes.ThresholdsHaveFailed, + ), errext.AbortedByThresholdsAfterTestEnd) + + if err == nil { + err = tErr + } else { + logger.WithError(tErr).Debug("Breached thresholds, but test already exited with another error") + } + } + }() + } + + defer func() { + logger.Debug("Waiting for metric processing to finish...") + close(samples) + waitOutputsFlushed() + logger.Debug("Metrics processing finished!") + }() // Spin up the REST API server, if not disabled. if c.gs.flags.address != "" { @@ -129,7 +215,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { defer srvCancel() // TODO: send the ExecutionState and MetricsEngine instead of the Engine - srv := api.GetServer(runSubCtx, c.gs.flags.address, testRunState, engine.Samples, engine.MetricsEngine, execScheduler) + srv := api.GetServer(runCtx, c.gs.flags.address, testRunState, samples, metricsEngine, execScheduler) go func() { defer apiWG.Done() logger.Debugf("Starting the REST API server on %s", c.gs.flags.address) @@ -154,26 +240,16 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() } - // We do this here so we can get any output URLs below. - initBar.Modify(pb.WithConstProgress(0, "Starting outputs")) - // TODO: directly create the MutputManager here, not in the Engine - err = engine.OutputManager.StartOutputs() - if err != nil { - return err - } - defer func() { - engine.OutputManager.StopOutputs(err) - }() - printExecutionDescription( c.gs, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs, ) // Trap Interrupts, SIGINTs and SIGTERMs. + // TODO: move upwards, right after runCtx is created gracefulStop := func(sig os.Signal) { logger.WithField("sig", sig).Debug("Stopping k6 in response to signal...") // first abort the test run this way, to propagate the error - runSubAbort(errext.WithAbortReasonIfNone( + runAbort(errext.WithAbortReasonIfNone( errext.WithExitCodeIfNone( fmt.Errorf("test run was aborted because k6 received a '%s' signal", sig), exitcodes.ExternalAbort, ), errext.AbortedByUser, @@ -182,20 +258,32 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { } onHardStop := func(sig os.Signal) { logger.WithField("sig", sig).Error("Aborting k6 in response to signal") - globalCancel() // not that it matters, given the following command... + globalCancel() // not that it matters, given that os.Exit() will be called right after } stopSignalHandling := handleTestAbortSignals(c.gs, gracefulStop, onHardStop) defer stopSignalHandling() - // Initialize the engine - initBar.Modify(pb.WithConstProgress(0, "Init VUs...")) - engineRun, engineWait, err := engine.Init(globalCtx, runSubCtx) - if err != nil { - err = common.UnwrapGojaInterruptedError(err) - // Add a generic engine exit code if we don't have a more specific one - return errext.WithExitCodeIfNone(err, exitcodes.GenericEngine) + if conf.Linger.Bool { + defer func() { + msg := "The test is done, but --linger was enabled, so k6 is waiting for Ctrl+C to continue..." + select { + case <-lingerCtx.Done(): + // do nothing, we were interrupted by Ctrl+C already + default: + logger.Debug(msg) + if !c.gs.flags.quiet { + printToStdout(c.gs, msg) + } + <-lingerCtx.Done() + logger.Debug("Ctrl+C received, exiting...") + } + }() } + // Initialize VUs and start the test! However, we won't immediately return + // if there was an error, we still have things to do. + err = execScheduler.Run(globalCtx, runCtx, samples) + // Init has passed successfully, so unless disabled, make sure we send a // usage report after the context is done. if !conf.NoUsageReport.Bool { @@ -213,80 +301,21 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() } - // Start the test run - initBar.Modify(pb.WithConstProgress(0, "Starting test...")) - err = engineRun() + // Check what the execScheduler.Run() error is. if err != nil { - err = errext.WithExitCodeIfNone(common.UnwrapGojaInterruptedError(err), exitcodes.GenericEngine) - logger.WithError(err).Debug("Engine terminated with an error") - } else { - logger.Debug("Engine run terminated cleanly") + err = common.UnwrapGojaInterruptedError(err) + logger.WithError(err).Debug("Test finished with an error") + return err } - runCancel() - - progressCancel() - progressBarWG.Wait() - executionState := execScheduler.GetState() // Warn if no iterations could be completed. - if err == nil && executionState.GetFullIterationCount() == 0 { - logger.Warn("No script iterations finished, consider making the test duration longer") + if executionState.GetFullIterationCount() == 0 { + logger.Warn("No script iterations fully finished, consider making the test duration longer") } - // Handle the end-of-test summary. - if !testRunState.RuntimeOptions.NoSummary.Bool { - engine.MetricsEngine.MetricsLock.Lock() // TODO: refactor so this is not needed - summaryResult, hsErr := test.initRunner.HandleSummary(globalCtx, &lib.Summary{ - Metrics: engine.MetricsEngine.ObservedMetrics, - RootGroup: testRunState.Runner.GetDefaultGroup(), - TestRunDuration: executionState.GetCurrentTestRunDuration(), - NoColor: c.gs.flags.noColor, - UIState: lib.UIState{ - IsStdOutTTY: c.gs.stdOut.isTTY, - IsStdErrTTY: c.gs.stdErr.isTTY, - }, - }) - engine.MetricsEngine.MetricsLock.Unlock() - if hsErr == nil { - hsErr = handleSummaryResult(c.gs.fs, c.gs.stdOut, c.gs.stdErr, summaryResult) - } - if hsErr != nil { - logger.WithError(hsErr).Error("failed to handle the end-of-test summary") - } - } + logger.Debug("Test finished cleanly") - if conf.Linger.Bool { - select { - case <-lingerCtx.Done(): - // do nothing, we were interrupted by Ctrl+C already - default: - logger.Debug("Linger set; waiting for Ctrl+C...") - if !c.gs.flags.quiet { - printToStdout(c.gs, "Linger set; waiting for Ctrl+C...") - } - <-lingerCtx.Done() - logger.Debug("Ctrl+C received, exiting...") - } - } - globalCancel() // signal the Engine that it should wind down - logger.Debug("Waiting for engine processes to finish...") - engineWait() - logger.Debug("Everything has finished, exiting k6!") - if test.keyLogger != nil { - if klErr := test.keyLogger.Close(); klErr != nil { - logger.WithError(klErr).Warn("Error while closing the SSLKEYLOGFILE") - } - } - - if engine.IsTainted() { - if err == nil { - err = errors.New("some thresholds have failed") - } - err = errext.WithAbortReasonIfNone( - errext.WithExitCodeIfNone(err, exitcodes.ThresholdsHaveFailed), errext.AbortedByThresholdsAfterTestEnd, - ) - } - return err + return nil } func (c *cmdRun) flagSet() *pflag.FlagSet { diff --git a/cmd/ui.go b/cmd/ui.go index b492370b9ba..15274f87df0 100644 --- a/cmd/ui.go +++ b/cmd/ui.go @@ -273,7 +273,7 @@ func renderMultipleBars( // TODO: don't use global variables... // //nolint:funlen,gocognit -func showProgress(ctx context.Context, gs *globalState, pbs []*pb.ProgressBar, logger *logrus.Logger) { +func showProgress(ctx context.Context, gs *globalState, pbs []*pb.ProgressBar, logger logrus.FieldLogger) { if gs.flags.quiet { return } diff --git a/core/engine.go b/core/engine.go deleted file mode 100644 index ef8366f12c6..00000000000 --- a/core/engine.go +++ /dev/null @@ -1,223 +0,0 @@ -package core - -import ( - "context" - "errors" - "strings" - "sync" - "time" - - "github.com/sirupsen/logrus" - - "go.k6.io/k6/execution" - "go.k6.io/k6/lib" - "go.k6.io/k6/metrics" - "go.k6.io/k6/metrics/engine" - "go.k6.io/k6/output" -) - -const collectRate = 50 * time.Millisecond - -// The Engine is the beating heart of k6. -type Engine struct { - // TODO: Make most of the stuff here private! And think how to refactor the - // engine to be less stateful... it's currently one big mess of moving - // pieces, and you implicitly first have to call Init() and then Run() - - // maybe we should refactor it so we have a `Session` dauther-object that - // Init() returns? The only problem with doing this is the REST API - it - // expects to be able to get information from the Engine and is initialized - // before the Init() call... - - // TODO: completely remove the engine and use all of these separately, in a - // much more composable and testable manner - ExecutionScheduler *execution.Scheduler - MetricsEngine *engine.MetricsEngine - OutputManager *output.Manager - - runtimeOptions lib.RuntimeOptions - - ingester output.Output - - logger *logrus.Entry - AbortFn func(error) // temporary - - Samples chan metrics.SampleContainer -} - -// NewEngine instantiates a new Engine, without doing any heavy initialization. -func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []output.Output) (*Engine, error) { - if ex == nil { - return nil, errors.New("missing ExecutionScheduler instance") - } - - e := &Engine{ - ExecutionScheduler: ex, - - runtimeOptions: testState.RuntimeOptions, - Samples: make(chan metrics.SampleContainer, testState.Options.MetricSamplesBufferSize.Int64), - logger: testState.Logger.WithField("component", "engine"), - } - - me, err := engine.NewMetricsEngine(ex.GetState()) - if err != nil { - return nil, err - } - e.MetricsEngine = me - - if !(testState.RuntimeOptions.NoSummary.Bool && testState.RuntimeOptions.NoThresholds.Bool) { - e.ingester = me.CreateIngester() - outputs = append(outputs, e.ingester) - } - - e.OutputManager = output.NewManager(outputs, testState.Logger, func(err error) { - if err != nil { - testState.Logger.WithError(err).Error("Received error to stop from output") - } - e.AbortFn(err) - }) - - return e, nil -} - -// Init is used to initialize the execution scheduler and all metrics processing -// in the engine. The first is a costly operation, since it initializes all of -// the planned VUs and could potentially take a long time. -// -// This method either returns an error immediately, or it returns test run() and -// wait() functions. -// -// Things to note: -// - The first lambda, Run(), synchronously executes the actual load test. -// - It can be prematurely aborted by cancelling the runCtx - this won't stop -// the metrics collection by the Engine. -// - Stopping the metrics collection can be done at any time after Run() has -// returned by cancelling the globalCtx -// - The second returned lambda can be used to wait for that process to finish. -func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait func(), err error) { - e.logger.Debug("Initialization starting...") - - // TODO: move all of this in a separate struct? see main TODO above - processMetricsAfterRun := make(chan struct{}) - runFn := func() error { - e.logger.Debug("Execution scheduler starting...") - err := e.ExecutionScheduler.Run(globalCtx, runCtx, e.Samples) - if err == nil { - e.logger.Debug("Execution scheduler finished nominally") - err = runCtx.Err() - } - if err != nil { - e.logger.WithError(err).Debug("Engine run returned an error") - } else { - e.logger.Debug("Execution scheduler and engine finished nominally") - } - - // Make the background jobs process the currently buffered metrics and - // run the thresholds, then wait for that to be done. - select { - case processMetricsAfterRun <- struct{}{}: - <-processMetricsAfterRun - case <-globalCtx.Done(): - } - - return err - } - - waitFn := e.startBackgroundProcesses(globalCtx, processMetricsAfterRun) - return runFn, waitFn, nil -} - -// This starts a bunch of goroutines to process metrics, thresholds, and set the -// test run status when it ends. It returns a function that can be used after -// the provided context is called, to wait for the complete winding down of all -// started goroutines. -// -// Because the background process is not aware of the execution's state, `processMetricsAfterRun` -// will be used to signal that the test run is finished, no more metric samples will be produced, -// and that the remaining metrics samples in the pipeline should be processed as the background -// process is about to exit. -func (e *Engine) startBackgroundProcesses( - globalCtx context.Context, processMetricsAfterRun chan struct{}, -) (wait func()) { - processes := new(sync.WaitGroup) - - // Siphon and handle all produced metric samples - processes.Add(1) - go func() { - defer processes.Done() - e.processMetrics(globalCtx, processMetricsAfterRun) - }() - - return processes.Wait -} - -// processMetrics process the execution's metrics samples as they are collected. -// The processing of samples happens at a fixed rate defined by the `collectRate` -// constant. -// -// The `processMetricsAfterRun` channel argument is used by the caller to signal -// that the test run is finished, no more metric samples will be produced, and that -// the metrics samples remaining in the pipeline should be should be processed. -func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRun chan struct{}) { - sampleContainers := []metrics.SampleContainer{} - - // Run thresholds, if not disabled. - var finalizeThresholds func() (breached []string) - if !e.runtimeOptions.NoThresholds.Bool { - finalizeThresholds = e.MetricsEngine.StartThresholdCalculations(e.AbortFn) - } - - ticker := time.NewTicker(collectRate) - defer ticker.Stop() - - e.logger.Debug("Metrics processing started...") - processSamples := func() { - if len(sampleContainers) > 0 { - e.OutputManager.AddMetricSamples(sampleContainers) - // Make the new container with the same size as the previous - // one, assuming that we produce roughly the same amount of - // metrics data between ticks... - sampleContainers = make([]metrics.SampleContainer, 0, cap(sampleContainers)) - } - } - - finalize := func() { - // Process any remaining metrics in the pipeline, by this point Run() - // has already finished and nothing else should be producing metrics. - e.logger.Debug("Metrics processing winding down...") - - close(e.Samples) - for sc := range e.Samples { - sampleContainers = append(sampleContainers, sc) - } - processSamples() - - if finalizeThresholds != nil { - // Ensure the ingester flushes any buffered metrics - _ = e.ingester.Stop() - breached := finalizeThresholds() - e.logger.Debugf("Engine: thresholds done, breached: '%s'", strings.Join(breached, ", ")) - } - e.logger.Debug("Metrics processing finished!") - } - - for { - select { - case <-ticker.C: - processSamples() - case <-processMetricsAfterRun: - e.logger.Debug("Processing metrics and thresholds after the test run has ended...") - finalize() - processMetricsAfterRun <- struct{}{} - return - case sc := <-e.Samples: - sampleContainers = append(sampleContainers, sc) - case <-globalCtx.Done(): - finalize() - return - } - } -} - -func (e *Engine) IsTainted() bool { - return e.MetricsEngine.GetMetricsWithBreachedThresholdsCount() > 0 -} diff --git a/errext/exitcodes/codes.go b/errext/exitcodes/codes.go index 98b678e69f5..4d5c845ef79 100644 --- a/errext/exitcodes/codes.go +++ b/errext/exitcodes/codes.go @@ -14,11 +14,10 @@ const ( SetupTimeout ExitCode = 100 TeardownTimeout ExitCode = 101 GenericTimeout ExitCode = 102 // TODO: remove? - GenericEngine ExitCode = 103 // TODO: remove after https://github.com/grafana/k6/issues/2804 + CannotStartRESTAPI ExitCode = 103 InvalidConfig ExitCode = 104 ExternalAbort ExitCode = 105 - CannotStartRESTAPI ExitCode = 106 + ScriptStoppedFromRESTAPI ExitCode = 106 ScriptException ExitCode = 107 ScriptAborted ExitCode = 108 - ScriptStoppedFromRESTAPI ExitCode = 109 ) diff --git a/js/runner_test.go b/js/runner_test.go index cc5181ed2ce..71de51c3960 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -33,7 +33,6 @@ import ( "google.golang.org/grpc/test/grpc_testing" "gopkg.in/guregu/null.v3" - "go.k6.io/k6/core" "go.k6.io/k6/errext" "go.k6.io/k6/execution" "go.k6.io/k6/js/modules/k6" @@ -48,6 +47,7 @@ import ( "go.k6.io/k6/lib/testutils/mockoutput" "go.k6.io/k6/lib/types" "go.k6.io/k6/metrics" + "go.k6.io/k6/metrics/engine" "go.k6.io/k6/output" ) @@ -387,30 +387,37 @@ func TestDataIsolation(t *testing.T) { execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) - mockOutput := mockoutput.New() - engine, err := core.NewEngine(testRunState, execScheduler, []output.Output{mockOutput}) + metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState()) require.NoError(t, err) - require.NoError(t, engine.OutputManager.StartOutputs()) - defer engine.OutputManager.StopOutputs(nil) - ctx, cancel := context.WithCancel(context.Background()) - run, wait, err := engine.Init(ctx, ctx) + globalCtx, globalCancel := context.WithCancel(context.Background()) + defer globalCancel() + runCtx, runAbort := execution.NewTestRunContext(globalCtx, testRunState.Logger) + + mockOutput := mockoutput.New() + outputManager := output.NewManager([]output.Output{mockOutput, metricsEngine.CreateIngester()}, testRunState.Logger, runAbort) + samples := make(chan metrics.SampleContainer, 1000) + waitForMetricsFlushed, stopOutputs, err := outputManager.Start(samples) require.NoError(t, err) + defer stopOutputs(nil) + + finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort) require.Empty(t, runner.defaultGroup.Groups) errC := make(chan error) - go func() { errC <- run() }() + go func() { errC <- execScheduler.Run(globalCtx, runCtx, samples) }() select { case <-time.After(20 * time.Second): - cancel() + runAbort(fmt.Errorf("unexpected abort")) t.Fatal("Test timed out") case err := <-errC: - cancel() + close(samples) require.NoError(t, err) - wait() - require.False(t, engine.IsTainted()) + waitForMetricsFlushed() + breached := finalizeThresholds() + require.Empty(t, breached) } require.Contains(t, runner.defaultGroup.Groups, "setup") require.Contains(t, runner.defaultGroup.Groups, "teardown") diff --git a/metrics/engine/engine.go b/metrics/engine/engine.go index 6c1a364f9eb..6bf0896c34c 100644 --- a/metrics/engine/engine.go +++ b/metrics/engine/engine.go @@ -4,6 +4,7 @@ package engine import ( "fmt" + "sort" "strings" "sync" "sync/atomic" @@ -245,7 +246,9 @@ func (me *MetricsEngine) evaluateThresholds(ignoreEmptySinks bool) (breachedTher shouldAbort = true } } - + if len(breachedThersholds) > 0 { + sort.Strings(breachedThersholds) + } me.logger.Debugf("Thresholds on %d metrics breached: %v", len(breachedThersholds), breachedThersholds) atomic.StoreUint32(&me.breachedThresholdsCount, uint32(len(breachedThersholds))) return breachedThersholds, shouldAbort diff --git a/core/engine_test.go b/metrics/engine/engine_test.go similarity index 61% rename from core/engine_test.go rename to metrics/engine/engine_test.go index 5dfa333b999..a46e409f39d 100644 --- a/core/engine_test.go +++ b/metrics/engine/engine_test.go @@ -1,131 +1,10 @@ -package core - -import ( - "context" - "fmt" - "net/url" - "runtime" - "testing" - "time" - - "github.com/sirupsen/logrus" - "github.com/spf13/afero" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gopkg.in/guregu/null.v3" - - "go.k6.io/k6/errext" - "go.k6.io/k6/execution" - "go.k6.io/k6/js" - "go.k6.io/k6/lib" - "go.k6.io/k6/lib/executor" - "go.k6.io/k6/lib/testutils" - "go.k6.io/k6/lib/testutils/httpmultibin" - "go.k6.io/k6/lib/testutils/minirunner" - "go.k6.io/k6/lib/testutils/mockoutput" - "go.k6.io/k6/lib/types" - "go.k6.io/k6/loader" - "go.k6.io/k6/metrics" - "go.k6.io/k6/output" -) +package engine -const isWindows = runtime.GOOS == "windows" - -// TODO: completely rewrite all of these tests - -type testStruct struct { - engine *Engine - run func() error - runAbort func(error) - wait func() - piState *lib.TestPreInitState -} - -func getTestPreInitState(tb testing.TB) *lib.TestPreInitState { - reg := metrics.NewRegistry() - logger := testutils.NewLogger(tb) - logger.SetLevel(logrus.DebugLevel) - return &lib.TestPreInitState{ - Logger: logger, - RuntimeOptions: lib.RuntimeOptions{}, - Registry: reg, - BuiltinMetrics: metrics.RegisterBuiltinMetrics(reg), - } -} - -func getTestRunState( - tb testing.TB, piState *lib.TestPreInitState, options lib.Options, runner lib.Runner, -) *lib.TestRunState { - require.Empty(tb, options.Validate()) - require.NoError(tb, runner.SetOptions(options)) - return &lib.TestRunState{ - TestPreInitState: piState, - Options: options, - Runner: runner, - RunTags: piState.Registry.RootTagSet().WithTagsFromMap(options.RunTags), - } -} - -// Wrapper around NewEngine that applies a logger and manages the options. -func newTestEngineWithTestPreInitState( //nolint:golint - t *testing.T, runTimeout *time.Duration, runner lib.Runner, outputs []output.Output, - opts lib.Options, piState *lib.TestPreInitState, -) *testStruct { - if runner == nil { - runner = &minirunner.MiniRunner{ - PreInitState: piState, - } - } - - newOpts, err := executor.DeriveScenariosFromShortcuts(lib.Options{ - MetricSamplesBufferSize: null.NewInt(200, false), - }.Apply(runner.GetOptions()).Apply(opts), piState.Logger) - require.NoError(t, err) +// TODO: refactor and move the tests that still make sense in other packages - testRunState := getTestRunState(t, piState, newOpts, runner) +/* - execScheduler, err := execution.NewScheduler(testRunState) - require.NoError(t, err) - - engine, err := NewEngine(testRunState, execScheduler, outputs) - require.NoError(t, err) - require.NoError(t, engine.OutputManager.StartOutputs()) - - globalCtx, globalCancel := context.WithCancel(context.Background()) - var runCancel func() - var runCtx context.Context - if runTimeout != nil { - runCtx, runCancel = context.WithTimeout(globalCtx, *runTimeout) - } else { - runCtx, runCancel = context.WithCancel(globalCtx) - } - runSubCtx, runSubAbort := execution.NewTestRunContext(runCtx, piState.Logger) - engine.AbortFn = runSubAbort - - run, waitFn, err := engine.Init(globalCtx, runSubCtx) - require.NoError(t, err) - - var test *testStruct - test = &testStruct{ - engine: engine, - run: run, - runAbort: runSubAbort, - wait: func() { - runCancel() - globalCancel() - waitFn() - engine.OutputManager.StopOutputs(nil) - }, - piState: piState, - } - return test -} - -func newTestEngine( - t *testing.T, runTimeout *time.Duration, runner lib.Runner, outputs []output.Output, opts lib.Options, -) *testStruct { - return newTestEngineWithTestPreInitState(t, runTimeout, runner, outputs, opts, getTestPreInitState(t)) -} +const isWindows = runtime.GOOS == "windows" func TestEngineRun(t *testing.T) { t.Parallel() @@ -212,17 +91,6 @@ func TestEngineRun(t *testing.T) { }) } -func TestEngineAtTime(t *testing.T) { - t.Parallel() - test := newTestEngine(t, nil, nil, nil, lib.Options{ - VUs: null.IntFrom(2), - Duration: types.NullDurationFrom(20 * time.Second), - }) - defer test.wait() - - assert.NoError(t, test.run()) -} - func TestEngineOutput(t *testing.T) { t.Parallel() @@ -717,182 +585,6 @@ func TestSentReceivedMetrics(t *testing.T) { }) } -// TODO: delete when implementing https://github.com/grafana/k6/issues/1889, the -// test functionality was duplicated in cmd/integration_test.go -func TestRunTags(t *testing.T) { - t.Parallel() - - expectedRunTags := map[string]string{"foo": "bar", "test": "mest", "over": "written"} - - // it copies the map so in the case the runner will overwrite - // some run tags' values it doesn't affect the assertion. - runTags := make(map[string]string) - for k, v := range expectedRunTags { - runTags[k] = v - } - - tb := httpmultibin.NewHTTPMultiBin(t) - script := []byte(tb.Replacer.Replace(` - import http from "k6/http"; - import ws from "k6/ws"; - import { Counter } from "k6/metrics"; - import { group, check, fail } from "k6"; - - let customTags = { "over": "the rainbow" }; - let params = { "tags": customTags}; - let statusCheck = { "status is 200": (r) => r.status === 200 } - - let myCounter = new Counter("mycounter"); - - export default function() { - - group("http", function() { - check(http.get("HTTPSBIN_URL", params), statusCheck, customTags); - check(http.get("HTTPBIN_URL/status/418", params), statusCheck, customTags); - }) - - group("websockets", function() { - var response = ws.connect("WSBIN_URL/ws-echo", params, function (socket) { - socket.on('open', function open() { - console.log('ws open and say hello'); - socket.send("hello"); - }); - - socket.on('message', function (message) { - console.log('ws got message ' + message); - if (message != "hello") { - fail("Expected to receive 'hello' but got '" + message + "' instead !"); - } - console.log('ws closing socket...'); - socket.close(); - }); - - socket.on('close', function () { - console.log('ws close'); - }); - - socket.on('error', function (e) { - console.log('ws error: ' + e.error()); - }); - }); - console.log('connect returned'); - check(response, { "status is 101": (r) => r && r.status === 101 }, customTags); - }) - - myCounter.add(1, customTags); - } - `)) - - r, err := js.New( - getTestPreInitState(t), - &loader.SourceData{URL: &url.URL{Path: "/script.js"}, Data: script}, - nil, - ) - require.NoError(t, err) - - mockOutput := mockoutput.New() - test := newTestEngine(t, nil, r, []output.Output{mockOutput}, lib.Options{ - Iterations: null.IntFrom(3), - VUs: null.IntFrom(2), - Hosts: types.NullHosts{Trie: tb.Dialer.Hosts, Valid: true}, - RunTags: runTags, - SystemTags: &metrics.DefaultSystemTagSet, - InsecureSkipTLSVerify: null.BoolFrom(true), - }) - - errC := make(chan error) - go func() { errC <- test.run() }() - - select { - case <-time.After(10 * time.Second): - t.Fatal("Test timed out") - case err := <-errC: - require.NoError(t, err) - } - test.wait() - - systemMetrics := []string{ - metrics.VUsName, metrics.VUsMaxName, metrics.IterationsName, metrics.IterationDurationName, - metrics.GroupDurationName, metrics.DataSentName, metrics.DataReceivedName, - } - - getExpectedOverVal := func(metricName string) string { - for _, sysMetric := range systemMetrics { - if sysMetric == metricName { - return expectedRunTags["over"] - } - } - return "the rainbow" - } - - for _, s := range mockOutput.Samples { - for key, expVal := range expectedRunTags { - val, ok := s.Tags.Get(key) - - if key == "over" { - expVal = getExpectedOverVal(s.Metric.Name) - } - - assert.True(t, ok) - assert.Equalf(t, expVal, val, "Wrong tag value in sample - expected: %v got: %v metric %#v", expVal, val, s.Metric) - } - } -} - -// TODO: delete when implementing https://github.com/grafana/k6/issues/1889, the -// test functionality was duplicated in cmd/integration_test.go -func TestSetupException(t *testing.T) { - t.Parallel() - - script := []byte(` - import bar from "./bar.js"; - export function setup() { - bar(); - }; - export default function() { - }; - `) - - memfs := afero.NewMemMapFs() - require.NoError(t, afero.WriteFile(memfs, "/bar.js", []byte(` - export default function () { - baz(); - } - function baz() { - throw new Error("baz"); - } - `), 0x666)) - runner, err := js.New( - getTestPreInitState(t), - &loader.SourceData{URL: &url.URL{Scheme: "file", Path: "/script.js"}, Data: script}, - map[string]afero.Fs{"file": memfs}, - ) - require.NoError(t, err) - - test := newTestEngine(t, nil, runner, nil, lib.Options{ - SystemTags: &metrics.DefaultSystemTagSet, - SetupTimeout: types.NullDurationFrom(3 * time.Second), - TeardownTimeout: types.NullDurationFrom(3 * time.Second), - VUs: null.IntFrom(3), - }) - defer test.wait() - - errC := make(chan error) - go func() { errC <- test.run() }() - - select { - case <-time.After(10 * time.Second): - t.Fatal("Test timed out") - case err := <-errC: - require.Error(t, err) - var exception errext.Exception - require.ErrorAs(t, err, &exception) - require.Equal(t, "Error: baz\n\tat baz (file:///bar.js:6:16(3))\n"+ - "\tat file:///bar.js:3:8(3)\n\tat setup (file:///script.js:4:2(4))\n", - err.Error()) - } -} - func TestEmittedMetricsWhenScalingDown(t *testing.T) { t.Parallel() tb := httpmultibin.NewHTTPMultiBin(t) @@ -1059,237 +751,4 @@ func TestMetricsEmission(t *testing.T) { }) } } - -// TODO: delete when implementing https://github.com/grafana/k6/issues/1889, the -// test functionality was duplicated in cmd/integration_test.go -func TestMinIterationDurationInSetupTeardownStage(t *testing.T) { - t.Parallel() - setupScript := ` - import { sleep } from "k6"; - - export function setup() { - sleep(1); - } - - export let options = { - minIterationDuration: "2s", - scenarios: { - we_need_hard_stop: { - executor: "constant-vus", - vus: 2, - duration: "1.9s", - gracefulStop: "0s", - }, - }, - setupTimeout: "3s", - }; - - export default function () { - };` - teardownScript := ` - import { sleep } from "k6"; - - export let options = { - minIterationDuration: "2s", - scenarios: { - we_need_hard_stop: { - executor: "constant-vus", - vus: 2, - duration: "1.9s", - gracefulStop: "0s", - }, - }, - teardownTimeout: "3s", - }; - - export default function () { - }; - - export function teardown() { - sleep(1); - } -` - tests := []struct { - name, script string - }{ - {"Test setup", setupScript}, - {"Test teardown", teardownScript}, - } - for _, tc := range tests { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - runner, err := js.New( - getTestPreInitState(t), - &loader.SourceData{URL: &url.URL{Path: "/script.js"}, Data: []byte(tc.script)}, - nil, - ) - require.NoError(t, err) - - test := newTestEngine(t, nil, runner, nil, runner.GetOptions()) - - errC := make(chan error) - go func() { errC <- test.run() }() - select { - case <-time.After(10 * time.Second): - t.Fatal("Test timed out") - case err := <-errC: - require.NoError(t, err) - test.wait() - require.False(t, test.engine.IsTainted()) - } - }) - } -} - -// TODO: delete when implementing https://github.com/grafana/k6/issues/1889, the -// test functionality was duplicated in cmd/integration_test.go -func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { - t.Parallel() - - piState := getTestPreInitState(t) - testMetric, err := piState.Registry.NewMetric("teardown_metric", metrics.Counter) - require.NoError(t, err) - - var test *testStruct - runner := &minirunner.MiniRunner{ - Fn: func(_ context.Context, _ *lib.State, _ chan<- metrics.SampleContainer) error { - test.runAbort(fmt.Errorf("custom error")) // we cancel the run immediately after the test starts - return nil - }, - TeardownFn: func(_ context.Context, out chan<- metrics.SampleContainer) error { - out <- metrics.Sample{ - TimeSeries: metrics.TimeSeries{ - Metric: testMetric, - Tags: piState.Registry.RootTagSet(), - }, - Time: time.Now(), - Value: 1, - } - return nil - }, - } - - mockOutput := mockoutput.New() - test = newTestEngineWithTestPreInitState(t, nil, runner, []output.Output{mockOutput}, lib.Options{ - VUs: null.IntFrom(1), Iterations: null.IntFrom(1), - }, piState) - - assert.ErrorContains(t, test.run(), "custom error") - test.wait() - - var count float64 - for _, sample := range mockOutput.Samples { - if sample.Metric == testMetric { - count += sample.Value - } - } - assert.Equal(t, 1.0, count) -} - -// TODO: delete when implementing https://github.com/grafana/k6/issues/1889, the -// test functionality was duplicated in cmd/integration_test.go -func TestActiveVUsCount(t *testing.T) { - t.Parallel() - - script := []byte(` - var sleep = require('k6').sleep; - - exports.options = { - scenarios: { - carr1: { - executor: 'constant-arrival-rate', - rate: 10, - preAllocatedVUs: 1, - maxVUs: 10, - startTime: '0s', - duration: '3s', - gracefulStop: '0s', - }, - carr2: { - executor: 'constant-arrival-rate', - rate: 10, - preAllocatedVUs: 1, - maxVUs: 10, - duration: '3s', - startTime: '3s', - gracefulStop: '0s', - }, - rarr: { - executor: 'ramping-arrival-rate', - startRate: 5, - stages: [ - { target: 10, duration: '2s' }, - { target: 0, duration: '2s' }, - ], - preAllocatedVUs: 1, - maxVUs: 10, - startTime: '6s', - gracefulStop: '0s', - }, - } - } - - exports.default = function () { - sleep(5); - } - `) - - logger := testutils.NewLogger(t) - logHook := testutils.SimpleLogrusHook{HookedLevels: logrus.AllLevels} - logger.AddHook(&logHook) - - rtOpts := lib.RuntimeOptions{CompatibilityMode: null.StringFrom("base")} - - registry := metrics.NewRegistry() - piState := &lib.TestPreInitState{ - Logger: logger, - Registry: registry, - BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), - RuntimeOptions: rtOpts, - } - runner, err := js.New(piState, &loader.SourceData{URL: &url.URL{Path: "/script.js"}, Data: script}, nil) - require.NoError(t, err) - - mockOutput := mockoutput.New() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - opts, err := executor.DeriveScenariosFromShortcuts(lib.Options{ - MetricSamplesBufferSize: null.NewInt(200, false), - }.Apply(runner.GetOptions()), nil) - require.NoError(t, err) - - testState := getTestRunState(t, piState, opts, runner) - execScheduler, err := execution.NewScheduler(testState) - require.NoError(t, err) - engine, err := NewEngine(testState, execScheduler, []output.Output{mockOutput}) - require.NoError(t, err) - require.NoError(t, engine.OutputManager.StartOutputs()) - run, waitFn, err := engine.Init(ctx, ctx) // no need for 2 different contexts - require.NoError(t, err) - - errC := make(chan error) - go func() { errC <- run() }() - - select { - case <-time.After(15 * time.Second): - t.Fatal("Test timed out") - case err := <-errC: - require.NoError(t, err) - cancel() - waitFn() - engine.OutputManager.StopOutputs(nil) - require.False(t, engine.IsTainted()) - } - - assert.Equal(t, 10.0, getMetricMax(mockOutput, metrics.VUsName)) - assert.Equal(t, 10.0, getMetricMax(mockOutput, metrics.VUsMaxName)) - - logEntries := logHook.Drain() - assert.Len(t, logEntries, 3) - for _, logEntry := range logEntries { - assert.Equal(t, logrus.WarnLevel, logEntry.Level) - assert.Equal(t, "Insufficient VUs, reached 10 active VUs and cannot initialize more", logEntry.Message) - } -} +*/ diff --git a/output/manager.go b/output/manager.go index 4d041dd346c..21e24c73a3f 100644 --- a/output/manager.go +++ b/output/manager.go @@ -1,10 +1,16 @@ package output import ( + "sync" + "time" + "github.com/sirupsen/logrus" "go.k6.io/k6/metrics" ) +// TODO: completely get rid of this, see https://github.com/grafana/k6/issues/2430 +const sendBatchToOutputsRate = 50 * time.Millisecond + // Manager can be used to manage multiple outputs at the same time. type Manager struct { outputs []Output @@ -22,11 +28,65 @@ func NewManager(outputs []Output, logger logrus.FieldLogger, testStopCallback fu } } -// StartOutputs spins up all configured outputs. If some output fails to start, +// Start spins up all configured outputs and then starts a new goroutine that +// pipes metrics from the given samples channel to them. +// +// If some output fails to start, it stops the already started ones. This may +// take some time, since some outputs make initial network requests to set up +// whatever remote services are going to listen to them. +// +// If all outputs start successfully, this method will return 2 callbacks. The +// first one, wait(), will block until the samples channel has been closed and +// all of its buffered metrics have been sent to all outputs. The second +// callback will call the Stop() or StopWithTestError() method of every output. +func (om *Manager) Start(samplesChan chan metrics.SampleContainer) (wait func(), finish func(error), err error) { + if err := om.startOutputs(); err != nil { + return nil, nil, err + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + sendToOutputs := func(sampleContainers []metrics.SampleContainer) { + for _, out := range om.outputs { + out.AddMetricSamples(sampleContainers) + } + } + + go func() { + defer wg.Done() + ticker := time.NewTicker(sendBatchToOutputsRate) + defer ticker.Stop() + + buffer := make([]metrics.SampleContainer, 0, cap(samplesChan)) + for { + select { + case sampleContainer, ok := <-samplesChan: + if !ok { + sendToOutputs(buffer) + return + } + buffer = append(buffer, sampleContainer) + case <-ticker.C: + sendToOutputs(buffer) + buffer = make([]metrics.SampleContainer, 0, cap(buffer)) + } + } + }() + + wait = wg.Wait + finish = func(testErr error) { + wait() // just in case, though it shouldn't be needed if API is used correctly + om.stopOutputs(testErr, len(om.outputs)) + } + return wait, finish, nil +} + +// startOutputs spins up all configured outputs. If some output fails to start, // it stops the already started ones. This may take some time, since some // outputs make initial network requests to set up whatever remote services are // going to listen to them. -func (om *Manager) StartOutputs() error { +func (om *Manager) startOutputs() error { om.logger.Debugf("Starting %d outputs...", len(om.outputs)) for i, out := range om.outputs { if stopOut, ok := out.(WithTestRunStop); ok { @@ -41,11 +101,6 @@ func (om *Manager) StartOutputs() error { return nil } -// StopOutputs stops all configured outputs. -func (om *Manager) StopOutputs(testErr error) { - om.stopOutputs(testErr, len(om.outputs)) -} - func (om *Manager) stopOutputs(testErr error, upToID int) { om.logger.Debugf("Stopping %d outputs...", upToID) for i := 0; i < upToID; i++ { @@ -62,17 +117,3 @@ func (om *Manager) stopOutputs(testErr error, upToID int) { } } } - -// AddMetricSamples is a temporary method to make the Manager usable in the -// current Engine. It needs to be replaced with the full metric pump. -// -// TODO: refactor -func (om *Manager) AddMetricSamples(sampleContainers []metrics.SampleContainer) { - if len(sampleContainers) == 0 { - return - } - - for _, out := range om.outputs { - out.AddMetricSamples(sampleContainers) - } -}