Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k6/cloud module with the testRunId #4092

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions cloudapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,17 @@ type Config struct {
StopOnError null.Bool `json:"stopOnError" envconfig:"K6_CLOUD_STOP_ON_ERROR"`
APIVersion null.Int `json:"apiVersion" envconfig:"K6_CLOUD_API_VERSION"`

// PushRefID represents the test run id.
// Note: It is a legacy name used by the backend, the code in k6 open-source
// references it as test run id.
// Currently, a renaming is not planned.
// PushRefID is the identifier used by Cloud systems to correlate all the things that
// belong to the same test run/execution. Currently, it is equivalent to the TestRunID.
// But, in the future, or in future solutions (e.g. Synthetic Monitoring), there might be
// no TestRunID and we may still need an identifier to correlate all the things.
PushRefID null.String `json:"pushRefID" envconfig:"K6_CLOUD_PUSH_REF_ID"`

// TestRunID is the test run id, a unique identifier across all the test runs.
// It might be used to correlate all the things that belong to the same test run/execution,
// see PushRefID for more details.
TestRunID null.String `json:"testRunID" envconfig:"K6_CLOUD_TEST_RUN_ID"`

// Defines the max allowed number of time series in a single batch.
MaxTimeSeriesInBatch null.Int `json:"maxTimeSeriesInBatch" envconfig:"K6_CLOUD_MAX_TIME_SERIES_IN_BATCH"`

Expand Down Expand Up @@ -121,6 +126,9 @@ func (c Config) Apply(cfg Config) Config {
if cfg.PushRefID.Valid {
c.PushRefID = cfg.PushRefID
}
if cfg.TestRunID.Valid {
c.TestRunID = cfg.TestRunID
}
if cfg.WebAppURL.Valid {
c.WebAppURL = cfg.WebAppURL
}
Expand Down
19 changes: 19 additions & 0 deletions cmd/cloud_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@ func (c *cmdCloudRun) preRun(cmd *cobra.Command, args []string) error {

func (c *cmdCloudRun) run(cmd *cobra.Command, args []string) error {
if c.localExecution {
// We know this execution requires a test run to be created in the Cloud.
// So, we create it before delegating the actual execution to the run command.
// To do that, we need to load the test and configure it.
test, err := loadAndConfigureLocalTest(c.runCmd.gs, cmd, args, getCloudRunLocalExecutionConfig)
if err != nil {
return fmt.Errorf("could not load and configure the test: %w", err)
}

// As we've already loaded the test, we can modify the init function to
// reuse the initialized one.
c.runCmd.loadConfiguredTest = func(*cobra.Command, []string) (*loadedAndConfiguredTest, execution.Controller, error) {
return test, local.NewController(), nil
}

// After that, we can create the remote test run.
if err := createCloudTest(c.runCmd.gs, test); err != nil {
return fmt.Errorf("could not create the cloud test run: %w", err)
}

return c.runCmd.run(cmd, args)
}

Expand Down
174 changes: 174 additions & 0 deletions cmd/outputs_cloud.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package cmd

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/sirupsen/logrus"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/cloudapi"
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/metrics"
)

const defaultTestName = "k6 test"

// createCloudTest performs some test and Cloud configuration validations and if everything
// looks good, then it creates a test run in the k6 Cloud, unless k6 is already running in the Cloud.
// It is also responsible for filling the test run id on the test options, so it can be used later.
// It returns the resulting Cloud configuration as a json.RawMessage, as expected by the Cloud output,
// or an error if something goes wrong.
func createCloudTest(gs *state.GlobalState, test *loadedAndConfiguredTest) error {

Check failure on line 29 in cmd/outputs_cloud.go

View workflow job for this annotation

GitHub Actions / lint

Function 'createCloudTest' is too long (112 > 80) (funlen)
conf, warn, err := cloudapi.GetConsolidatedConfig(
test.derivedConfig.Collectors[builtinOutputCloud.String()],
gs.Env,
"", // Historically used for -o cloud=..., no longer used (deprecated).
test.derivedConfig.Options.Cloud,
test.derivedConfig.Options.External,
)
if err != nil {
return err
}

if warn != "" {
gs.Logger.Warn(warn)
}

// If this is true, then it means that this code is being executed in the k6 Cloud.
// Therefore, we don't need to continue with the test run creation,
// as we don't need to create any test run.
//
// Precisely, the identifier of the test run is conf.TestRunID.
if conf.TestRunID.Valid {
return nil
}

// If not, we continue with some validations and the creation of the test run.
if err := validateRequiredSystemTags(test.derivedConfig.Options.SystemTags); err != nil {
return err
}

if !conf.Name.Valid || conf.Name.String == "" {
scriptPath := test.source.URL.String()
if scriptPath == "" {
// Script from stdin without a name, likely from stdin
return errors.New("script name not set, please specify K6_CLOUD_NAME or options.cloud.name")
}

conf.Name = null.StringFrom(filepath.Base(scriptPath))
}
if conf.Name.String == "-" {
conf.Name = null.StringFrom(defaultTestName)
}

thresholds := make(map[string][]string)
for name, t := range test.derivedConfig.Thresholds {
for _, threshold := range t.Thresholds {
thresholds[name] = append(thresholds[name], threshold.Source)
}
}

et, err := lib.NewExecutionTuple(
test.derivedConfig.Options.ExecutionSegment,
test.derivedConfig.Options.ExecutionSegmentSequence,
)
if err != nil {
return err
}
executionPlan := test.derivedConfig.Options.Scenarios.GetFullExecutionRequirements(et)

duration, testEnds := lib.GetEndOffset(executionPlan)
if !testEnds {
return errors.New("tests with unspecified duration are not allowed when outputting data to k6 cloud")
}

if conf.MetricPushConcurrency.Int64 < 1 {
return fmt.Errorf("metrics push concurrency must be a positive number but is %d",
conf.MetricPushConcurrency.Int64)
}

if conf.MaxTimeSeriesInBatch.Int64 < 1 {
return fmt.Errorf("max allowed number of time series in a single batch must be a positive number but is %d",
conf.MaxTimeSeriesInBatch.Int64)
}

var testArchive *lib.Archive
if !test.derivedConfig.NoArchiveUpload.Bool {
testArchive = test.initRunner.MakeArchive()
}

testRun := &cloudapi.TestRun{
Name: conf.Name.String,
ProjectID: conf.ProjectID.Int64,
VUsMax: int64(lib.GetMaxPossibleVUs(executionPlan)),
Thresholds: thresholds,
Duration: int64(duration / time.Second),
Archive: testArchive,
}

logger := gs.Logger.WithFields(logrus.Fields{"output": builtinOutputCloud.String()})

apiClient := cloudapi.NewClient(
logger, conf.Token.String, conf.Host.String, consts.Version, conf.Timeout.TimeDuration())

response, err := apiClient.CreateTestRun(testRun)
if err != nil {
return err
}

if response.ConfigOverride != nil {
logger.WithFields(logrus.Fields{"override": response.ConfigOverride}).Debug("overriding config options")
conf = conf.Apply(*response.ConfigOverride)
}

conf.TestRunID = null.NewString(response.ReferenceID, true)

raw, err := cloudConfToRawMessage(conf)
if err != nil {
return fmt.Errorf("could not serialize cloud configuration: %w", err)
}

test.derivedConfig.Collectors[builtinOutputCloud.String()] = raw

return nil
}

// validateRequiredSystemTags checks if all required tags are present.
func validateRequiredSystemTags(scriptTags *metrics.SystemTagSet) error {
var missingRequiredTags []string
requiredTags := metrics.SystemTagSet(metrics.TagName |
metrics.TagMethod |
metrics.TagStatus |
metrics.TagError |
metrics.TagCheck |
metrics.TagGroup)
for _, tag := range metrics.SystemTagValues() {
if requiredTags.Has(tag) && !scriptTags.Has(tag) {
missingRequiredTags = append(missingRequiredTags, tag.String())
}
}
if len(missingRequiredTags) > 0 {
return fmt.Errorf(
"the cloud output needs the following system tags enabled: %s",
strings.Join(missingRequiredTags, ", "),
)
}
return nil
}

func cloudConfToRawMessage(conf cloudapi.Config) (json.RawMessage, error) {
var buff bytes.Buffer
enc := json.NewEncoder(&buff)
if err := enc.Encode(conf); err != nil {
return nil, err
}
return buff.Bytes(), nil
}
2 changes: 1 addition & 1 deletion cmd/test_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func loadSystemCertPool(logger logrus.FieldLogger) {
func (lct *loadedAndConfiguredTest) buildTestRunState(
configToReinject lib.Options,
) (*lib.TestRunState, error) {
// This might be the full derived or just the consodlidated options
// This might be the full derived or just the consolidated options
if err := lct.initRunner.SetOptions(configToReinject); err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions js/jsmodules.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/js/modules/k6"
"go.k6.io/k6/js/modules/k6/cloud"
"go.k6.io/k6/js/modules/k6/crypto"
"go.k6.io/k6/js/modules/k6/crypto/x509"
"go.k6.io/k6/js/modules/k6/data"
Expand All @@ -32,6 +33,7 @@ import (
func getInternalJSModules() map[string]interface{} {
return map[string]interface{}{
"k6": k6.New(),
"k6/cloud": cloud.New(),
"k6/crypto": crypto.New(),
"k6/crypto/x509": x509.New(),
"k6/data": data.New(),
Expand Down
110 changes: 110 additions & 0 deletions js/modules/k6/cloud/cloud.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Package cloud implements k6/cloud which lets script find out more about the Cloud execution.
package cloud

import (
"sync"

"github.com/grafana/sobek"
"github.com/mstoykov/envconfig"

"go.k6.io/k6/cloudapi"
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
)

type (
// RootModule is the global module instance that will create module
// instances for each VU.
RootModule struct{}

// ModuleInstance represents an instance of the execution module.
ModuleInstance struct {
vu modules.VU
obj *sobek.Object

once sync.Once
testRunID sobek.Value
}
)

var (
_ modules.Module = &RootModule{}
_ modules.Instance = &ModuleInstance{}
)

// New returns a pointer to a new RootModule instance.
func New() *RootModule {
return &RootModule{}
}

// NewModuleInstance implements the modules.Module interface to return
// a new instance for each VU.
func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
mi := &ModuleInstance{vu: vu}

rt := vu.Runtime()

mi.obj = rt.NewObject()
defProp := func(name string, getter func() (sobek.Value, error)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Genuine curiosity, is there an advantage to defining a closure for this operation considering it's used only once? Or is it to scope the logic in the code and make it easier to maintain: which I'm completely okay with.

err := mi.obj.DefineAccessorProperty(name, rt.ToValue(func() sobek.Value {
obj, err := getter()
if err != nil {
common.Throw(rt, err)
}
return obj
}), nil, sobek.FLAG_FALSE, sobek.FLAG_TRUE)
if err != nil {
common.Throw(rt, err)
}
}
defProp("testRunId", mi.testRunId)

// By default, we try to load the test run id from the environment variables,
// which corresponds to those scenarios where the k6 binary is running in the Cloud.
var envConf cloudapi.Config
if err := envconfig.Process("", &envConf, vu.InitEnv().LookupEnv); err != nil {
common.Throw(vu.Runtime(), err)
}
if envConf.TestRunID.Valid {
mi.testRunID = mi.vu.Runtime().ToValue(envConf.TestRunID.String)
} else {
mi.testRunID = sobek.Undefined() // Default value.
}

return mi
}

// Exports returns the exports of the execution module.
func (mi *ModuleInstance) Exports() modules.Exports {
return modules.Exports{Default: mi.obj}
}

// testRunId returns a sobek.Value(string) with the Cloud test run id.
//
// This code can be executed in two situations, either when the k6 binary is running in the Cloud, in which case
// the value of the test run id would be available in the environment, and we would have loaded at module initialization
// time; or when the k6 binary is running locally and test run id is present in the options, which we try to read at
// time of running this method, but only once for the whole execution as options won't change anymore.
func (mi *ModuleInstance) testRunId() (sobek.Value, error) {

Check warning on line 88 in js/modules/k6/cloud/cloud.go

View workflow job for this annotation

GitHub Actions / lint

var-naming: method testRunId should be testRunID (revive)
// In case we have a value (e.g. loaded from env), we return it.
// If we're in the init context (where we cannot read the options), we return undefined (the default value).
if !sobek.IsUndefined(mi.testRunID) || mi.vu.State() == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sens to use common.IsNullish here? Or do we want to specifically look only for undefined here?

return mi.testRunID, nil
}

// Otherwise, we try to read the test run id from options.
// We only try it once for the whole execution, as options won't change.
vuState := mi.vu.State()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: as you already check whether mi.vu.State() is nil before, would it make sense moving this assignment at the top of the function? What do you think?

var err error
mi.once.Do(func() {
// We pass almost all values to zero/nil because here we only care about the Cloud configuration present in options.
var optsConf cloudapi.Config
optsConf, _, err = cloudapi.GetConsolidatedConfig(vuState.Options.Cloud, nil, "", nil, nil)

if optsConf.TestRunID.Valid {
mi.testRunID = mi.vu.Runtime().ToValue(optsConf.TestRunID.String)
}
})

return mi.testRunID, err
}
Loading
Loading