Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPS mode with cold/warm ratio #557

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
32f9e1e
RPS mode - new feature
cvetkovic Nov 15, 2024
4033f8d
YAML fixes
cvetkovic Nov 15, 2024
e831ffc
Read/Write IAT and generate specs knobs
cvetkovic Nov 15, 2024
968f069
Fixing tests
cvetkovic Nov 15, 2024
64e3219
Porting minor changes from aux branch
cvetkovic Nov 15, 2024
0eb270c
Failure rate percentage logging
cvetkovic Nov 15, 2024
a86ddf7
Fixing DAG test
cvetkovic Nov 15, 2024
757ebc5
Fixing tests
cvetkovic Nov 15, 2024
ee2367b
Extracting Dirigent metadata parsing from Azure parser
cvetkovic Nov 25, 2024
05198bc
Fixing typo in trace driver
cvetkovic Nov 25, 2024
1ff197d
Extracting specification generator to a separate method
cvetkovic Nov 25, 2024
f046e99
Bugfix on Knative parallel deploy
cvetkovic Nov 25, 2024
c0d3b02
Moving driver-unrelated code (metrics) to separate package
cvetkovic Nov 25, 2024
4887435
Variable renaming and fixing hanging comment
cvetkovic Nov 25, 2024
6d29c10
Fixing IAT for empty time slot
cvetkovic Nov 25, 2024
d6b0591
Fixing runtime specification test
cvetkovic Nov 25, 2024
050d5ac
Bugfix: some invocations got ommited if at the end of array
cvetkovic Nov 25, 2024
07875f8
RPS mode without RpsCooldownSeconds offset with fixed tests
cvetkovic Nov 26, 2024
805e057
Comment on IATArray purpose
cvetkovic Nov 26, 2024
389e81f
Extracting commons from OpenWhisk and our function
cvetkovic Nov 26, 2024
f8f2f13
Shift IAT test and fix
cvetkovic Nov 26, 2024
dff61bc
Bugfixing IAT generation for empty minutes
cvetkovic Nov 26, 2024
279e28d
Fixing linter
cvetkovic Nov 26, 2024
c15a643
More unit tests for low RPS and huge cooldown
cvetkovic Nov 26, 2024
134b82c
Bugfixing IAT generation issues - Leonid's solution
cvetkovic Nov 27, 2024
0843abe
Fixing testing checks
cvetkovic Nov 28, 2024
2c49b49
Another round of code improvements - Leonid's feedback
cvetkovic Nov 28, 2024
090e6a6
Per-minute count check
cvetkovic Nov 28, 2024
20d1dc6
New individual function driver
cvetkovic Nov 28, 2024
ce7250d
Applying new round of Leonid's feedback
cvetkovic Nov 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/configs/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -777,4 +777,5 @@ PrepullMode
async
AsyncMode
AsyncResponseURL
AsyncWaitToCollectMin
AsyncWaitToCollectMin
Knative's
2 changes: 1 addition & 1 deletion cmd/config_dirigent_dandelion_rps.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
"AsyncResponseURL": "10.0.1.3:8082",
"AsyncResponseURL": "10.0.1.253:8082",
"AsyncWaitToCollectMin": 1,

"RpsTarget": 1,
Expand Down
2 changes: 1 addition & 1 deletion cmd/config_dirigent_dandelion_trace.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
"AsyncResponseURL": "10.0.1.3:8082",
"AsyncResponseURL": "10.0.1.253:8082",
"AsyncWaitToCollectMin": 1,

"TracePath": "data/traces/example",
Expand Down
4 changes: 2 additions & 2 deletions cmd/config_dirigent_rps.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
"InvokeProtocol" : "http2",
"EndpointPort": 80,

"DirigentControlPlaneIP": "10.0.1.253:9092",
"DirigentControlPlaneIP": "10.0.1.253:9091",
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
"AsyncResponseURL": "10.0.1.3:8082",
"AsyncResponseURL": "10.0.1.253:8082",
"AsyncWaitToCollectMin": 1,

"RpsTarget": 1,
Expand Down
2 changes: 1 addition & 1 deletion cmd/config_dirigent_trace.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"InvokeProtocol" : "http2",
"EndpointPort": 80,

"DirigentControlPlaneIP": "10.0.1.253:9092",
"DirigentControlPlaneIP": "10.0.1.253:9091",
"BusyLoopOnSandboxStartup": false,

"AsyncMode": false,
Expand Down
41 changes: 34 additions & 7 deletions cmd/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package main
import (
"flag"
"fmt"
"github.com/vhive-serverless/loader/pkg/generator"
"os"
"strings"
"time"
Expand All @@ -49,7 +50,7 @@ var (
configPath = flag.String("config", "cmd/config_knative_trace.json", "Path to loader configuration file")
failurePath = flag.String("failureConfig", "cmd/failure.json", "Path to the failure configuration file")
verbosity = flag.String("verbosity", "info", "Logging verbosity - choose from [info, debug, trace]")
iatGeneration = flag.Bool("iatGeneration", false, "Generate iats only or run invocations as well")
iatGeneration = flag.Bool("iatGeneration", false, "Generate IATs only or run invocations as well")
iatFromFile = flag.Bool("generated", false, "True if iats were already generated")
)

Expand Down Expand Up @@ -107,7 +108,7 @@ func main() {
if !strings.HasSuffix(cfg.Platform, "-RPS") {
runTraceMode(&cfg, *iatFromFile, *iatGeneration)
} else {
runRPSMode(&cfg, *iatGeneration)
runRPSMode(&cfg, *iatFromFile, *iatGeneration)
}
}

Expand Down Expand Up @@ -171,12 +172,17 @@ func parseTraceGranularity(cfg *config.LoaderConfiguration) common.TraceGranular
return common.MinuteGranularity
}

func runTraceMode(cfg *config.LoaderConfiguration, readIATFromFile bool, justGenerateIAT bool) {
func runTraceMode(cfg *config.LoaderConfiguration, readIATFromFile bool, writeIATsToFile bool) {
durationToParse := determineDurationToParse(cfg.ExperimentDuration, cfg.WarmupDuration)
yamlPath := parseYAMLSpecification(cfg)

// Azure trace parsing
traceParser := trace.NewAzureParser(cfg.TracePath, durationToParse)
functions := traceParser.Parse(cfg.Platform)
functions := traceParser.Parse()

// Dirigent metadata parsing
dirigentMetadataParser := trace.NewDirigentMetadataParser(cfg.TracePath, functions, yamlPath, cfg.Platform)
dirigentMetadataParser.Parse()

log.Infof("Traces contain the following %d functions:\n", len(functions))
for _, function := range functions {
Expand All @@ -202,9 +208,30 @@ func runTraceMode(cfg *config.LoaderConfiguration, readIATFromFile bool, justGen

log.Infof("Using %s as a service YAML specification file.\n", experimentDriver.Configuration.YAMLPath)

experimentDriver.RunExperiment(justGenerateIAT, readIATFromFile)
experimentDriver.GenerateSpecification()
experimentDriver.DumpSpecification(writeIATsToFile, readIATFromFile)
experimentDriver.RunExperiment()
}

func runRPSMode(cfg *config.LoaderConfiguration, justGenerateIAT bool) {
panic("Not yet implemented")
func runRPSMode(cfg *config.LoaderConfiguration, readIATFromFile bool, writeIATsToFile bool) {
rpsTarget := cfg.RpsTarget
coldStartPercentage := cfg.RpsColdStartRatioPercentage

warmStartRPS := rpsTarget * (100 - coldStartPercentage) / 100
coldStartRPS := rpsTarget * coldStartPercentage / 100

warmFunction, warmStartCount := generator.GenerateWarmStartFunction(cfg.ExperimentDuration, warmStartRPS)
coldFunctions, coldStartCount := generator.GenerateColdStartFunctions(cfg.ExperimentDuration, coldStartRPS, cfg.RpsCooldownSeconds)

experimentDriver := driver.NewDriver(&config.Configuration{
LoaderConfiguration: cfg,
TraceDuration: determineDurationToParse(cfg.ExperimentDuration, cfg.WarmupDuration),

YAMLPath: parseYAMLSpecification(cfg),

Functions: generator.CreateRPSFunctions(cfg, warmFunction, warmStartCount, coldFunctions, coldStartCount),
})

experimentDriver.DumpSpecification(writeIATsToFile, readIATFromFile)
experimentDriver.RunExperiment()
}
9 changes: 8 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
| AsyncWaitToCollectMin [^6] | int | >= 0 | 0 | Time after experiment ends after which to collect invocation results |
| RpsTarget | int | >= 0 | 0 | Number of requests per second to issue |
| RpsColdStartRatioPercentage | int | >= 0 && <= 100 | 0 | Percentage of cold starts out of specified RPS |
| RpsCooldownSeconds | int | > 0 | 0 | The time it takes for the autoscaler to downscale function (higher for higher RPS) |
| RpsCooldownSeconds [^7] | int | > 0 | 0 | The time it takes for the autoscaler to downscale function (higher for higher RPS) |
| RpsImage | string | N/A | N/A | Function image to use for RPS experiments |
| RpsRuntimeMs | int | >=0 | 0 | Requested execution time |
| RpsMemoryMB | int | >=0 | 0 | Requested memory |
Expand Down Expand Up @@ -55,6 +55,13 @@ Lambda; https://aws.amazon.com/about-aws/whats-new/2018/10/aws-lambda-supports-f

[^6]: Dirigent specific

[^7] Because Knative's minimum autoscaling stable window is 6s, the minimum keep-alive for a function is 6s. This means
that we need multiple functions to achieve RPS=1, each scaling up/and down with a 1-second delay from each other. In RPS
mode, the number of functions for the cold start experiment is determined by the `RpsCooldownSeconds` parameter, which
is the minimum keep-alive. Due to the implementation complexity, the cold start experiment sleeps for the first
cvetkovic marked this conversation as resolved.
Show resolved Hide resolved
`RpsCooldownSeconds` seconds. In the results, the user should discard the first and the last `RpsCooldownSeconds` of the
results, since the RPS at those points is lower than the requested one.

---

InVitro can cause failure on cluster manager components. To do so, please configure the `cmd/failure.json`. Make sure
Expand Down
15 changes: 9 additions & 6 deletions pkg/common/specification_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

package common

// IATMatrix - columns are minutes, rows are IATs
type IATMatrix [][]float64
// IATArray Hold the IATs of invocations for a particular function. Values in this array tells individual function driver
// how much time to sleep before firing an invocation. First invocations should be fired right away after the start of
// experiment, i.e., should typically have a IAT of 0.
type IATArray []float64
cvetkovic marked this conversation as resolved.
Show resolved Hide resolved

// ProbabilisticDuration used for testing the exponential distribution
type ProbabilisticDuration []float64
Expand All @@ -35,10 +37,11 @@ type RuntimeSpecification struct {
Memory int
}

type RuntimeSpecificationMatrix [][]RuntimeSpecification
type RuntimeSpecificationArray []RuntimeSpecification

type FunctionSpecification struct {
IAT IATMatrix `json:"IAT"`
RawDuration ProbabilisticDuration `json:"RawDuration"`
RuntimeSpecification RuntimeSpecificationMatrix `json:"RuntimeSpecification"`
IAT IATArray `json:"IAT"`
PerMinuteCount []int `json:"PerMinuteCount"`
RawDuration ProbabilisticDuration `json:"RawDuration"`
RuntimeSpecification RuntimeSpecificationArray `json:"RuntimeSpecification"`
}
47 changes: 47 additions & 0 deletions pkg/common/workload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package common

// static double SQRTSD (double x) {
// double r;
// __asm__ ("sqrtsd %1, %0" : "=x" (r) : "x" (x));
// return r;
// }
import "C"
import (
"time"
)

const (
// ContainerImageSizeMB was chosen as a median of the container physical memory usage.
// Allocate this much less memory inside the actual function.
ContainerImageSizeMB = 15

ExecUnit int = 1e2
)

func takeSqrts() C.double {
var tmp C.double // Circumvent compiler optimizations
for i := 0; i < ExecUnit; i++ {
tmp = C.SQRTSD(C.double(10))
}
return tmp
}

func busySpin(multiplier, runtimeMilli uint32) {
totalIterations := int(multiplier * runtimeMilli)

for i := 0; i < totalIterations; i++ {
takeSqrts()
}
}

func TraceFunctionExecution(start time.Time, IterationsMultiplier uint32, timeLeftMilliseconds uint32) (msg string) {
timeConsumedMilliseconds := uint32(time.Since(start).Milliseconds())
if timeConsumedMilliseconds < timeLeftMilliseconds {
timeLeftMilliseconds -= timeConsumedMilliseconds
if timeLeftMilliseconds > 0 {
busySpin(uint32(IterationsMultiplier), timeLeftMilliseconds)
}
}

return msg
}
34 changes: 25 additions & 9 deletions pkg/driver/deployment/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"math"
"os/exec"
"regexp"
"runtime"
"strconv"
"sync"
)

const (
Expand Down Expand Up @@ -46,15 +48,27 @@ func newKnativeDeployerConfiguration(cfg *config.Configuration) knativeDeploymen
func (*knativeDeployer) Deploy(cfg *config.Configuration) {
knativeConfig := newKnativeDeployerConfiguration(cfg)

queue := make(chan struct{}, runtime.NumCPU()) // message queue as a sync method
deployed := sync.WaitGroup{}
deployed.Add(len(cfg.Functions))

for i := 0; i < len(cfg.Functions); i++ {
knativeDeploySingleFunction(
cfg.Functions[i],
knativeConfig.YamlPath,
knativeConfig.IsPartiallyPanic,
knativeConfig.EndpointPort,
knativeConfig.AutoscalingMetric,
)
go func() {
queue <- struct{}{}
cvetkovic marked this conversation as resolved.
Show resolved Hide resolved

knativeDeploySingleFunction(
cfg.Functions[i],
knativeConfig.YamlPath,
knativeConfig.IsPartiallyPanic,
knativeConfig.EndpointPort,
knativeConfig.AutoscalingMetric,
&deployed,
queue,
)
}()
}

deployed.Wait()
}

func (*knativeDeployer) Clean() {
Expand All @@ -68,8 +82,10 @@ func (*knativeDeployer) Clean() {
}
}

func knativeDeploySingleFunction(function *common.Function, yamlPath string, isPartiallyPanic bool, endpointPort int,
autoscalingMetric string) bool {
func knativeDeploySingleFunction(function *common.Function, yamlPath string, isPartiallyPanic bool, endpointPort int, autoscalingMetric string, deployed *sync.WaitGroup, queue chan struct{}) bool {
defer deployed.Done()
defer func() { <-queue }()

panicWindow := "\"10.0\""
panicThreshold := "\"200.0\""
if isPartiallyPanic {
Expand Down
68 changes: 68 additions & 0 deletions pkg/driver/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package driver

import (
"encoding/json"
"github.com/vhive-serverless/loader/pkg/common"
mc "github.com/vhive-serverless/loader/pkg/metric"
"os"
"sync"
"time"
)

func (d *Driver) CreateMetricsScrapper(interval time.Duration,
signalReady *sync.WaitGroup, finishCh chan int, allRecordsWritten *sync.WaitGroup) func() {
timer := time.NewTicker(interval)

return func() {
signalReady.Done()
knStatRecords := make(chan interface{}, 100)
scaleRecords := make(chan interface{}, 100)
writerDone := sync.WaitGroup{}

clusterUsageFile, err := os.Create(d.outputFilename("cluster_usage"))
common.Check(err)
defer clusterUsageFile.Close()

writerDone.Add(1)
go mc.RunCSVWriter(knStatRecords, d.outputFilename("kn_stats"), &writerDone)

writerDone.Add(1)
go mc.RunCSVWriter(scaleRecords, d.outputFilename("deployment_scale"), &writerDone)

for {
select {
case <-timer.C:
recCluster := mc.ScrapeClusterUsage()
recCluster.Timestamp = time.Now().UnixMicro()

byteArr, err := json.Marshal(recCluster)
common.Check(err)

_, err = clusterUsageFile.Write(byteArr)
common.Check(err)

_, err = clusterUsageFile.WriteString("\n")
common.Check(err)

recScale := mc.ScrapeDeploymentScales()
timestamp := time.Now().UnixMicro()
for _, rec := range recScale {
rec.Timestamp = timestamp
scaleRecords <- rec
}

recKnative := mc.ScrapeKnStats()
recKnative.Timestamp = time.Now().UnixMicro()
knStatRecords <- recKnative
case <-finishCh:
close(knStatRecords)
close(scaleRecords)

writerDone.Wait()
allRecordsWritten.Done()

return
}
}
}
}
Loading
Loading