diff --git a/cloudapi/config.go b/cloudapi/config.go index 54959d93eb4..ebe07d5a871 100644 --- a/cloudapi/config.go +++ b/cloudapi/config.go @@ -32,12 +32,17 @@ type Config struct { StopOnError null.Bool `json:"stopOnError" envconfig:"K6_CLOUD_STOP_ON_ERROR"` APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"` - // PushRefID represents the test run id. - // Note: It is a legacy name used by the backend, the code in k6 open-source - // references it as test run id. - // Currently, a renaming is not planned. + // PushRefID is the identifier used by Cloud systems to correlate all the things that + // belong to the same test run/execution. Currently, it is equivalent to the TestRunID. + // But, in the future, or in future solutions (e.g. Synthetic Monitoring), there might be + // no TestRunID and we may still need an identifier to correlate all the things. PushRefID null.String `json:"pushRefID" envconfig:"K6_CLOUD_PUSH_REF_ID"` + // TestRunID is the test run id, a unique identifier across all the test runs. + // It might be used to correlate all the things that belong to the same test run/execution, + // see PushRefID for more details. + TestRunID null.String `json:"testRunID" envconfig:"K6_CLOUD_TEST_RUN_ID"` + // Defines the max allowed number of time series in a single batch. MaxTimeSeriesInBatch null.Int `json:"maxTimeSeriesInBatch" envconfig:"K6_CLOUD_MAX_TIME_SERIES_IN_BATCH"` @@ -121,6 +126,9 @@ func (c Config) Apply(cfg Config) Config { if cfg.PushRefID.Valid { c.PushRefID = cfg.PushRefID } + if cfg.TestRunID.Valid { + c.TestRunID = cfg.TestRunID + } if cfg.WebAppURL.Valid { c.WebAppURL = cfg.WebAppURL } diff --git a/cmd/cloud_run.go b/cmd/cloud_run.go index 54b13f5c34a..866a5b94af8 100644 --- a/cmd/cloud_run.go +++ b/cmd/cloud_run.go @@ -131,6 +131,25 @@ func (c *cmdCloudRun) preRun(cmd *cobra.Command, args []string) error { func (c *cmdCloudRun) run(cmd *cobra.Command, args []string) error { if c.localExecution { + // We know this execution requires a test run to be created in the Cloud. + // So, we create it before delegating the actual execution to the run command. + // To do that, we need to load the test and configure it. + test, err := loadAndConfigureLocalTest(c.runCmd.gs, cmd, args, getCloudRunLocalExecutionConfig) + if err != nil { + return fmt.Errorf("could not load and configure the test: %w", err) + } + + // As we've already loaded the test, we can modify the init function to + // reuse the initialized one. + c.runCmd.loadConfiguredTest = func(*cobra.Command, []string) (*loadedAndConfiguredTest, execution.Controller, error) { + return test, local.NewController(), nil + } + + // After that, we can create the remote test run. + if err := createCloudTest(c.runCmd.gs, test); err != nil { + return fmt.Errorf("could not create the cloud test run: %w", err) + } + return c.runCmd.run(cmd, args) } diff --git a/cmd/outputs_cloud.go b/cmd/outputs_cloud.go new file mode 100644 index 00000000000..ef2f64daffa --- /dev/null +++ b/cmd/outputs_cloud.go @@ -0,0 +1,174 @@ +package cmd + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/sirupsen/logrus" + "gopkg.in/guregu/null.v3" + + "go.k6.io/k6/cloudapi" + "go.k6.io/k6/cmd/state" + "go.k6.io/k6/lib" + "go.k6.io/k6/lib/consts" + "go.k6.io/k6/metrics" +) + +const defaultTestName = "k6 test" + +// createCloudTest performs some test and Cloud configuration validations and if everything +// looks good, then it creates a test run in the k6 Cloud, unless k6 is already running in the Cloud. +// It is also responsible for filling the test run id on the test options, so it can be used later. +// It returns the resulting Cloud configuration as a json.RawMessage, as expected by the Cloud output, +// or an error if something goes wrong. +func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest) error { + conf, warn, err := cloudapi.GetConsolidatedConfig( + test.derivedConfig.Collectors[builtinOutputCloud.String()], + gs.Env, + "", // Historically used for -o cloud=..., no longer used (deprecated). + test.derivedConfig.Options.Cloud, + test.derivedConfig.Options.External, + ) + if err != nil { + return err + } + + if warn != "" { + gs.Logger.Warn(warn) + } + + // If this is true, then it means that this code is being executed in the k6 Cloud. + // Therefore, we don't need to continue with the test run creation, + // as we don't need to create any test run. + // + // Precisely, the identifier of the test run is conf.TestRunID. + if conf.TestRunID.Valid { + return nil + } + + // If not, we continue with some validations and the creation of the test run. + if err := validateRequiredSystemTags(test.derivedConfig.Options.SystemTags); err != nil { + return err + } + + if !conf.Name.Valid || conf.Name.String == "" { + scriptPath := test.source.URL.String() + if scriptPath == "" { + // Script from stdin without a name, likely from stdin + return errors.New("script name not set, please specify K6_CLOUD_NAME or options.cloud.name") + } + + conf.Name = null.StringFrom(filepath.Base(scriptPath)) + } + if conf.Name.String == "-" { + conf.Name = null.StringFrom(defaultTestName) + } + + thresholds := make(map[string][]string) + for name, t := range test.derivedConfig.Thresholds { + for _, threshold := range t.Thresholds { + thresholds[name] = append(thresholds[name], threshold.Source) + } + } + + et, err := lib.NewExecutionTuple( + test.derivedConfig.Options.ExecutionSegment, + test.derivedConfig.Options.ExecutionSegmentSequence, + ) + if err != nil { + return err + } + executionPlan := test.derivedConfig.Options.Scenarios.GetFullExecutionRequirements(et) + + duration, testEnds := lib.GetEndOffset(executionPlan) + if !testEnds { + return errors.New("tests with unspecified duration are not allowed when outputting data to k6 cloud") + } + + if conf.MetricPushConcurrency.Int64 < 1 { + return fmt.Errorf("metrics push concurrency must be a positive number but is %d", + conf.MetricPushConcurrency.Int64) + } + + if conf.MaxTimeSeriesInBatch.Int64 < 1 { + return fmt.Errorf("max allowed number of time series in a single batch must be a positive number but is %d", + conf.MaxTimeSeriesInBatch.Int64) + } + + var testArchive *lib.Archive + if !test.derivedConfig.NoArchiveUpload.Bool { + testArchive = test.initRunner.MakeArchive() + } + + testRun := &cloudapi.TestRun{ + Name: conf.Name.String, + ProjectID: conf.ProjectID.Int64, + VUsMax: int64(lib.GetMaxPossibleVUs(executionPlan)), + Thresholds: thresholds, + Duration: int64(duration / time.Second), + Archive: testArchive, + } + + logger := gs.Logger.WithFields(logrus.Fields{"output": builtinOutputCloud.String()}) + + apiClient := cloudapi.NewClient( + logger, conf.Token.String, conf.Host.String, consts.Version, conf.Timeout.TimeDuration()) + + response, err := apiClient.CreateTestRun(testRun) + if err != nil { + return err + } + + if response.ConfigOverride != nil { + logger.WithFields(logrus.Fields{"override": response.ConfigOverride}).Debug("overriding config options") + conf = conf.Apply(*response.ConfigOverride) + } + + conf.TestRunID = null.NewString(response.ReferenceID, true) + + raw, err := cloudConfToRawMessage(conf) + if err != nil { + return fmt.Errorf("could not serialize cloud configuration: %w", err) + } + + test.derivedConfig.Collectors[builtinOutputCloud.String()] = raw + + return nil +} + +// validateRequiredSystemTags checks if all required tags are present. +func validateRequiredSystemTags(scriptTags *metrics.SystemTagSet) error { + var missingRequiredTags []string + requiredTags := metrics.SystemTagSet(metrics.TagName | + metrics.TagMethod | + metrics.TagStatus | + metrics.TagError | + metrics.TagCheck | + metrics.TagGroup) + for _, tag := range metrics.SystemTagValues() { + if requiredTags.Has(tag) && !scriptTags.Has(tag) { + missingRequiredTags = append(missingRequiredTags, tag.String()) + } + } + if len(missingRequiredTags) > 0 { + return fmt.Errorf( + "the cloud output needs the following system tags enabled: %s", + strings.Join(missingRequiredTags, ", "), + ) + } + return nil +} + +func cloudConfToRawMessage(conf cloudapi.Config) (json.RawMessage, error) { + var buff bytes.Buffer + enc := json.NewEncoder(&buff) + if err := enc.Encode(conf); err != nil { + return nil, err + } + return buff.Bytes(), nil +} diff --git a/cmd/test_load.go b/cmd/test_load.go index f022e415576..1caeedebfc5 100644 --- a/cmd/test_load.go +++ b/cmd/test_load.go @@ -265,7 +265,7 @@ func loadSystemCertPool(logger logrus.FieldLogger) { func (lct *loadedAndConfiguredTest) buildTestRunState( configToReinject lib.Options, ) (*lib.TestRunState, error) { - // This might be the full derived or just the consodlidated options + // This might be the full derived or just the consolidated options if err := lct.initRunner.SetOptions(configToReinject); err != nil { return nil, err } diff --git a/js/jsmodules.go b/js/jsmodules.go index e6f96eb2882..61a48e6b011 100644 --- a/js/jsmodules.go +++ b/js/jsmodules.go @@ -8,6 +8,7 @@ import ( "go.k6.io/k6/js/common" "go.k6.io/k6/js/modules" "go.k6.io/k6/js/modules/k6" + "go.k6.io/k6/js/modules/k6/cloud" "go.k6.io/k6/js/modules/k6/crypto" "go.k6.io/k6/js/modules/k6/crypto/x509" "go.k6.io/k6/js/modules/k6/data" @@ -32,6 +33,7 @@ import ( func getInternalJSModules() map[string]interface{} { return map[string]interface{}{ "k6": k6.New(), + "k6/cloud": cloud.New(), "k6/crypto": crypto.New(), "k6/crypto/x509": x509.New(), "k6/data": data.New(), diff --git a/js/modules/k6/cloud/cloud.go b/js/modules/k6/cloud/cloud.go new file mode 100644 index 00000000000..ddc7a2da6f9 --- /dev/null +++ b/js/modules/k6/cloud/cloud.go @@ -0,0 +1,110 @@ +// Package cloud implements k6/cloud which lets script find out more about the Cloud execution. +package cloud + +import ( + "sync" + + "github.com/grafana/sobek" + "github.com/mstoykov/envconfig" + + "go.k6.io/k6/cloudapi" + "go.k6.io/k6/js/common" + "go.k6.io/k6/js/modules" +) + +type ( + // RootModule is the global module instance that will create module + // instances for each VU. + RootModule struct{} + + // ModuleInstance represents an instance of the execution module. + ModuleInstance struct { + vu modules.VU + obj *sobek.Object + + once sync.Once + testRunID sobek.Value + } +) + +var ( + _ modules.Module = &RootModule{} + _ modules.Instance = &ModuleInstance{} +) + +// New returns a pointer to a new RootModule instance. +func New() *RootModule { + return &RootModule{} +} + +// NewModuleInstance implements the modules.Module interface to return +// a new instance for each VU. +func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance { + mi := &ModuleInstance{vu: vu} + + rt := vu.Runtime() + + mi.obj = rt.NewObject() + defProp := func(name string, getter func() (sobek.Value, error)) { + err := mi.obj.DefineAccessorProperty(name, rt.ToValue(func() sobek.Value { + obj, err := getter() + if err != nil { + common.Throw(rt, err) + } + return obj + }), nil, sobek.FLAG_FALSE, sobek.FLAG_TRUE) + if err != nil { + common.Throw(rt, err) + } + } + defProp("testRunId", mi.testRunId) + + // By default, we try to load the test run id from the environment variables, + // which corresponds to those scenarios where the k6 binary is running in the Cloud. + var envConf cloudapi.Config + if err := envconfig.Process("", &envConf, vu.InitEnv().LookupEnv); err != nil { + common.Throw(vu.Runtime(), err) + } + if envConf.TestRunID.Valid { + mi.testRunID = mi.vu.Runtime().ToValue(envConf.TestRunID.String) + } else { + mi.testRunID = sobek.Undefined() // Default value. + } + + return mi +} + +// Exports returns the exports of the execution module. +func (mi *ModuleInstance) Exports() modules.Exports { + return modules.Exports{Default: mi.obj} +} + +// testRunId returns a sobek.Value(string) with the Cloud test run id. +// +// This code can be executed in two situations, either when the k6 binary is running in the Cloud, in which case +// the value of the test run id would be available in the environment, and we would have loaded at module initialization +// time; or when the k6 binary is running locally and test run id is present in the options, which we try to read at +// time of running this method, but only once for the whole execution as options won't change anymore. +func (mi *ModuleInstance) testRunId() (sobek.Value, error) { + // In case we have a value (e.g. loaded from env), we return it. + // If we're in the init context (where we cannot read the options), we return undefined (the default value). + if !sobek.IsUndefined(mi.testRunID) || mi.vu.State() == nil { + return mi.testRunID, nil + } + + // Otherwise, we try to read the test run id from options. + // We only try it once for the whole execution, as options won't change. + vuState := mi.vu.State() + var err error + mi.once.Do(func() { + // We pass almost all values to zero/nil because here we only care about the Cloud configuration present in options. + var optsConf cloudapi.Config + optsConf, _, err = cloudapi.GetConsolidatedConfig(vuState.Options.Cloud, nil, "", nil, nil) + + if optsConf.TestRunID.Valid { + mi.testRunID = mi.vu.Runtime().ToValue(optsConf.TestRunID.String) + } + }) + + return mi.testRunID, err +} diff --git a/js/modules/k6/cloud/cloud_test.go b/js/modules/k6/cloud/cloud_test.go new file mode 100644 index 00000000000..4adcfd18bf0 --- /dev/null +++ b/js/modules/k6/cloud/cloud_test.go @@ -0,0 +1,84 @@ +package cloud + +import ( + "testing" + + "github.com/grafana/sobek" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.k6.io/k6/js/modulestest" + "go.k6.io/k6/lib" +) + +func setupCloudTestEnv(t *testing.T, env map[string]string) *modulestest.Runtime { + tRt := modulestest.NewRuntime(t) + tRt.VU.InitEnv().LookupEnv = func(key string) (string, bool) { + v, ok := env[key] + return v, ok + } + m, ok := New().NewModuleInstance(tRt.VU).(*ModuleInstance) + require.True(t, ok) + require.NoError(t, tRt.VU.Runtime().Set("cloud", m.Exports().Default)) + return tRt +} + +func TestGetTestRunId(t *testing.T) { + t.Parallel() + + t.Run("Cloud execution", func(t *testing.T) { + t.Parallel() + + t.Run("Not defined", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t, nil) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, sobek.Undefined(), testRunId) + }) + + t.Run("Defined", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t, map[string]string{"K6_CLOUD_TEST_RUN_ID": "123"}) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, "123", testRunId.String()) + }) + }) + + t.Run("Local execution", func(t *testing.T) { + t.Parallel() + + t.Run("Init context", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t, nil) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, sobek.Undefined(), testRunId) + }) + + t.Run("Not defined", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t, nil) + tRt.MoveToVUContext(&lib.State{ + Options: lib.Options{}, + }) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, sobek.Undefined(), testRunId) + }) + + t.Run("Defined", func(t *testing.T) { + t.Parallel() + tRt := setupCloudTestEnv(t, nil) + tRt.MoveToVUContext(&lib.State{ + Options: lib.Options{ + Cloud: []byte(`{"testRunID": "123"}`), + }, + }) + testRunId, err := tRt.VU.Runtime().RunString(`cloud.testRunId`) + require.NoError(t, err) + assert.Equal(t, "123", testRunId.String()) + }) + }) +} diff --git a/lib/options.go b/lib/options.go index e4f3a2d6929..55c0dcc84c8 100644 --- a/lib/options.go +++ b/lib/options.go @@ -308,8 +308,7 @@ type Options struct { // iteration is shorter than the specified value. MinIterationDuration types.NullDuration `json:"minIterationDuration" envconfig:"K6_MIN_ITERATION_DURATION"` - // Cloud is the config for the cloud - // formally known as ext.loadimpact + // Cloud is the configuration for the k6 Cloud, formerly known as ext.loadimpact. Cloud json.RawMessage `json:"cloud,omitempty"` // These values are for third party collectors' benefit. diff --git a/output/cloud/output.go b/output/cloud/output.go index c8532a65019..5054fcfd65e 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -9,6 +9,8 @@ import ( "time" "github.com/sirupsen/logrus" + "gopkg.in/guregu/null.v3" + "go.k6.io/k6/cloudapi" "go.k6.io/k6/errext" "go.k6.io/k6/lib" @@ -17,7 +19,6 @@ import ( "go.k6.io/k6/output" cloudv2 "go.k6.io/k6/output/cloud/expv2" "go.k6.io/k6/usage" - "gopkg.in/guregu/null.v3" ) // TestName is the default k6 Cloud test name @@ -176,8 +177,13 @@ func validateRequiredSystemTags(scriptTags *metrics.SystemTagSet) error { // Start calls the k6 Cloud API to initialize the test run, and then starts the // goroutine that would listen for metric samples and send them to the cloud. func (out *Output) Start() error { - if out.config.PushRefID.Valid { + if out.config.TestRunID.Valid { + out.testRunID = out.config.TestRunID.String + } else if out.config.PushRefID.Valid { out.testRunID = out.config.PushRefID.String + } + + if out.testRunID != "" { out.logger.WithField("testRunId", out.testRunID).Debug("Directly pushing metrics without init") return out.startVersionedOutput() }