diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go index 53a846acdad..1e57aa1b3ec 100644 --- a/api/v1/group_routes_test.go +++ b/api/v1/group_routes_test.go @@ -47,7 +47,7 @@ func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurfa execScheduler, err := execution.NewScheduler(testState, local.NewController()) require.NoError(tb, err) - me, err := engine.NewMetricsEngine(testState, true) + me, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) require.NoError(tb, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index b05ac94ba8a..5a804eca21d 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -141,7 +141,7 @@ func TestSetupData(t *testing.T) { execScheduler, err := execution.NewScheduler(testState, local.NewController()) require.NoError(t, err) - metricsEngine, err := engine.NewMetricsEngine(testState, true) + metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) require.NoError(t, err) globalCtx, globalCancel := context.WithCancel(context.Background()) diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index e3509af07a8..657c1dcd573 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -115,7 +115,7 @@ func TestPatchStatus(t *testing.T) { execScheduler, err := execution.NewScheduler(testState, local.NewController()) require.NoError(t, err) - metricsEngine, err := engine.NewMetricsEngine(testState, true) + metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) require.NoError(t, err) globalCtx, globalCancel := context.WithCancel(context.Background()) diff --git a/cmd/agent.go b/cmd/agent.go index 9f28752e718..3ec6336eda0 100644 --- a/cmd/agent.go +++ b/cmd/agent.go @@ -84,12 +84,72 @@ func getMetricsHook( } } +func loadAndConfigureDistTest(gs *globalState, distTest *distributed.Test) (*loadedAndConfiguredTest, error) { + var options lib.Options + if err := json.Unmarshal(distTest.Options, &options); err != nil { + return nil, err + } + + arc, err := lib.ReadArchive(bytes.NewReader(distTest.Archive)) + if err != nil { + return nil, err + } + + registry := metrics.NewRegistry() + piState := &lib.TestPreInitState{ + Logger: gs.logger, + RuntimeOptions: lib.RuntimeOptions{ + NoThresholds: null.BoolFrom(true), + NoSummary: null.BoolFrom(true), + Env: arc.Env, + CompatibilityMode: null.StringFrom(arc.CompatibilityMode), + }, + Registry: registry, + BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), + } + + initRunner, err := js.NewFromArchive(piState, arc) + if err != nil { + return nil, err + } + + test := &loadedTest{ + pwd: arc.Pwd, + sourceRootPath: arc.Filename, + source: &loader.SourceData{ + Data: distTest.Archive, + URL: arc.FilenameURL, + }, + fs: afero.NewMemMapFs(), // TODO: figure out what should be here + fileSystems: arc.Filesystems, + preInitState: piState, + initRunner: initRunner, + } + + pseudoConsoldatedConfig := applyDefault(Config{Options: options}) + for _, thresholds := range pseudoConsoldatedConfig.Thresholds { + if err = thresholds.Parse(); err != nil { + return nil, err + } + } + derivedConfig, err := deriveAndValidateConfig(pseudoConsoldatedConfig, initRunner.IsExecutable, gs.logger) + if err != nil { + return nil, err + } + + return &loadedAndConfiguredTest{ + loadedTest: test, + consolidatedConfig: pseudoConsoldatedConfig, + derivedConfig: derivedConfig, + }, nil +} + // TODO: a whole lot of cleanup, refactoring, error handling and hardening -func getCmdAgent(gs *globalState) *cobra.Command { //nolint: funlen +func getCmdAgent(gs *globalState) *cobra.Command { c := &cmdsRunAndAgent{gs: gs} - c.loadConfiguredTest = func(cmd *cobra.Command, args []string) ( - *loadedAndConfiguredTest, execution.Controller, error, + c.loadConfiguredTests = func(cmd *cobra.Command, args []string) ( + []*loadedAndConfiguredTest, execution.Controller, error, ) { conn, err := grpc.Dial(args[0], grpc.WithInsecure()) if err != nil { @@ -107,74 +167,25 @@ func getCmdAgent(gs *globalState) *cobra.Command { //nolint: funlen return nil, nil, err } - c.metricsEngineHook = getMetricsHook(gs.ctx, resp.InstanceID, client, gs.logger) - - controller, err := distributed.NewAgentController(gs.ctx, resp.InstanceID, client, gs.logger) - if err != nil { - return nil, nil, err - } - - var options lib.Options - if err := json.Unmarshal(resp.Options, &options); err != nil { - return nil, nil, err - } - - arc, err := lib.ReadArchive(bytes.NewReader(resp.Archive)) - if err != nil { - return nil, nil, err - } - - registry := metrics.NewRegistry() - piState := &lib.TestPreInitState{ - Logger: gs.logger, - RuntimeOptions: lib.RuntimeOptions{ - NoThresholds: null.BoolFrom(true), - NoSummary: null.BoolFrom(true), - Env: arc.Env, - CompatibilityMode: null.StringFrom(arc.CompatibilityMode), - }, - Registry: registry, - BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), - } - - initRunner, err := js.NewFromArchive(piState, arc) - if err != nil { - return nil, nil, err - } - - test := &loadedTest{ - pwd: arc.Pwd, - sourceRootPath: arc.Filename, - source: &loader.SourceData{ - Data: resp.Archive, - URL: arc.FilenameURL, - }, - fs: afero.NewMemMapFs(), // TODO: figure out what should be here - fileSystems: arc.Filesystems, - preInitState: piState, - initRunner: initRunner, - } - - pseudoConsoldatedConfig := applyDefault(Config{Options: options}) - for _, thresholds := range pseudoConsoldatedConfig.Thresholds { - if err = thresholds.Parse(); err != nil { + var configuredTests []*loadedAndConfiguredTest + for _, test := range resp.Tests { + cTest, err := loadAndConfigureDistTest(gs, test) + if err != nil { return nil, nil, err } + configuredTests = append(configuredTests, cTest) } - derivedConfig, err := deriveAndValidateConfig(pseudoConsoldatedConfig, initRunner.IsExecutable, gs.logger) + + c.metricsEngineHook = getMetricsHook(gs.ctx, resp.InstanceID, client, gs.logger) + + controller, err := distributed.NewAgentController(gs.ctx, resp.InstanceID, client, gs.logger) if err != nil { return nil, nil, err } - configuredTest := &loadedAndConfiguredTest{ - loadedTest: test, - consolidatedConfig: pseudoConsoldatedConfig, - derivedConfig: derivedConfig, - } - gs.flags.address = "" // TODO: fix, this is a hack so agents don't start an API server - return configuredTest, controller, nil // TODO + return configuredTests, controller, nil // TODO } agentCmd := &cobra.Command{ diff --git a/cmd/archive.go b/cmd/archive.go index ef18e0885ff..1ba8999d982 100644 --- a/cmd/archive.go +++ b/cmd/archive.go @@ -14,7 +14,11 @@ type cmdArchive struct { } func (c *cmdArchive) run(cmd *cobra.Command, args []string) error { - test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig) + preInitState, err := getPreInitState(c.gs, cmd) + if err != nil { + return err + } + test, err := loadAndConfigureLocalTest(c.gs, preInitState, cmd, args[0], getPartialConfig) if err != nil { return err } @@ -84,7 +88,7 @@ An archive is a fully self-contained test run, and can be executed identically e # Run the resulting archive. k6 run myarchive.tar`[1:], - Args: cobra.ExactArgs(1), + Args: cobra.ExactArgs(1), // TODO: remove and support test suites? RunE: c.run, } diff --git a/cmd/cloud.go b/cmd/cloud.go index 4f6ec647f3d..4a39536471a 100644 --- a/cmd/cloud.go +++ b/cmd/cloud.go @@ -73,7 +73,11 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error { ) printBar(c.gs, progressBar) - test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig) + preInitState, err := getPreInitState(c.gs, cmd) + if err != nil { + return err + } + test, err := loadAndConfigureLocalTest(c.gs, preInitState, cmd, args[0], getPartialConfig) if err != nil { return err } diff --git a/cmd/convert.go b/cmd/convert.go index c5c3c1640d0..58d4c1733cb 100644 --- a/cmd/convert.go +++ b/cmd/convert.go @@ -13,6 +13,7 @@ import ( ) // TODO: split apart like `k6 run` and `k6 archive`? +// //nolint:funlen,gocognit func getCmdConvert(globalState *globalState) *cobra.Command { var ( diff --git a/cmd/coordinator.go b/cmd/coordinator.go index 7e17f994b21..1a10658aad2 100644 --- a/cmd/coordinator.go +++ b/cmd/coordinator.go @@ -27,36 +27,52 @@ type cmdCoordinator struct { func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) { ctx, runAbort := execution.NewTestRunContext(c.gs.ctx, c.gs.logger) - test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig) + tests, err := loadAndConfigureLocalTests(c.gs, cmd, args, getPartialConfig) if err != nil { return err } - // Only consolidated options, not derived - testRunState, err := test.buildTestRunState(test.consolidatedConfig.Options) + // TODO: refactor at some point, this limits us to handleSummary() from the first test + firstTest := tests[0] + // TODO: refactor - this is safe, preInitState is the same for all tests, + // but it's still icky to get it that way + preInitState := firstTest.preInitState + metricsEngine, err := engine.NewMetricsEngine(preInitState.Registry, c.gs.logger) if err != nil { return err } - shouldProcessMetrics := !testRunState.RuntimeOptions.NoSummary.Bool || !testRunState.RuntimeOptions.NoThresholds.Bool - metricsEngine, err := engine.NewMetricsEngine(testRunState, shouldProcessMetrics) - if err != nil { - return err + testArchives := make([]*lib.Archive, len(tests)) + for i, test := range tests { + runState, err := test.buildTestRunState(test.consolidatedConfig.Options) + if err != nil { + return err + } + + // We get the thresholds from all tests + testArchives[i] = runState.Runner.MakeArchive() + err = metricsEngine.InitSubMetricsAndThresholds( + test.derivedConfig.Options, + preInitState.RuntimeOptions.NoThresholds.Bool, + ) + if err != nil { + return err + } } coordinator, err := distributed.NewCoordinatorServer( - c.instanceCount, test.initRunner.MakeArchive(), metricsEngine, c.gs.logger, + c.instanceCount, testArchives, metricsEngine, c.gs.logger, ) if err != nil { return err } - if !testRunState.RuntimeOptions.NoSummary.Bool { + if !preInitState.RuntimeOptions.NoSummary.Bool { defer func() { c.gs.logger.Debug("Generating the end-of-test summary...") - summaryResult, serr := test.initRunner.HandleSummary(ctx, &lib.Summary{ + summaryResult, serr := firstTest.initRunner.HandleSummary(ctx, &lib.Summary{ Metrics: metricsEngine.ObservedMetrics, - RootGroup: test.initRunner.GetDefaultGroup(), + RootGroup: firstTest.initRunner.GetDefaultGroup(), TestRunDuration: coordinator.GetCurrentTestRunDuration(), NoColor: c.gs.flags.noColor, UIState: lib.UIState{ @@ -73,9 +89,9 @@ func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) { }() } - if !testRunState.RuntimeOptions.NoThresholds.Bool { + if !preInitState.RuntimeOptions.NoThresholds.Bool { getCurrentTestDuration := coordinator.GetCurrentTestRunDuration - finalizeThresholds := metricsEngine.StartThresholdCalculations(getCurrentTestDuration, runAbort) + finalizeThresholds := metricsEngine.StartThresholdCalculations(nil, getCurrentTestDuration, runAbort) defer func() { // This gets called after all of the outputs have stopped, so we are diff --git a/cmd/inspect.go b/cmd/inspect.go index acedd1a344b..d45ccc9b7c8 100644 --- a/cmd/inspect.go +++ b/cmd/inspect.go @@ -18,9 +18,14 @@ func getCmdInspect(gs *globalState) *cobra.Command { Use: "inspect [file]", Short: "Inspect a script or archive", Long: `Inspect a script or archive.`, - Args: cobra.ExactArgs(1), + Args: cobra.ExactArgs(1), // TODO: remove and support test suites? RunE: func(cmd *cobra.Command, args []string) error { - test, err := loadLocalTest(gs, cmd, args) + preInitState, err := getPreInitState(gs, cmd) + if err != nil { + return err + } + + test, err := loadLocalTest(gs, preInitState, cmd, args[0]) if err != nil { return err } diff --git a/cmd/run.go b/cmd/run.go index cafbe2afcef..ca249cfdfca 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -38,24 +38,46 @@ type cmdsRunAndAgent struct { gs *globalState // TODO: figure out something more elegant? - loadConfiguredTest func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error) - metricsEngineHook func(*engine.MetricsEngine) func() - testEndHook func(err error) + loadConfiguredTests func(cmd *cobra.Command, args []string) ([]*loadedAndConfiguredTest, execution.Controller, error) + metricsEngineHook func(*engine.MetricsEngine) func() + testEndHook func(err error) } -// TODO: split apart some more -// -//nolint:funlen,gocognit,gocyclo,cyclop func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) { - var logger logrus.FieldLogger = c.gs.logger defer func() { - logger.Debugf("Everything has finished, exiting k6 with error '%s'!", err) + c.gs.logger.Debugf("Everything has finished, exiting k6 with error '%s'!", err) if c.testEndHook != nil { c.testEndHook(err) } }() printBanner(c.gs) + // TODO: hadnle contexts, Ctrl+C, REST API and a lot of other things here + + tests, controller, err := c.loadConfiguredTests(cmd, args) + if err != nil { + return err + } + + execution.SignalAndWait(controller, "test-suite-start") + defer execution.SignalAndWait(controller, "test-suite-done") + for i, test := range tests { + testName := fmt.Sprintf("%d", i) // TODO: something better but still unique + testController := execution.GetNamespacedController(testName, controller) + + err := c.runTest(cmd, test, testController) + if err != nil { + return err + } + } + return nil +} + +//nolint:funlen,gocognit,gocyclo,cyclop +func (c *cmdsRunAndAgent) runTest( + cmd *cobra.Command, test *loadedAndConfiguredTest, controller execution.Controller, +) (err error) { + var logger logrus.FieldLogger = c.gs.logger globalCtx, globalCancel := context.WithCancel(c.gs.ctx) defer globalCancel() @@ -63,16 +85,14 @@ func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) { // k6 was started with the --linger option. lingerCtx, lingerCancel := context.WithCancel(globalCtx) defer lingerCancel() + execution.SignalAndWait(controller, "test-start") + defer execution.SignalAndWait(controller, "test-done") // runCtx is used for the test run execution and is created with the special // execution.NewTestRunContext() function so that it can be aborted even // from sub-contexts while also attaching a reason for the abort. runCtx, runAbort := execution.NewTestRunContext(lingerCtx, logger) - test, controller, err := c.loadConfiguredTest(cmd, args) - if err != nil { - return err - } if test.keyLogger != nil { defer func() { if klErr := test.keyLogger.Close(); klErr != nil { @@ -122,19 +142,25 @@ func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) { return err } - // We'll need to pipe metrics to the MetricsEngine and process them if any - // of these are enabled: thresholds, end-of-test summary, engine hook - shouldProcessMetrics := (!testRunState.RuntimeOptions.NoSummary.Bool || - !testRunState.RuntimeOptions.NoThresholds.Bool || c.metricsEngineHook != nil) - metricsEngine, err := engine.NewMetricsEngine(testRunState, shouldProcessMetrics) + metricsEngine, err := engine.NewMetricsEngine(testRunState.Registry, logger) if err != nil { return err } + // We'll need to pipe metrics to the MetricsEngine and process them if any + // of these are enabled: thresholds, end-of-test summary, engine hook + shouldProcessMetrics := (!testRunState.RuntimeOptions.NoSummary.Bool || + !testRunState.RuntimeOptions.NoThresholds.Bool || c.metricsEngineHook != nil) + var metricsIngester *engine.OutputIngester if shouldProcessMetrics { + err = metricsEngine.InitSubMetricsAndThresholds(conf.Options, testRunState.RuntimeOptions.NoThresholds.Bool) + if err != nil { + return err + } // We'll need to pipe metrics to the MetricsEngine if either the // thresholds or the end-of-test summary are enabled. - outputs = append(outputs, metricsEngine.CreateIngester()) + metricsIngester = metricsEngine.CreateIngester() + outputs = append(outputs, metricsIngester) } executionState := execScheduler.GetState() @@ -192,7 +218,7 @@ func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) { if !testRunState.RuntimeOptions.NoThresholds.Bool { getCurrentTestDuration := executionState.GetCurrentTestRunDuration - finalizeThresholds := metricsEngine.StartThresholdCalculations(getCurrentTestDuration, runAbort) + finalizeThresholds := metricsEngine.StartThresholdCalculations(metricsIngester, getCurrentTestDuration, runAbort) defer func() { // This gets called after the Samples channel has been closed and // the OutputManager has flushed all of the cached samples to @@ -258,7 +284,7 @@ func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) { } printExecutionDescription( - c.gs, "local", args[0], "", conf, executionState.ExecutionTuple, executionPlan, outputs, + c.gs, "local", test.sourceRootPath, "", conf, executionState.ExecutionTuple, executionPlan, outputs, ) // Trap Interrupts, SIGINTs and SIGTERMs. @@ -347,9 +373,11 @@ func (c *cmdsRunAndAgent) flagSet() *pflag.FlagSet { func getCmdRun(gs *globalState) *cobra.Command { c := &cmdsRunAndAgent{ gs: gs, - loadConfiguredTest: func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error) { - test, err := loadAndConfigureLocalTest(gs, cmd, args, getConfig) - return test, local.NewController(), err + loadConfiguredTests: func(cmd *cobra.Command, args []string) ( + []*loadedAndConfiguredTest, execution.Controller, error, + ) { + tests, err := loadAndConfigureLocalTests(gs, cmd, args, getConfig) + return tests, local.NewController(), err }, } @@ -378,7 +406,6 @@ a commandline interface for interacting with it.`, # Send metrics to an influxdb server k6 run -o influxdb=http://1.2.3.4:8086/k6`[1:], - Args: exactArgsWithMsg(1, "arg should either be \"-\", if reading script from stdin, or a path to a script file"), RunE: c.run, } diff --git a/cmd/test_load.go b/cmd/test_load.go index 9d3c24d5205..ea8d6aeddc0 100644 --- a/cmd/test_load.go +++ b/cmd/test_load.go @@ -30,33 +30,17 @@ const ( // loadedTest contains all of data, details and dependencies of a loaded // k6 test, but without any config consolidation. type loadedTest struct { + preInitState *lib.TestPreInitState sourceRootPath string // contains the raw string the user supplied pwd string source *loader.SourceData fs afero.Fs fileSystems map[string]afero.Fs - preInitState *lib.TestPreInitState initRunner lib.Runner // TODO: rename to something more appropriate keyLogger io.Closer } -func loadLocalTest(gs *globalState, cmd *cobra.Command, args []string) (*loadedTest, error) { - if len(args) < 1 { - return nil, fmt.Errorf("k6 needs at least one argument to load the test") - } - - sourceRootPath := args[0] - gs.logger.Debugf("Resolving and reading test '%s'...", sourceRootPath) - src, fileSystems, pwd, err := readSource(gs, sourceRootPath) - if err != nil { - return nil, err - } - resolvedPath := src.URL.String() - gs.logger.Debugf( - "'%s' resolved to '%s' and successfully loaded %d bytes!", - sourceRootPath, resolvedPath, len(src.Data), - ) - +func getPreInitState(gs *globalState, cmd *cobra.Command) (*lib.TestPreInitState, error) { gs.logger.Debugf("Gathering k6 runtime options...") runtimeOptions, err := getRuntimeOptions(cmd.Flags(), gs.envVars) if err != nil { @@ -64,20 +48,35 @@ func loadLocalTest(gs *globalState, cmd *cobra.Command, args []string) (*loadedT } registry := metrics.NewRegistry() - state := &lib.TestPreInitState{ + return &lib.TestPreInitState{ Logger: gs.logger, RuntimeOptions: runtimeOptions, Registry: registry, BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), + }, nil +} + +func loadLocalTest( + gs *globalState, preInitState *lib.TestPreInitState, cmd *cobra.Command, sourceRootPath string, +) (*loadedTest, error) { + gs.logger.Debugf("Resolving and reading test '%s'...", sourceRootPath) + src, fileSystems, pwd, err := readSource(gs, sourceRootPath) + if err != nil { + return nil, err } + resolvedPath := src.URL.String() + gs.logger.Debugf( + "'%s' resolved to '%s' and successfully loaded %d bytes!", + sourceRootPath, resolvedPath, len(src.Data), + ) test := &loadedTest{ + preInitState: preInitState, pwd: pwd, sourceRootPath: sourceRootPath, source: src, fs: gs.fs, fileSystems: fileSystems, - preInitState: state, } gs.logger.Debugf("Initializing k6 runner for '%s' (%s)...", sourceRootPath, resolvedPath) @@ -230,11 +229,36 @@ type loadedAndConfiguredTest struct { derivedConfig Config } -func loadAndConfigureLocalTest( +func loadAndConfigureLocalTests( gs *globalState, cmd *cobra.Command, args []string, cliConfigGetter func(flags *pflag.FlagSet) (Config, error), +) ([]*loadedAndConfiguredTest, error) { + if len(args) < 1 { + return nil, fmt.Errorf("k6 needs at least one argument to load the test") + } + + preInitState, err := getPreInitState(gs, cmd) + if err != nil { + return nil, err + } + + tests := make([]*loadedAndConfiguredTest, 0, len(args)) + for _, arg := range args { + test, err := loadAndConfigureLocalTest(gs, preInitState, cmd, arg, cliConfigGetter) + if err != nil { + return nil, err + } + tests = append(tests, test) + } + + return tests, nil +} + +func loadAndConfigureLocalTest( + gs *globalState, preInitState *lib.TestPreInitState, cmd *cobra.Command, sourceRootPath string, + cliConfigGetter func(flags *pflag.FlagSet) (Config, error), ) (*loadedAndConfiguredTest, error) { - test, err := loadLocalTest(gs, cmd, args) + test, err := loadLocalTest(gs, preInitState, cmd, sourceRootPath) if err != nil { return nil, err } diff --git a/execution/controller.go b/execution/controller.go index 662d19dc7df..2baba03a4bb 100644 --- a/execution/controller.go +++ b/execution/controller.go @@ -9,3 +9,32 @@ type Controller interface { Wait(eventId string) func() error Signal(eventId string) error } + +type namesspacedController struct { + namespace string + c Controller +} + +func (nc namesspacedController) GetOrCreateData(id string, callback func() ([]byte, error)) ([]byte, error) { + return nc.c.GetOrCreateData(nc.namespace+"/"+id, callback) +} + +func (nc namesspacedController) Wait(eventId string) func() error { + return nc.c.Wait(nc.namespace + "/" + eventId) +} + +func (nc namesspacedController) Signal(eventId string) error { + return nc.c.Signal(nc.namespace + "/" + eventId) +} + +func GetNamespacedController(namespace string, controller Controller) Controller { + return &namesspacedController{namespace: namespace, c: controller} +} + +func SignalAndWait(c Controller, eventID string) error { + wait := c.Wait(eventID) + if err := c.Signal(eventID); err != nil { + return err + } + return wait() +} diff --git a/execution/distributed/coordinator.go b/execution/distributed/coordinator.go index 644036cbc53..529a367d25e 100644 --- a/execution/distributed/coordinator.go +++ b/execution/distributed/coordinator.go @@ -5,6 +5,8 @@ import ( context "context" "encoding/json" "fmt" + "path/filepath" + "strconv" "sync" "sync/atomic" "time" @@ -14,54 +16,64 @@ import ( "go.k6.io/k6/metrics/engine" ) +type test struct { + archive *lib.Archive + archiveData []byte + ess lib.ExecutionSegmentSequence +} + // TODO: something more polished... type CoordinatorServer struct { UnimplementedDistributedTestServer instanceCount int - test *lib.Archive logger logrus.FieldLogger metricsEngine *engine.MetricsEngine + tests []test + testStartTimeLock sync.Mutex testStartTime *time.Time cc *coordinatorController currentInstance int32 // TODO: something a bit better, support full execution plans from JSON? - ess lib.ExecutionSegmentSequence - archive []byte wg *sync.WaitGroup } func NewCoordinatorServer( - instanceCount int, test *lib.Archive, metricsEngine *engine.MetricsEngine, logger logrus.FieldLogger, + instanceCount int, testArchives []*lib.Archive, metricsEngine *engine.MetricsEngine, logger logrus.FieldLogger, ) (*CoordinatorServer, error) { - segments, err := test.Options.ExecutionSegment.Split(int64(instanceCount)) - if err != nil { - return nil, err - } - ess, err := lib.NewExecutionSegmentSequence(segments...) - if err != nil { - return nil, err + tests := make([]test, len(testArchives)) + for i, testArchive := range testArchives { + segments, err := testArchive.Options.ExecutionSegment.Split(int64(instanceCount)) + if err != nil { + return nil, err + } + ess, err := lib.NewExecutionSegmentSequence(segments...) + if err != nil { + return nil, err + } + buf := &bytes.Buffer{} + if err = testArchive.Write(buf); err != nil { + return nil, err + } + tests[i] = test{ + ess: ess, + archive: testArchive, + archiveData: buf.Bytes(), + } } // TODO: figure out some way to add metrics from the instance to the metricsEngine - buf := &bytes.Buffer{} - if err = test.Write(buf); err != nil { - return nil, err - } - wg := &sync.WaitGroup{} wg.Add(instanceCount) cs := &CoordinatorServer{ instanceCount: instanceCount, - test: test, + tests: tests, metricsEngine: metricsEngine, logger: logger, - ess: ess, cc: newCoordinatorController(instanceCount, logger), - archive: buf.Bytes(), wg: wg, } @@ -71,25 +83,31 @@ func NewCoordinatorServer( } func (cs *CoordinatorServer) monitorProgress() { - wg := cs.cc.getSignalWG("test-start") // TODO: use constant when we refactor scheduler.go + wg := cs.cc.getSignalWG("test-suite-start") // TODO: use constant when we refactor scheduler.go wg.Wait() - cs.logger.Info("All instances ready to start initializing VUs...") + cs.logger.Info("All instances ready!") - wg = cs.cc.getSignalWG("test-ready-to-run-setup") // TODO: use constant when we refactor scheduler.go - wg.Wait() - cs.logger.Info("VUs initialized, setup()...") - cs.testStartTimeLock.Lock() - t := time.Now() - cs.testStartTime = &t - cs.testStartTimeLock.Unlock() + for i, test := range cs.tests { + testName := fmt.Sprintf("%d", i) + wg = cs.cc.getSignalWG(testName + "/test-start") // TODO: use constant when we refactor scheduler.go + wg.Wait() + cs.logger.Infof("Test %d (%s) started...", i+1, filepath.Base(test.archive.FilenameURL.Path)) - wg = cs.cc.getSignalWG("setup-done") // TODO: use constant when we refactor scheduler.go - wg.Wait() - cs.logger.Info("setup() done, starting test!") + cs.testStartTimeLock.Lock() + if cs.testStartTime == nil { + t := time.Now() + cs.testStartTime = &t + } + cs.testStartTimeLock.Unlock() - wg = cs.cc.getSignalWG("test-done") // TODO: use constant when we refactor scheduler.go + wg = cs.cc.getSignalWG(testName + "/test-done") // TODO: use constant when we refactor scheduler.go + wg.Wait() + cs.logger.Infof("Test %d (%s) ended!", i+1, filepath.Base(test.archive.FilenameURL.Path)) + } + + wg = cs.cc.getSignalWG("test-suite-done") // TODO: use constant when we refactor scheduler.go wg.Wait() - cs.logger.Info("Instances finished with the test") + cs.logger.Info("Instances finished with the test suite") } func (cs *CoordinatorServer) GetCurrentTestRunDuration() time.Duration { @@ -110,18 +128,32 @@ func (cs *CoordinatorServer) Register(context.Context, *RegisterRequest) (*Regis } cs.logger.Infof("Instance %d of %d connected!", instanceID, cs.instanceCount) - instanceOptions := cs.test.Options - instanceOptions.ExecutionSegment = cs.ess[instanceID-1] - instanceOptions.ExecutionSegmentSequence = &cs.ess - options, err := json.Marshal(instanceOptions) - if err != nil { - return nil, err + instanceTests := make([]*Test, len(cs.tests)) + for i, test := range cs.tests { + opts := test.archive.Options + opts.ExecutionSegment = test.ess[instanceID-1] + opts.ExecutionSegmentSequence = &test.ess + + if opts.RunTags == nil { + opts.RunTags = make(map[string]string) + } + opts.RunTags["test_id"] = test.archive.Filename + opts.RunTags["instance_id"] = strconv.Itoa(int(instanceID)) + + jsonOpts, err := json.Marshal(opts) + if err != nil { + return nil, err + } + + instanceTests[i] = &Test{ + Archive: test.archiveData, + Options: jsonOpts, + } } return &RegisterResponse{ InstanceID: uint32(instanceID), - Archive: cs.archive, - Options: options, + Tests: instanceTests, }, nil } diff --git a/execution/distributed/distributed.pb.go b/execution/distributed/distributed.pb.go index f73564daa00..72f9d241cce 100644 --- a/execution/distributed/distributed.pb.go +++ b/execution/distributed/distributed.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.27.1 -// protoc v3.19.4 +// protoc v3.21.6 // source: distributed.proto package distributed @@ -63,9 +63,8 @@ type RegisterResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - InstanceID uint32 `protobuf:"varint,1,opt,name=instanceID,proto3" json:"instanceID,omitempty"` - Archive []byte `protobuf:"bytes,2,opt,name=archive,proto3" json:"archive,omitempty"` // TODO: send this with a `stream` of bytes chunks - Options []byte `protobuf:"bytes,3,opt,name=options,proto3" json:"options,omitempty"` + InstanceID uint32 `protobuf:"varint,1,opt,name=instanceID,proto3" json:"instanceID,omitempty"` + Tests []*Test `protobuf:"bytes,2,rep,name=tests,proto3" json:"tests,omitempty"` } func (x *RegisterResponse) Reset() { @@ -107,14 +106,62 @@ func (x *RegisterResponse) GetInstanceID() uint32 { return 0 } -func (x *RegisterResponse) GetArchive() []byte { +func (x *RegisterResponse) GetTests() []*Test { + if x != nil { + return x.Tests + } + return nil +} + +type Test struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Archive []byte `protobuf:"bytes,1,opt,name=archive,proto3" json:"archive,omitempty"` // TODO: send this with a `stream` of bytes chunks + Options []byte `protobuf:"bytes,2,opt,name=options,proto3" json:"options,omitempty"` +} + +func (x *Test) Reset() { + *x = Test{} + if protoimpl.UnsafeEnabled { + mi := &file_distributed_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Test) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Test) ProtoMessage() {} + +func (x *Test) ProtoReflect() protoreflect.Message { + mi := &file_distributed_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Test.ProtoReflect.Descriptor instead. +func (*Test) Descriptor() ([]byte, []int) { + return file_distributed_proto_rawDescGZIP(), []int{2} +} + +func (x *Test) GetArchive() []byte { if x != nil { return x.Archive } return nil } -func (x *RegisterResponse) GetOptions() []byte { +func (x *Test) GetOptions() []byte { if x != nil { return x.Options } @@ -139,7 +186,7 @@ type AgentMessage struct { func (x *AgentMessage) Reset() { *x = AgentMessage{} if protoimpl.UnsafeEnabled { - mi := &file_distributed_proto_msgTypes[2] + mi := &file_distributed_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -152,7 +199,7 @@ func (x *AgentMessage) String() string { func (*AgentMessage) ProtoMessage() {} func (x *AgentMessage) ProtoReflect() protoreflect.Message { - mi := &file_distributed_proto_msgTypes[2] + mi := &file_distributed_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -165,7 +212,7 @@ func (x *AgentMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use AgentMessage.ProtoReflect.Descriptor instead. func (*AgentMessage) Descriptor() ([]byte, []int) { - return file_distributed_proto_rawDescGZIP(), []int{2} + return file_distributed_proto_rawDescGZIP(), []int{3} } func (m *AgentMessage) GetMessage() isAgentMessage_Message { @@ -247,7 +294,7 @@ type ControllerMessage struct { func (x *ControllerMessage) Reset() { *x = ControllerMessage{} if protoimpl.UnsafeEnabled { - mi := &file_distributed_proto_msgTypes[3] + mi := &file_distributed_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -260,7 +307,7 @@ func (x *ControllerMessage) String() string { func (*ControllerMessage) ProtoMessage() {} func (x *ControllerMessage) ProtoReflect() protoreflect.Message { - mi := &file_distributed_proto_msgTypes[3] + mi := &file_distributed_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -273,7 +320,7 @@ func (x *ControllerMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use ControllerMessage.ProtoReflect.Descriptor instead. func (*ControllerMessage) Descriptor() ([]byte, []int) { - return file_distributed_proto_rawDescGZIP(), []int{3} + return file_distributed_proto_rawDescGZIP(), []int{4} } func (x *ControllerMessage) GetInstanceID() uint32 { @@ -346,7 +393,7 @@ type DataPacket struct { func (x *DataPacket) Reset() { *x = DataPacket{} if protoimpl.UnsafeEnabled { - mi := &file_distributed_proto_msgTypes[4] + mi := &file_distributed_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -359,7 +406,7 @@ func (x *DataPacket) String() string { func (*DataPacket) ProtoMessage() {} func (x *DataPacket) ProtoReflect() protoreflect.Message { - mi := &file_distributed_proto_msgTypes[4] + mi := &file_distributed_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -372,7 +419,7 @@ func (x *DataPacket) ProtoReflect() protoreflect.Message { // Deprecated: Use DataPacket.ProtoReflect.Descriptor instead. func (*DataPacket) Descriptor() ([]byte, []int) { - return file_distributed_proto_rawDescGZIP(), []int{4} + return file_distributed_proto_rawDescGZIP(), []int{5} } func (x *DataPacket) GetId() string { @@ -408,7 +455,7 @@ type MetricsDump struct { func (x *MetricsDump) Reset() { *x = MetricsDump{} if protoimpl.UnsafeEnabled { - mi := &file_distributed_proto_msgTypes[5] + mi := &file_distributed_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -421,7 +468,7 @@ func (x *MetricsDump) String() string { func (*MetricsDump) ProtoMessage() {} func (x *MetricsDump) ProtoReflect() protoreflect.Message { - mi := &file_distributed_proto_msgTypes[5] + mi := &file_distributed_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -434,7 +481,7 @@ func (x *MetricsDump) ProtoReflect() protoreflect.Message { // Deprecated: Use MetricsDump.ProtoReflect.Descriptor instead. func (*MetricsDump) Descriptor() ([]byte, []int) { - return file_distributed_proto_rawDescGZIP(), []int{5} + return file_distributed_proto_rawDescGZIP(), []int{6} } func (x *MetricsDump) GetInstanceID() uint32 { @@ -463,7 +510,7 @@ type MetricDump struct { func (x *MetricDump) Reset() { *x = MetricDump{} if protoimpl.UnsafeEnabled { - mi := &file_distributed_proto_msgTypes[6] + mi := &file_distributed_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -476,7 +523,7 @@ func (x *MetricDump) String() string { func (*MetricDump) ProtoMessage() {} func (x *MetricDump) ProtoReflect() protoreflect.Message { - mi := &file_distributed_proto_msgTypes[6] + mi := &file_distributed_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -489,7 +536,7 @@ func (x *MetricDump) ProtoReflect() protoreflect.Message { // Deprecated: Use MetricDump.ProtoReflect.Descriptor instead. func (*MetricDump) Descriptor() ([]byte, []int) { - return file_distributed_proto_rawDescGZIP(), []int{6} + return file_distributed_proto_rawDescGZIP(), []int{7} } func (x *MetricDump) GetName() string { @@ -515,7 +562,7 @@ type MetricsDumpResponse struct { func (x *MetricsDumpResponse) Reset() { *x = MetricsDumpResponse{} if protoimpl.UnsafeEnabled { - mi := &file_distributed_proto_msgTypes[7] + mi := &file_distributed_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -528,7 +575,7 @@ func (x *MetricsDumpResponse) String() string { func (*MetricsDumpResponse) ProtoMessage() {} func (x *MetricsDumpResponse) ProtoReflect() protoreflect.Message { - mi := &file_distributed_proto_msgTypes[7] + mi := &file_distributed_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -541,7 +588,7 @@ func (x *MetricsDumpResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MetricsDumpResponse.ProtoReflect.Descriptor instead. func (*MetricsDumpResponse) Descriptor() ([]byte, []int) { - return file_distributed_proto_rawDescGZIP(), []int{7} + return file_distributed_proto_rawDescGZIP(), []int{8} } var File_distributed_proto protoreflect.FileDescriptor @@ -550,75 +597,79 @@ var file_distributed_proto_rawDesc = []byte{ 0x0a, 0x11, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x22, 0x11, 0x0a, 0x0f, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x22, 0x66, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, + 0x65, 0x73, 0x74, 0x22, 0x5b, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x69, 0x6e, 0x73, - 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x72, 0x63, 0x68, 0x69, - 0x76, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x61, 0x72, 0x63, 0x68, 0x69, 0x76, - 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0xe8, 0x01, 0x0a, 0x0c, - 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x28, 0x0a, 0x0e, - 0x69, 0x6e, 0x69, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0d, 0x48, 0x00, 0x52, 0x0e, 0x69, 0x6e, 0x69, 0x74, 0x49, 0x6e, 0x73, 0x74, - 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x12, 0x2e, 0x0a, 0x11, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, - 0x41, 0x6e, 0x64, 0x57, 0x61, 0x69, 0x74, 0x4f, 0x6e, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x48, 0x00, 0x52, 0x11, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x41, 0x6e, 0x64, 0x57, 0x61, - 0x69, 0x74, 0x4f, 0x6e, 0x49, 0x44, 0x12, 0x36, 0x0a, 0x15, 0x67, 0x65, 0x74, 0x4f, 0x72, 0x43, - 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x15, 0x67, 0x65, 0x74, 0x4f, 0x72, 0x43, 0x72, - 0x65, 0x61, 0x74, 0x65, 0x44, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x12, 0x3b, - 0x0a, 0x0b, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, - 0x64, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x48, 0x00, 0x52, 0x0b, - 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x42, 0x09, 0x0a, 0x07, 0x4d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xd1, 0x01, 0x0a, 0x11, 0x43, 0x6f, 0x6e, 0x74, 0x72, - 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x0a, - 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, - 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x12, 0x28, 0x0a, 0x0e, - 0x64, 0x6f, 0x6e, 0x65, 0x57, 0x61, 0x69, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0e, 0x64, 0x6f, 0x6e, 0x65, 0x57, 0x61, 0x69, 0x74, - 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x12, 0x2c, 0x0a, 0x10, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, - 0x44, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, - 0x48, 0x00, 0x52, 0x10, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x61, 0x74, 0x61, 0x57, 0x69, - 0x74, 0x68, 0x49, 0x44, 0x12, 0x39, 0x0a, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, - 0x49, 0x44, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, - 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, - 0x74, 0x48, 0x00, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x42, - 0x09, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x46, 0x0a, 0x0a, 0x44, 0x61, - 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x14, 0x0a, 0x05, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, - 0x6f, 0x72, 0x22, 0x60, 0x0a, 0x0b, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x75, 0x6d, - 0x70, 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, - 0x44, 0x12, 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, - 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x07, 0x6d, 0x65, 0x74, - 0x72, 0x69, 0x63, 0x73, 0x22, 0x34, 0x0a, 0x0a, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x44, 0x75, - 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x15, 0x0a, 0x13, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x32, 0xff, 0x01, 0x0a, 0x0f, 0x44, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, - 0x64, 0x54, 0x65, 0x73, 0x74, 0x12, 0x49, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, - 0x72, 0x12, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, - 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x52, 0x65, - 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, - 0x12, 0x54, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x41, 0x6e, 0x64, 0x43, 0x6f, - 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, 0x19, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, - 0x74, 0x65, 0x64, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x1a, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x43, - 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4b, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x18, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, - 0x74, 0x65, 0x64, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x75, 0x6d, 0x70, 0x1a, - 0x20, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x4d, 0x65, - 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x6f, 0x2e, 0x6b, 0x36, 0x2e, 0x69, 0x6f, 0x2f, - 0x6b, 0x36, 0x2f, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x64, 0x69, 0x73, - 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x12, 0x27, 0x0a, 0x05, 0x74, 0x65, 0x73, 0x74, 0x73, + 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, + 0x75, 0x74, 0x65, 0x64, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x52, 0x05, 0x74, 0x65, 0x73, 0x74, 0x73, + 0x22, 0x3a, 0x0a, 0x04, 0x54, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x72, 0x63, 0x68, + 0x69, 0x76, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x61, 0x72, 0x63, 0x68, 0x69, + 0x76, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0xe8, 0x01, 0x0a, + 0x0c, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x28, 0x0a, + 0x0e, 0x69, 0x6e, 0x69, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0d, 0x48, 0x00, 0x52, 0x0e, 0x69, 0x6e, 0x69, 0x74, 0x49, 0x6e, 0x73, + 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x12, 0x2e, 0x0a, 0x11, 0x73, 0x69, 0x67, 0x6e, 0x61, + 0x6c, 0x41, 0x6e, 0x64, 0x57, 0x61, 0x69, 0x74, 0x4f, 0x6e, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x48, 0x00, 0x52, 0x11, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x41, 0x6e, 0x64, 0x57, + 0x61, 0x69, 0x74, 0x4f, 0x6e, 0x49, 0x44, 0x12, 0x36, 0x0a, 0x15, 0x67, 0x65, 0x74, 0x4f, 0x72, + 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x15, 0x67, 0x65, 0x74, 0x4f, 0x72, 0x43, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x12, + 0x3b, 0x0a, 0x0b, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, + 0x65, 0x64, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x48, 0x00, 0x52, + 0x0b, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x44, 0x61, 0x74, 0x61, 0x42, 0x09, 0x0a, 0x07, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xd1, 0x01, 0x0a, 0x11, 0x43, 0x6f, 0x6e, 0x74, + 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, + 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0d, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, 0x12, 0x28, 0x0a, + 0x0e, 0x64, 0x6f, 0x6e, 0x65, 0x57, 0x61, 0x69, 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0e, 0x64, 0x6f, 0x6e, 0x65, 0x57, 0x61, 0x69, + 0x74, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x12, 0x2c, 0x0a, 0x10, 0x63, 0x72, 0x65, 0x61, 0x74, + 0x65, 0x44, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x48, 0x00, 0x52, 0x10, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x61, 0x74, 0x61, 0x57, + 0x69, 0x74, 0x68, 0x49, 0x44, 0x12, 0x39, 0x0a, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, + 0x68, 0x49, 0x44, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x64, 0x69, 0x73, 0x74, + 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, + 0x65, 0x74, 0x48, 0x00, 0x52, 0x0a, 0x64, 0x61, 0x74, 0x61, 0x57, 0x69, 0x74, 0x68, 0x49, 0x44, + 0x42, 0x09, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x46, 0x0a, 0x0a, 0x44, + 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, + 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x14, 0x0a, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, + 0x72, 0x6f, 0x72, 0x22, 0x60, 0x0a, 0x0b, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x75, + 0x6d, 0x70, 0x12, 0x1e, 0x0a, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x49, 0x44, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, + 0x49, 0x44, 0x12, 0x31, 0x0a, 0x07, 0x6d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x18, 0x02, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, + 0x64, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x07, 0x6d, 0x65, + 0x74, 0x72, 0x69, 0x63, 0x73, 0x22, 0x34, 0x0a, 0x0a, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x44, + 0x75, 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x15, 0x0a, 0x13, 0x4d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x32, 0xff, 0x01, 0x0a, 0x0f, 0x44, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, + 0x65, 0x64, 0x54, 0x65, 0x73, 0x74, 0x12, 0x49, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, + 0x65, 0x72, 0x12, 0x1c, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, + 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x1d, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x52, + 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x12, 0x54, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x41, 0x6e, 0x64, 0x43, + 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, 0x19, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, + 0x75, 0x74, 0x65, 0x64, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x1a, 0x1e, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, + 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4b, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x4d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x12, 0x18, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, + 0x75, 0x74, 0x65, 0x64, 0x2e, 0x4d, 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x75, 0x6d, 0x70, + 0x1a, 0x20, 0x2e, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x2e, 0x4d, + 0x65, 0x74, 0x72, 0x69, 0x63, 0x73, 0x44, 0x75, 0x6d, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x6f, 0x2e, 0x6b, 0x36, 0x2e, 0x69, 0x6f, + 0x2f, 0x6b, 0x36, 0x2f, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x64, 0x69, + 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -633,32 +684,34 @@ func file_distributed_proto_rawDescGZIP() []byte { return file_distributed_proto_rawDescData } -var file_distributed_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_distributed_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_distributed_proto_goTypes = []interface{}{ (*RegisterRequest)(nil), // 0: distributed.RegisterRequest (*RegisterResponse)(nil), // 1: distributed.RegisterResponse - (*AgentMessage)(nil), // 2: distributed.AgentMessage - (*ControllerMessage)(nil), // 3: distributed.ControllerMessage - (*DataPacket)(nil), // 4: distributed.DataPacket - (*MetricsDump)(nil), // 5: distributed.MetricsDump - (*MetricDump)(nil), // 6: distributed.MetricDump - (*MetricsDumpResponse)(nil), // 7: distributed.MetricsDumpResponse + (*Test)(nil), // 2: distributed.Test + (*AgentMessage)(nil), // 3: distributed.AgentMessage + (*ControllerMessage)(nil), // 4: distributed.ControllerMessage + (*DataPacket)(nil), // 5: distributed.DataPacket + (*MetricsDump)(nil), // 6: distributed.MetricsDump + (*MetricDump)(nil), // 7: distributed.MetricDump + (*MetricsDumpResponse)(nil), // 8: distributed.MetricsDumpResponse } var file_distributed_proto_depIdxs = []int32{ - 4, // 0: distributed.AgentMessage.createdData:type_name -> distributed.DataPacket - 4, // 1: distributed.ControllerMessage.dataWithID:type_name -> distributed.DataPacket - 6, // 2: distributed.MetricsDump.metrics:type_name -> distributed.MetricDump - 0, // 3: distributed.DistributedTest.Register:input_type -> distributed.RegisterRequest - 2, // 4: distributed.DistributedTest.CommandAndControl:input_type -> distributed.AgentMessage - 5, // 5: distributed.DistributedTest.SendMetrics:input_type -> distributed.MetricsDump - 1, // 6: distributed.DistributedTest.Register:output_type -> distributed.RegisterResponse - 3, // 7: distributed.DistributedTest.CommandAndControl:output_type -> distributed.ControllerMessage - 7, // 8: distributed.DistributedTest.SendMetrics:output_type -> distributed.MetricsDumpResponse - 6, // [6:9] is the sub-list for method output_type - 3, // [3:6] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 2, // 0: distributed.RegisterResponse.tests:type_name -> distributed.Test + 5, // 1: distributed.AgentMessage.createdData:type_name -> distributed.DataPacket + 5, // 2: distributed.ControllerMessage.dataWithID:type_name -> distributed.DataPacket + 7, // 3: distributed.MetricsDump.metrics:type_name -> distributed.MetricDump + 0, // 4: distributed.DistributedTest.Register:input_type -> distributed.RegisterRequest + 3, // 5: distributed.DistributedTest.CommandAndControl:input_type -> distributed.AgentMessage + 6, // 6: distributed.DistributedTest.SendMetrics:input_type -> distributed.MetricsDump + 1, // 7: distributed.DistributedTest.Register:output_type -> distributed.RegisterResponse + 4, // 8: distributed.DistributedTest.CommandAndControl:output_type -> distributed.ControllerMessage + 8, // 9: distributed.DistributedTest.SendMetrics:output_type -> distributed.MetricsDumpResponse + 7, // [7:10] is the sub-list for method output_type + 4, // [4:7] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_distributed_proto_init() } @@ -692,7 +745,7 @@ func file_distributed_proto_init() { } } file_distributed_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AgentMessage); i { + switch v := v.(*Test); i { case 0: return &v.state case 1: @@ -704,7 +757,7 @@ func file_distributed_proto_init() { } } file_distributed_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ControllerMessage); i { + switch v := v.(*AgentMessage); i { case 0: return &v.state case 1: @@ -716,7 +769,7 @@ func file_distributed_proto_init() { } } file_distributed_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DataPacket); i { + switch v := v.(*ControllerMessage); i { case 0: return &v.state case 1: @@ -728,7 +781,7 @@ func file_distributed_proto_init() { } } file_distributed_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MetricsDump); i { + switch v := v.(*DataPacket); i { case 0: return &v.state case 1: @@ -740,7 +793,7 @@ func file_distributed_proto_init() { } } file_distributed_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MetricDump); i { + switch v := v.(*MetricsDump); i { case 0: return &v.state case 1: @@ -752,6 +805,18 @@ func file_distributed_proto_init() { } } file_distributed_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MetricDump); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distributed_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*MetricsDumpResponse); i { case 0: return &v.state @@ -764,13 +829,13 @@ func file_distributed_proto_init() { } } } - file_distributed_proto_msgTypes[2].OneofWrappers = []interface{}{ + file_distributed_proto_msgTypes[3].OneofWrappers = []interface{}{ (*AgentMessage_InitInstanceID)(nil), (*AgentMessage_SignalAndWaitOnID)(nil), (*AgentMessage_GetOrCreateDataWithID)(nil), (*AgentMessage_CreatedData)(nil), } - file_distributed_proto_msgTypes[3].OneofWrappers = []interface{}{ + file_distributed_proto_msgTypes[4].OneofWrappers = []interface{}{ (*ControllerMessage_DoneWaitWithID)(nil), (*ControllerMessage_CreateDataWithID)(nil), (*ControllerMessage_DataWithID)(nil), @@ -781,7 +846,7 @@ func file_distributed_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_distributed_proto_rawDesc, NumEnums: 0, - NumMessages: 8, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/execution/distributed/distributed.proto b/execution/distributed/distributed.proto index d0193fdbed5..01748c9028e 100644 --- a/execution/distributed/distributed.proto +++ b/execution/distributed/distributed.proto @@ -16,8 +16,12 @@ service DistributedTest { message RegisterRequest {} message RegisterResponse { uint32 instanceID = 1; - bytes archive = 2; // TODO: send this with a `stream` of bytes chunks - bytes options = 3; + repeated Test tests = 2; +} + +message Test { + bytes archive = 1; // TODO: send this with a `stream` of bytes chunks + bytes options = 2; } message AgentMessage { diff --git a/execution/distributed/distributed_grpc.pb.go b/execution/distributed/distributed_grpc.pb.go index 8e0694f49cb..f9766229824 100644 --- a/execution/distributed/distributed_grpc.pb.go +++ b/execution/distributed/distributed_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v3.19.4 +// - protoc v3.21.6 // source: distributed.proto package distributed diff --git a/execution/local/controller.go b/execution/local/controller.go index 6189bfeeee6..6e3116bd8af 100644 --- a/execution/local/controller.go +++ b/execution/local/controller.go @@ -17,6 +17,7 @@ func (c *Controller) Wait(eventId string) func() error { return nil } } + func (c *Controller) Signal(eventId string) error { return nil } diff --git a/execution/scheduler.go b/execution/scheduler.go index 2fe75b02ea5..775be0a3b66 100644 --- a/execution/scheduler.go +++ b/execution/scheduler.go @@ -375,15 +375,6 @@ func (e *Scheduler) runExecutor( runResults <- err } -func (e *Scheduler) signalAndWait(eventID string) error { - wait := e.controller.Wait(eventID) - err := e.controller.Signal(eventID) - if err != nil { - return err - } - return wait() -} - // Run the Scheduler, funneling all generated metric samples through the supplied // out channel. // @@ -391,8 +382,8 @@ func (e *Scheduler) signalAndWait(eventID string) error { func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- metrics.SampleContainer) error { // TODO: use constants and namespaces for these e.initProgress.Modify(pb.WithConstProgress(0, "Waiting to start...")) - e.signalAndWait("test-start") - defer e.signalAndWait("test-done") + SignalAndWait(e.controller, "scheduler-start") + defer SignalAndWait(e.controller, "scheduler-done") execSchedRunCtx, execSchedRunCancel := context.WithCancel(runCtx) waitForVUsMetricPush := e.emitVUsAndVUsMax(execSchedRunCtx, samplesOut) @@ -403,7 +394,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met return err } - e.signalAndWait("vus-initialized") + SignalAndWait(e.controller, "vus-initialized") executorsCount := len(e.executors) logger := e.state.Test.Logger.WithField("phase", "execution-scheduler-run") @@ -428,7 +419,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met } } - e.signalAndWait("test-ready-to-run-setup") + SignalAndWait(e.controller, "test-ready-to-run-setup") e.state.MarkStarted() e.initProgress.Modify(pb.WithConstProgress(1, "running")) @@ -463,7 +454,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met } } - e.signalAndWait("setup-done") + SignalAndWait(e.controller, "setup-done") e.initProgress.Modify(pb.WithHijack(e.getRunStats)) @@ -488,7 +479,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met } } - e.signalAndWait("execution-done") + SignalAndWait(e.controller, "execution-done") // Run teardown() after all executors are done, if it's not disabled if !e.state.Test.Options.NoTeardown.Bool { @@ -510,7 +501,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, samplesOut chan<- met } } - e.signalAndWait("teardown-done") + SignalAndWait(e.controller, "teardown-done") if err := GetCancelReasonIfTestAborted(executorsRunCtx); err != nil { interrupted = true diff --git a/js/runner_test.go b/js/runner_test.go index 10bb7111c0b..b9295db3746 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -48,7 +48,6 @@ import ( "go.k6.io/k6/lib/testutils/mockoutput" "go.k6.io/k6/lib/types" "go.k6.io/k6/metrics" - "go.k6.io/k6/metrics/engine" "go.k6.io/k6/output" ) @@ -388,22 +387,17 @@ func TestDataIsolation(t *testing.T) { execScheduler, err := execution.NewScheduler(testRunState, local.NewController()) require.NoError(t, err) - metricsEngine, err := engine.NewMetricsEngine(testRunState, true) - require.NoError(t, err) - globalCtx, globalCancel := context.WithCancel(context.Background()) defer globalCancel() runCtx, runAbort := execution.NewTestRunContext(globalCtx, testRunState.Logger) mockOutput := mockoutput.New() - outputManager := output.NewManager([]output.Output{mockOutput, metricsEngine.CreateIngester()}, testRunState.Logger, runAbort) + outputManager := output.NewManager([]output.Output{mockOutput}, testRunState.Logger, runAbort) samples := make(chan metrics.SampleContainer, 1000) waitForMetricsFlushed, stopOutputs, err := outputManager.Start(samples) require.NoError(t, err) defer stopOutputs(nil) - finalizeThresholds := metricsEngine.StartThresholdCalculations(execScheduler.GetState().GetCurrentTestRunDuration, runAbort) - require.Empty(t, runner.defaultGroup.Groups) errC := make(chan error) @@ -417,8 +411,6 @@ func TestDataIsolation(t *testing.T) { close(samples) require.NoError(t, err) waitForMetricsFlushed() - breached := finalizeThresholds() - require.Empty(t, breached) } require.Contains(t, runner.defaultGroup.Groups, "setup") require.Contains(t, runner.defaultGroup.Groups, "teardown") diff --git a/metrics/engine/engine.go b/metrics/engine/engine.go index 25b3a103626..eb2b4dc6943 100644 --- a/metrics/engine/engine.go +++ b/metrics/engine/engine.go @@ -15,7 +15,6 @@ import ( "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/lib" "go.k6.io/k6/metrics" - "go.k6.io/k6/output" "gopkg.in/guregu/null.v3" ) @@ -25,14 +24,11 @@ const thresholdsRate = 2 * time.Second // aggregated metric sample values. They are used to generate the end-of-test // summary and to evaluate the test thresholds. type MetricsEngine struct { - runState *lib.TestRunState + registry *metrics.Registry logger logrus.FieldLogger - outputIngester *outputIngester - // These can be both top-level metrics or sub-metrics - metricsWithThresholds []*metrics.Metric - + metricsWithThresholds []*metrics.Metric breachedThresholdsCount uint32 // TODO: completely refactor: @@ -45,32 +41,23 @@ type MetricsEngine struct { } // NewMetricsEngine creates a new metrics Engine with the given parameters. -func NewMetricsEngine(runState *lib.TestRunState, shouldProcessMetrics bool) (*MetricsEngine, error) { +func NewMetricsEngine(registry *metrics.Registry, logger logrus.FieldLogger) (*MetricsEngine, error) { me := &MetricsEngine{ - runState: runState, - logger: runState.Logger.WithField("component", "metrics-engine"), - + registry: registry, + logger: logger.WithField("component", "metrics-engine"), ObservedMetrics: make(map[string]*metrics.Metric), } - if shouldProcessMetrics { - err := me.initSubMetricsAndThresholds() - if err != nil { - return nil, err - } - } - return me, nil } // CreateIngester returns a pseudo-Output that uses the given metric samples to // update the engine's inner state. -func (me *MetricsEngine) CreateIngester() output.Output { - me.outputIngester = &outputIngester{ +func (me *MetricsEngine) CreateIngester() *OutputIngester { + return &OutputIngester{ logger: me.logger.WithField("component", "metrics-engine-ingester"), metricsEngine: me, } - return me.outputIngester } // TODO: something better? deduplicate code with getThresholdMetricOrSubmetric @@ -81,7 +68,7 @@ func (me *MetricsEngine) ImportMetric(name string, data []byte) error { // TODO: replace with strings.Cut after Go 1.18 nameParts := strings.SplitN(name, "{", 2) - metric := me.runState.Registry.Get(nameParts[0]) + metric := me.registry.Get(nameParts[0]) if metric == nil { return fmt.Errorf("metric '%s' does not exist in the script", nameParts[0]) } @@ -107,7 +94,7 @@ func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*metrics.Me // TODO: replace with strings.Cut after Go 1.18 nameParts := strings.SplitN(name, "{", 2) - metric := me.runState.Registry.Get(nameParts[0]) + metric := me.registry.Get(nameParts[0]) if metric == nil { return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0]) } @@ -152,14 +139,25 @@ func (me *MetricsEngine) markObserved(metric *metrics.Metric) { if !metric.Observed { metric.Observed = true me.ObservedMetrics[metric.Name] = metric + } else { + // TODO: remove + // + // This is huge HACK to clean up the metrics from a previous test run, + // it will no longer be needed once this issue is resolved: + // https://github.com/grafana/k6/issues/572 + if _, ok := me.ObservedMetrics[metric.Name]; ok { + return // from this run, not a problem + } + metric.Sink.Drain() + me.ObservedMetrics[metric.Name] = metric } } -func (me *MetricsEngine) initSubMetricsAndThresholds() error { - for metricName, thresholds := range me.runState.Options.Thresholds { +func (me *MetricsEngine) InitSubMetricsAndThresholds(options lib.Options, onlyLogErrors bool) error { + for metricName, thresholds := range options.Thresholds { metric, err := me.getThresholdMetricOrSubmetric(metricName) - if me.runState.RuntimeOptions.NoThresholds.Bool { + if onlyLogErrors { if err != nil { me.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName) } @@ -184,7 +182,7 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error { // TODO: refactor out of here when https://github.com/grafana/k6/issues/1321 // lands and there is a better way to enable a metric with tag - if me.runState.Options.SystemTags.Has(metrics.TagExpectedResponse) { + if options.SystemTags.Has(metrics.TagExpectedResponse) { _, err := me.getThresholdMetricOrSubmetric("http_req_duration{expected_response:true}") if err != nil { return err // shouldn't happen, but ¯\_(ツ)_/¯ @@ -197,7 +195,7 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error { // StartThresholdCalculations spins up a new goroutine to crunch thresholds and // returns a callback that will stop the goroutine and finalizes calculations. func (me *MetricsEngine) StartThresholdCalculations( - getCurrentTestRunDuration func() time.Duration, abortRun func(error), + ingester *OutputIngester, getCurrentTestRunDuration func() time.Duration, abortRun func(error), ) (finalize func() (breached []string)) { stop := make(chan struct{}) done := make(chan struct{}) @@ -229,9 +227,9 @@ func (me *MetricsEngine) StartThresholdCalculations( }() return func() []string { - if me.outputIngester != nil { + if ingester != nil { // Stop the ingester so we don't get any more metrics - err := me.outputIngester.Stop() + err := ingester.Stop() if err != nil { me.logger.WithError(err).Warnf("There was a problem stopping the output ingester.") } diff --git a/metrics/engine/ingester.go b/metrics/engine/ingester.go index 83a514552a0..133c8810ad9 100644 --- a/metrics/engine/ingester.go +++ b/metrics/engine/ingester.go @@ -9,11 +9,11 @@ import ( const collectRate = 50 * time.Millisecond -var _ output.Output = &outputIngester{} +var _ output.Output = &OutputIngester{} -// outputIngester implements the output.Output interface and can be used to +// OutputIngester implements the output.Output interface and can be used to // "feed" the MetricsEngine data from a `k6 run` test run. -type outputIngester struct { +type OutputIngester struct { output.SampleBuffer logger logrus.FieldLogger @@ -22,12 +22,12 @@ type outputIngester struct { } // Description returns a human-readable description of the output. -func (oi *outputIngester) Description() string { +func (oi *OutputIngester) Description() string { return "engine" } // Start the engine by initializing a new output.PeriodicFlusher -func (oi *outputIngester) Start() error { +func (oi *OutputIngester) Start() error { oi.logger.Debug("Starting...") pf, err := output.NewPeriodicFlusher(collectRate, oi.flushMetrics) @@ -41,7 +41,7 @@ func (oi *outputIngester) Start() error { } // Stop flushes any remaining metrics and stops the goroutine. -func (oi *outputIngester) Stop() error { +func (oi *OutputIngester) Stop() error { oi.logger.Debug("Stopping...") defer oi.logger.Debug("Stopped!") oi.periodicFlusher.Stop() @@ -49,7 +49,7 @@ func (oi *outputIngester) Stop() error { } // flushMetrics Writes samples to the MetricsEngine -func (oi *outputIngester) flushMetrics() { +func (oi *OutputIngester) flushMetrics() { sampleContainers := oi.GetBufferedSamples() if len(sampleContainers) == 0 { return