Skip to content

Commit

Permalink
Add support for native distributed execution and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Jun 6, 2023
1 parent fe917b0 commit 55d5751
Show file tree
Hide file tree
Showing 30 changed files with 2,157 additions and 79 deletions.
5 changes: 3 additions & 2 deletions api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
Expand Down Expand Up @@ -41,10 +42,10 @@ func getTestRunState(tb testing.TB, options lib.Options, runner lib.Runner) *lib
}

func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurface {
execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(testState)
me, err := engine.NewMetricsEngine(testState, true)
require.NoError(tb, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
5 changes: 3 additions & 2 deletions api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/types"
Expand Down Expand Up @@ -138,9 +139,9 @@ func TestSetupData(t *testing.T) {
TeardownTimeout: types.NullDurationFrom(5 * time.Second),
}, runner)

execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(t, err)
metricsEngine, err := engine.NewMetricsEngine(testState)
metricsEngine, err := engine.NewMetricsEngine(testState, true)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand Down
5 changes: 3 additions & 2 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/execution"
"go.k6.io/k6/execution/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
Expand Down Expand Up @@ -112,10 +113,10 @@ func TestPatchStatus(t *testing.T) {
require.NoError(t, err)

testState := getTestRunState(t, lib.Options{Scenarios: scenarios}, &minirunner.MiniRunner{})
execScheduler, err := execution.NewScheduler(testState)
execScheduler, err := execution.NewScheduler(testState, local.NewController())
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(testState)
metricsEngine, err := engine.NewMetricsEngine(testState, true)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand Down
192 changes: 192 additions & 0 deletions cmd/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package cmd

import (
"bytes"
"context"
"encoding/json"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/execution"
"go.k6.io/k6/execution/distributed"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/loader"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"google.golang.org/grpc"
"gopkg.in/guregu/null.v3"
)

// TODO: something cleaner
func getMetricsHook(
ctx context.Context, instanceID uint32,
client distributed.DistributedTestClient, logger logrus.FieldLogger,
) func(*engine.MetricsEngine) func() {
logger = logger.WithField("component", "metric-engine-hook")
return func(me *engine.MetricsEngine) func() {
stop := make(chan struct{})
done := make(chan struct{})

dumpMetrics := func() {
logger.Debug("Starting metric dump...")
me.MetricsLock.Lock()
defer me.MetricsLock.Unlock()

metrics := make([]*distributed.MetricDump, 0, len(me.ObservedMetrics))
for _, om := range me.ObservedMetrics {
data, err := om.Sink.Drain()
if err != nil {
logger.Errorf("There was a problem draining the sink for metric %s: %s", om.Name, err)
}
metrics = append(metrics, &distributed.MetricDump{
Name: om.Name,
Data: data,
})
}

data := &distributed.MetricsDump{
InstanceID: instanceID,
Metrics: metrics,
}
_, err := client.SendMetrics(ctx, data)
if err != nil {
logger.Errorf("There was a problem dumping metrics: %s", err)
}
}

go func() {
defer close(done)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
dumpMetrics()
case <-stop:
dumpMetrics()
return
}
}
}()

finalize := func() {
logger.Debug("Final metric dump...")
close(stop)
<-done
logger.Debug("Done!")
}

return finalize
}
}

// TODO: a whole lot of cleanup, refactoring, error handling and hardening
func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen
c := &cmdsRunAndAgent{gs: gs}

c.loadConfiguredTest = func(cmd *cobra.Command, args []string) (
*loadedAndConfiguredTest, execution.Controller, error,
) {
conn, err := grpc.Dial(args[0], grpc.WithInsecure())
if err != nil {
return nil, nil, err
}
c.testEndHook = func(err error) {
gs.Logger.Debugf("k6 agent run ended with err=%s", err)
conn.Close()
}

client := distributed.NewDistributedTestClient(conn)

resp, err := client.Register(gs.Ctx, &distributed.RegisterRequest{})
if err != nil {
return nil, nil, err
}

c.metricsEngineHook = getMetricsHook(gs.Ctx, resp.InstanceID, client, gs.Logger)

controller, err := distributed.NewAgentController(gs.Ctx, resp.InstanceID, client, gs.Logger)
if err != nil {
return nil, nil, err
}

var options lib.Options
if err := json.Unmarshal(resp.Options, &options); err != nil {
return nil, nil, err
}

arc, err := lib.ReadArchive(bytes.NewReader(resp.Archive))
if err != nil {
return nil, nil, err
}

registry := metrics.NewRegistry()
piState := &lib.TestPreInitState{
Logger: gs.Logger,
RuntimeOptions: lib.RuntimeOptions{
NoThresholds: null.BoolFrom(true),
NoSummary: null.BoolFrom(true),
Env: arc.Env,
CompatibilityMode: null.StringFrom(arc.CompatibilityMode),
},
Registry: registry,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
}

initRunner, err := js.NewFromArchive(piState, arc)
if err != nil {
return nil, nil, err
}

test := &loadedTest{
pwd: arc.Pwd,
sourceRootPath: arc.Filename,
source: &loader.SourceData{
Data: resp.Archive,
URL: arc.FilenameURL,
},
fs: afero.NewMemMapFs(), // TODO: figure out what should be here
fileSystems: arc.Filesystems,
preInitState: piState,
initRunner: initRunner,
}

pseudoConsoldatedConfig := applyDefault(Config{Options: options})
for _, thresholds := range pseudoConsoldatedConfig.Thresholds {
if err = thresholds.Parse(); err != nil {
return nil, nil, err
}
}
derivedConfig, err := deriveAndValidateConfig(pseudoConsoldatedConfig, initRunner.IsExecutable, gs.Logger)
if err != nil {
return nil, nil, err
}

configuredTest := &loadedAndConfiguredTest{
loadedTest: test,
consolidatedConfig: pseudoConsoldatedConfig,
derivedConfig: derivedConfig,
}

gs.Flags.Address = "" // TODO: fix, this is a hack so agents don't start an API server

return configuredTest, controller, nil // TODO
}

agentCmd := &cobra.Command{
Use: "agent",
Short: "Join a distributed load test",
Long: `TODO`,
Args: exactArgsWithMsg(1, "arg should either the IP and port of the controller k6 instance"),
RunE: c.run,
}

// TODO: add flags

return agentCmd
}
2 changes: 1 addition & 1 deletion cmd/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type cmdArchive struct {
}

func (c *cmdArchive) run(cmd *cobra.Command, args []string) error {
test, err := loadAndConfigureTest(c.gs, cmd, args, getPartialConfig)
test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *cmdCloud) run(cmd *cobra.Command, args []string) error {
)
printBar(c.gs, progressBar)

test, err := loadAndConfigureTest(c.gs, cmd, args, getPartialConfig)
test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 55d5751

Please sign in to comment.