Skip to content

Commit

Permalink
Moving driver-unrelated code (metrics) to separate package
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Cvetković <[email protected]>
  • Loading branch information
cvetkovic committed Nov 25, 2024
1 parent c54df79 commit 8558bb7
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 63 deletions.
46 changes: 2 additions & 44 deletions pkg/driver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"github.com/vhive-serverless/loader/pkg/common"
mc "github.com/vhive-serverless/loader/pkg/metric"
"math"
"os"
"sync"
"time"
Expand All @@ -25,10 +24,10 @@ func (d *Driver) CreateMetricsScrapper(interval time.Duration,
defer clusterUsageFile.Close()

writerDone.Add(1)
go d.runCSVWriter(knStatRecords, d.outputFilename("kn_stats"), &writerDone)
go mc.RunCSVWriter(knStatRecords, d.outputFilename("kn_stats"), &writerDone)

writerDone.Add(1)
go d.runCSVWriter(scaleRecords, d.outputFilename("deployment_scale"), &writerDone)
go mc.RunCSVWriter(scaleRecords, d.outputFilename("deployment_scale"), &writerDone)

for {
select {
Expand Down Expand Up @@ -67,44 +66,3 @@ func (d *Driver) CreateMetricsScrapper(interval time.Duration,
}
}
}

func (d *Driver) createGlobalMetricsCollector(filename string, collector chan *mc.ExecutionRecord,
signalReady *sync.WaitGroup, signalEverythingWritten *sync.WaitGroup, totalIssuedChannel chan int64) {

// NOTE: totalNumberOfInvocations is initialized to MaxInt64 not to allow collector to complete before
// the end signal is received on totalIssuedChannel, which deliver the total number of issued invocations.
// This number is known once all the individual function drivers finish issuing invocations and
// when all the invocations return
var totalNumberOfInvocations int64 = math.MaxInt64
var currentlyWritten int64

file, err := os.Create(filename)
common.Check(err)
defer file.Close()

signalReady.Done()

records := make(chan interface{}, 100)
writerDone := sync.WaitGroup{}
writerDone.Add(1)
go d.runCSVWriter(records, filename, &writerDone)

for {
select {
case record := <-collector:
records <- record

currentlyWritten++
case record := <-totalIssuedChannel:
totalNumberOfInvocations = record
}

if currentlyWritten == totalNumberOfInvocations {
close(records)
writerDone.Wait()
(*signalEverythingWritten).Done()

return
}
}
}
19 changes: 1 addition & 18 deletions pkg/driver/trace_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package driver

import (
"container/list"
"encoding/csv"
"encoding/json"
"fmt"
"github.com/vhive-serverless/loader/pkg/config"
Expand All @@ -39,7 +38,6 @@ import (
"sync/atomic"
"time"

"github.com/gocarina/gocsv"
log "github.com/sirupsen/logrus"
"github.com/vhive-serverless/loader/pkg/common"
"github.com/vhive-serverless/loader/pkg/generator"
Expand Down Expand Up @@ -79,21 +77,6 @@ func (d *Driver) outputFilename(name string) string {
return fmt.Sprintf("%s_%s_%d.csv", d.Configuration.LoaderConfiguration.OutputPathPrefix, name, d.Configuration.TraceDuration)
}

func (d *Driver) runCSVWriter(records chan interface{}, filename string, writerDone *sync.WaitGroup) {
log.Debugf("Starting writer for %s", filename)

file, err := os.Create(filename)
common.Check(err)
defer file.Close()

writer := gocsv.NewSafeCSVWriter(csv.NewWriter(file))
if err := gocsv.MarshalChan(records, writer); err != nil {
log.Fatal(err)
}

writerDone.Done()
}

func DAGCreation(functions []*common.Function) *list.List {
linkedList := list.New()
// Assigning nodes one after another
Expand Down Expand Up @@ -433,7 +416,7 @@ func (d *Driver) startBackgroundProcesses(allRecordsWritten *sync.WaitGroup) (*s

globalMetricsCollector := make(chan *mc.ExecutionRecord)
totalIssuedChannel := make(chan int64)
go d.createGlobalMetricsCollector(d.outputFilename("duration"), globalMetricsCollector, auxiliaryProcessBarrier, allRecordsWritten, totalIssuedChannel)
go mc.CreateGlobalMetricsCollector(d.outputFilename("duration"), globalMetricsCollector, auxiliaryProcessBarrier, allRecordsWritten, totalIssuedChannel)

traceDurationInMinutes := d.Configuration.TraceDuration
go d.globalTimekeeper(traceDurationInMinutes, auxiliaryProcessBarrier)
Expand Down
2 changes: 1 addition & 1 deletion pkg/driver/trace_driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func TestGlobalMetricsCollector(t *testing.T) {
collectorReady.Add(1)
collectorFinished.Add(1)

go driver.createGlobalMetricsCollector(driver.outputFilename("duration"), inputChannel, collectorReady, collectorFinished, totalIssuedChannel)
go metric.CreateGlobalMetricsCollector(driver.outputFilename("duration"), inputChannel, collectorReady, collectorFinished, totalIssuedChannel)
collectorReady.Wait()

bogusRecord := &metric.ExecutionRecord{
Expand Down
File renamed without changes.
67 changes: 67 additions & 0 deletions pkg/metric/loader_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package metric

import (
"encoding/csv"
"github.com/gocarina/gocsv"
log "github.com/sirupsen/logrus"
"github.com/vhive-serverless/loader/pkg/common"
"math"
"os"
"sync"
)

func RunCSVWriter(records chan interface{}, filename string, writerDone *sync.WaitGroup) {
log.Debugf("Starting writer for %s", filename)

file, err := os.Create(filename)
common.Check(err)
defer file.Close()

writer := gocsv.NewSafeCSVWriter(csv.NewWriter(file))
if err := gocsv.MarshalChan(records, writer); err != nil {
log.Fatal(err)
}

writerDone.Done()
}

func CreateGlobalMetricsCollector(filename string, collector chan *ExecutionRecord,
signalReady *sync.WaitGroup, signalEverythingWritten *sync.WaitGroup, totalIssuedChannel chan int64) {

// NOTE: totalNumberOfInvocations is initialized to MaxInt64 not to allow collector to complete before
// the end signal is received on totalIssuedChannel, which deliver the total number of issued invocations.
// This number is known once all the individual function drivers finish issuing invocations and
// when all the invocations return
var totalNumberOfInvocations int64 = math.MaxInt64
var currentlyWritten int64

file, err := os.Create(filename)
common.Check(err)
defer file.Close()

signalReady.Done()

records := make(chan interface{}, 100)
writerDone := sync.WaitGroup{}
writerDone.Add(1)
go RunCSVWriter(records, filename, &writerDone)

for {
select {
case record := <-collector:
records <- record

currentlyWritten++
case record := <-totalIssuedChannel:
totalNumberOfInvocations = record
}

if currentlyWritten == totalNumberOfInvocations {
close(records)
writerDone.Wait()
(*signalEverythingWritten).Done()

return
}
}
}

0 comments on commit 8558bb7

Please sign in to comment.