-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
k6/cloud module with the testRunId #4092
base: master
Are you sure you want to change the base?
Changes from all commits
dd7efc8
20fe571
436e0a9
82400de
e7eae06
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
package cmd | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"path/filepath" | ||
"strings" | ||
"time" | ||
|
||
"github.com/sirupsen/logrus" | ||
"gopkg.in/guregu/null.v3" | ||
|
||
"go.k6.io/k6/cloudapi" | ||
"go.k6.io/k6/cmd/state" | ||
"go.k6.io/k6/lib" | ||
"go.k6.io/k6/lib/consts" | ||
"go.k6.io/k6/metrics" | ||
) | ||
|
||
const defaultTestName = "k6 test" | ||
|
||
// createCloudTest performs some test and Cloud configuration validations and if everything | ||
// looks good, then it creates a test run in the k6 Cloud, unless k6 is already running in the Cloud. | ||
// It is also responsible for filling the test run id on the test options, so it can be used later. | ||
// It returns the resulting Cloud configuration as a json.RawMessage, as expected by the Cloud output, | ||
// or an error if something goes wrong. | ||
func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest) error { | ||
conf, warn, err := cloudapi.GetConsolidatedConfig( | ||
test.derivedConfig.Collectors[builtinOutputCloud.String()], | ||
gs.Env, | ||
"", // Historically used for -o cloud=..., no longer used (deprecated). | ||
test.derivedConfig.Options.Cloud, | ||
test.derivedConfig.Options.External, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if warn != "" { | ||
gs.Logger.Warn(warn) | ||
} | ||
|
||
// If this is true, then it means that this code is being executed in the k6 Cloud. | ||
// Therefore, we don't need to continue with the test run creation, | ||
// as we don't need to create any test run. | ||
// | ||
// Precisely, the identifier of the test run is conf.TestRunID. | ||
if conf.TestRunID.Valid { | ||
return nil | ||
} | ||
|
||
// If not, we continue with some validations and the creation of the test run. | ||
if err := validateRequiredSystemTags(test.derivedConfig.Options.SystemTags); err != nil { | ||
return err | ||
} | ||
|
||
if !conf.Name.Valid || conf.Name.String == "" { | ||
scriptPath := test.source.URL.String() | ||
if scriptPath == "" { | ||
// Script from stdin without a name, likely from stdin | ||
return errors.New("script name not set, please specify K6_CLOUD_NAME or options.cloud.name") | ||
} | ||
|
||
conf.Name = null.StringFrom(filepath.Base(scriptPath)) | ||
} | ||
if conf.Name.String == "-" { | ||
conf.Name = null.StringFrom(defaultTestName) | ||
} | ||
|
||
thresholds := make(map[string][]string) | ||
for name, t := range test.derivedConfig.Thresholds { | ||
for _, threshold := range t.Thresholds { | ||
thresholds[name] = append(thresholds[name], threshold.Source) | ||
} | ||
} | ||
|
||
et, err := lib.NewExecutionTuple( | ||
test.derivedConfig.Options.ExecutionSegment, | ||
test.derivedConfig.Options.ExecutionSegmentSequence, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
executionPlan := test.derivedConfig.Options.Scenarios.GetFullExecutionRequirements(et) | ||
|
||
duration, testEnds := lib.GetEndOffset(executionPlan) | ||
if !testEnds { | ||
return errors.New("tests with unspecified duration are not allowed when outputting data to k6 cloud") | ||
} | ||
|
||
if conf.MetricPushConcurrency.Int64 < 1 { | ||
return fmt.Errorf("metrics push concurrency must be a positive number but is %d", | ||
conf.MetricPushConcurrency.Int64) | ||
} | ||
|
||
if conf.MaxTimeSeriesInBatch.Int64 < 1 { | ||
return fmt.Errorf("max allowed number of time series in a single batch must be a positive number but is %d", | ||
conf.MaxTimeSeriesInBatch.Int64) | ||
} | ||
|
||
var testArchive *lib.Archive | ||
if !test.derivedConfig.NoArchiveUpload.Bool { | ||
testArchive = test.initRunner.MakeArchive() | ||
} | ||
|
||
testRun := &cloudapi.TestRun{ | ||
Name: conf.Name.String, | ||
ProjectID: conf.ProjectID.Int64, | ||
VUsMax: int64(lib.GetMaxPossibleVUs(executionPlan)), | ||
Thresholds: thresholds, | ||
Duration: int64(duration / time.Second), | ||
Archive: testArchive, | ||
} | ||
|
||
logger := gs.Logger.WithFields(logrus.Fields{"output": builtinOutputCloud.String()}) | ||
|
||
apiClient := cloudapi.NewClient( | ||
logger, conf.Token.String, conf.Host.String, consts.Version, conf.Timeout.TimeDuration()) | ||
|
||
response, err := apiClient.CreateTestRun(testRun) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if response.ConfigOverride != nil { | ||
logger.WithFields(logrus.Fields{"override": response.ConfigOverride}).Debug("overriding config options") | ||
conf = conf.Apply(*response.ConfigOverride) | ||
} | ||
|
||
conf.TestRunID = null.NewString(response.ReferenceID, true) | ||
|
||
raw, err := cloudConfToRawMessage(conf) | ||
if err != nil { | ||
return fmt.Errorf("could not serialize cloud configuration: %w", err) | ||
} | ||
|
||
test.derivedConfig.Collectors[builtinOutputCloud.String()] = raw | ||
|
||
return nil | ||
} | ||
|
||
// validateRequiredSystemTags checks if all required tags are present. | ||
func validateRequiredSystemTags(scriptTags *metrics.SystemTagSet) error { | ||
var missingRequiredTags []string | ||
requiredTags := metrics.SystemTagSet(metrics.TagName | | ||
metrics.TagMethod | | ||
metrics.TagStatus | | ||
metrics.TagError | | ||
metrics.TagCheck | | ||
metrics.TagGroup) | ||
for _, tag := range metrics.SystemTagValues() { | ||
if requiredTags.Has(tag) && !scriptTags.Has(tag) { | ||
missingRequiredTags = append(missingRequiredTags, tag.String()) | ||
} | ||
} | ||
if len(missingRequiredTags) > 0 { | ||
return fmt.Errorf( | ||
"the cloud output needs the following system tags enabled: %s", | ||
strings.Join(missingRequiredTags, ", "), | ||
) | ||
} | ||
return nil | ||
} | ||
|
||
func cloudConfToRawMessage(conf cloudapi.Config) (json.RawMessage, error) { | ||
var buff bytes.Buffer | ||
enc := json.NewEncoder(&buff) | ||
if err := enc.Encode(conf); err != nil { | ||
return nil, err | ||
} | ||
return buff.Bytes(), nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
// Package cloud implements k6/cloud which lets script find out more about the Cloud execution. | ||
package cloud | ||
|
||
import ( | ||
"sync" | ||
|
||
"github.com/grafana/sobek" | ||
"github.com/mstoykov/envconfig" | ||
|
||
"go.k6.io/k6/cloudapi" | ||
"go.k6.io/k6/js/common" | ||
"go.k6.io/k6/js/modules" | ||
) | ||
|
||
type ( | ||
// RootModule is the global module instance that will create module | ||
// instances for each VU. | ||
RootModule struct{} | ||
|
||
// ModuleInstance represents an instance of the execution module. | ||
ModuleInstance struct { | ||
vu modules.VU | ||
obj *sobek.Object | ||
|
||
once sync.Once | ||
testRunID sobek.Value | ||
} | ||
) | ||
|
||
var ( | ||
_ modules.Module = &RootModule{} | ||
_ modules.Instance = &ModuleInstance{} | ||
) | ||
|
||
// New returns a pointer to a new RootModule instance. | ||
func New() *RootModule { | ||
return &RootModule{} | ||
} | ||
|
||
// NewModuleInstance implements the modules.Module interface to return | ||
// a new instance for each VU. | ||
func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance { | ||
mi := &ModuleInstance{vu: vu} | ||
|
||
rt := vu.Runtime() | ||
|
||
mi.obj = rt.NewObject() | ||
defProp := func(name string, getter func() (sobek.Value, error)) { | ||
err := mi.obj.DefineAccessorProperty(name, rt.ToValue(func() sobek.Value { | ||
obj, err := getter() | ||
if err != nil { | ||
common.Throw(rt, err) | ||
} | ||
return obj | ||
}), nil, sobek.FLAG_FALSE, sobek.FLAG_TRUE) | ||
if err != nil { | ||
common.Throw(rt, err) | ||
} | ||
} | ||
defProp("testRunId", mi.testRunId) | ||
|
||
// By default, we try to load the test run id from the environment variables, | ||
// which corresponds to those scenarios where the k6 binary is running in the Cloud. | ||
var envConf cloudapi.Config | ||
if err := envconfig.Process("", &envConf, vu.InitEnv().LookupEnv); err != nil { | ||
common.Throw(vu.Runtime(), err) | ||
} | ||
if envConf.TestRunID.Valid { | ||
mi.testRunID = mi.vu.Runtime().ToValue(envConf.TestRunID.String) | ||
} else { | ||
mi.testRunID = sobek.Undefined() // Default value. | ||
} | ||
|
||
return mi | ||
} | ||
|
||
// Exports returns the exports of the execution module. | ||
func (mi *ModuleInstance) Exports() modules.Exports { | ||
return modules.Exports{Default: mi.obj} | ||
} | ||
|
||
// testRunId returns a sobek.Value(string) with the Cloud test run id. | ||
// | ||
// This code can be executed in two situations, either when the k6 binary is running in the Cloud, in which case | ||
// the value of the test run id would be available in the environment, and we would have loaded at module initialization | ||
// time; or when the k6 binary is running locally and test run id is present in the options, which we try to read at | ||
// time of running this method, but only once for the whole execution as options won't change anymore. | ||
func (mi *ModuleInstance) testRunId() (sobek.Value, error) { | ||
// In case we have a value (e.g. loaded from env), we return it. | ||
// If we're in the init context (where we cannot read the options), we return undefined (the default value). | ||
if !sobek.IsUndefined(mi.testRunID) || mi.vu.State() == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sens to use common.IsNullish here? Or do we want to specifically look only for undefined here? |
||
return mi.testRunID, nil | ||
} | ||
|
||
// Otherwise, we try to read the test run id from options. | ||
// We only try it once for the whole execution, as options won't change. | ||
vuState := mi.vu.State() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: as you already check whether mi.vu.State() is nil before, would it make sense moving this assignment at the top of the function? What do you think? |
||
var err error | ||
mi.once.Do(func() { | ||
// We pass almost all values to zero/nil because here we only care about the Cloud configuration present in options. | ||
var optsConf cloudapi.Config | ||
optsConf, _, err = cloudapi.GetConsolidatedConfig(vuState.Options.Cloud, nil, "", nil, nil) | ||
|
||
if optsConf.TestRunID.Valid { | ||
mi.testRunID = mi.vu.Runtime().ToValue(optsConf.TestRunID.String) | ||
} | ||
}) | ||
|
||
return mi.testRunID, err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Genuine curiosity, is there an advantage to defining a closure for this operation considering it's used only once? Or is it to scope the logic in the code and make it easier to maintain: which I'm completely okay with.