Return result from SegmentedIndex methods, remove superfluous locking
Ivan Mirić committed Jun 21, 2021
1 parent a459dda commit a2aee84
Showing 5 changed files with 32 additions and 46 deletions.
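
In short: SegmentedIndex.Next(), Prev() and GoTo() now return a SegmentedIndexResult computed while the index's internal mutex is held, so callers receive the updated Scaled/Unscaled values atomically instead of mutating the index and then reading it back through the now-removed GetScaled()/GetUnscaled() getters. That two-step read is what forced each executor to guard calls with its own iterMx; with the snapshot returned directly, the extra lock is superfluous. A caller-side sketch of the change (segIdx stands for any *lib.SegmentedIndex; setup omitted):

    // Before: mutate, then read back. Only safe under an external mutex:
    //     iterMx.Lock()
    //     segIdx.Next()
    //     iter := uint64(segIdx.GetUnscaled() - 1)
    //     iterMx.Unlock()

    // After: advance and read in one internally locked step.
    res := segIdx.Next()
    iter := uint64(res.Unscaled - 1)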
34 changes: 15 additions & 19 deletions lib/execution_segment.go
@@ -738,18 +738,24 @@ func (et *ExecutionTuple) GetNewExecutionTupleFromValue(value int64) (*Execution
 type SegmentedIndex struct {
     start, lcd       int64
     offsets          []int64
-    mx               sync.RWMutex
+    mx               sync.Mutex
     scaled, unscaled int64 // for both the first element(vu) is 1 not 0
 }

+// SegmentedIndexResult holds the new index values after being changed by
+// Next(), Prev() or GoTo().
+type SegmentedIndexResult struct {
+    Scaled, Unscaled int64
+}
+
 // NewSegmentedIndex returns a pointer to a new SegmentedIndex instance,
 // given a starting index, LCD and offsets as returned by GetStripedOffsets().
 func NewSegmentedIndex(start, lcd int64, offsets []int64) *SegmentedIndex {
     return &SegmentedIndex{start: start, lcd: lcd, offsets: offsets}
 }

 // Next goes to the next scaled index and moves the unscaled one accordingly.
-func (s *SegmentedIndex) Next() {
+func (s *SegmentedIndex) Next() SegmentedIndexResult {
     s.mx.Lock()
     defer s.mx.Unlock()
     if s.scaled == 0 { // the 1 element(VU) is at the start
@@ -758,11 +764,13 @@ func (s *SegmentedIndex) Next() {
         s.unscaled += s.offsets[int(s.scaled-1)%len(s.offsets)] // slice's index start at 0 ours start at 1
     }
     s.scaled++
+
+    return SegmentedIndexResult{Scaled: s.scaled, Unscaled: s.unscaled}
 }

 // Prev goes to the previous scaled value and sets the unscaled one accordingly.
 // Calling Prev when s.scaled == 0 is undefined.
-func (s *SegmentedIndex) Prev() {
+func (s *SegmentedIndex) Prev() SegmentedIndexResult {
     s.mx.Lock()
     defer s.mx.Unlock()
     if s.scaled == 1 { // we are the first need to go to the 0th element which means we need to remove the start
@@ -771,11 +779,13 @@ func (s *SegmentedIndex) Prev() {
         s.unscaled -= s.offsets[int(s.scaled-2)%len(s.offsets)] // slice's index start 0 our start at 1
     }
     s.scaled--
+
+    return SegmentedIndexResult{Scaled: s.scaled, Unscaled: s.unscaled}
 }

 // GoTo sets the scaled index to its biggest value for which the corresponding
 // unscaled index is smaller or equal to value.
-func (s *SegmentedIndex) GoTo(value int64) int64 { // TODO optimize
+func (s *SegmentedIndex) GoTo(value int64) SegmentedIndexResult { // TODO optimize
     s.mx.Lock()
     defer s.mx.Unlock()
     var gi int64
@@ -810,19 +820,5 @@ func (s *SegmentedIndex) GoTo(value int64) int64 { // TODO optimize
         s.unscaled = 0 // we would've added the start and 1
     }

-    return s.scaled
-}
-
-// GetScaled returns the scaled value.
-func (s *SegmentedIndex) GetScaled() int64 {
-    s.mx.RLock()
-    defer s.mx.RUnlock()
-    return s.scaled
-}
-
-// GetUnscaled returns the unscaled value.
-func (s *SegmentedIndex) GetUnscaled() int64 {
-    s.mx.RLock()
-    defer s.mx.RUnlock()
-    return s.unscaled
+    return SegmentedIndexResult{Scaled: s.scaled, Unscaled: s.unscaled}
 }
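
For orientation, a hedged walkthrough of the result-returning API. The concrete numbers assume start=0, lcd=3 and offsets=[3], plausibly what GetStripedOffsets() yields for the first third of an execution (this instance then owns every third unscaled element, starting at 1); treat them as illustrative rather than as k6's documented output:

    idx := lib.NewSegmentedIndex(0, 3, []int64{3})

    r := idx.Next() // r.Scaled == 1, r.Unscaled == 1: the first owned element
    r = idx.Next()  // r.Scaled == 2, r.Unscaled == 4: jumped ahead by offsets[0] == 3
    r = idx.Prev()  // back to r.Scaled == 1, r.Unscaled == 1
    r = idx.GoTo(8) // biggest owned unscaled <= 8 is 7, so r.Scaled == 3, r.Unscaled == 7

Each call takes s.mx exactly once and returns the post-update snapshot, which is what made the GetScaled()/GetUnscaled() getters removable.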
8 changes: 2 additions & 6 deletions lib/executor/constant_arrival_rate.go
@@ -171,7 +171,6 @@ func (carc ConstantArrivalRateConfig) NewExecutor(
     return &ConstantArrivalRate{
         BaseExecutor: NewBaseExecutor(&carc, es, logger),
         config:       carc,
-        iterMx:       &sync.Mutex{},
     }, nil
 }

@@ -186,7 +185,6 @@ type ConstantArrivalRate struct {
     *BaseExecutor
     config ConstantArrivalRateConfig
     et     *lib.ExecutionTuple
-    iterMx *sync.Mutex
     segIdx *lib.SegmentedIndex
 }

@@ -210,11 +208,9 @@ func (car *ConstantArrivalRate) Init(ctx context.Context) error {
 // Unlike the local iteration number returned by getNextLocalIter(), this
 // iteration number will be unique across k6 instances.
 func (car *ConstantArrivalRate) getNextGlobalIter() uint64 {
-    car.iterMx.Lock()
-    defer car.iterMx.Unlock()
-    car.segIdx.Next()
+    res := car.segIdx.Next()
     // iterations are 0-based
-    return uint64(car.segIdx.GetUnscaled() - 1)
+    return uint64(res.Unscaled - 1)
 }

 // Run executes a constant number of iterations per second.
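
Why iterMx could go: the old sequence (segIdx.Next(), then segIdx.GetUnscaled()) spanned two separate critical sections, so two goroutines could both advance the index before either read it back, observing the same unscaled value. Returning the snapshot from Next() closes that window inside SegmentedIndex itself. A minimal concurrency sketch, reusing the hypothetical 0:1/3 segment from above:

    segIdx := lib.NewSegmentedIndex(0, 3, []int64{3})

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            res := segIdx.Next()                  // advance+read is one locked step
            fmt.Println(uint64(res.Unscaled - 1)) // prints 0, 3, 6, 9 in some order
        }()
    }
    wg.Wait()

Because the unscaled values of different k6 instances never overlap, the derived iteration numbers stay globally unique, which is the property getNextGlobalIter() promises.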
8 changes: 2 additions & 6 deletions lib/executor/ramping_arrival_rate.go
@@ -161,7 +161,6 @@ func (varc RampingArrivalRateConfig) NewExecutor(
     return &RampingArrivalRate{
         BaseExecutor: NewBaseExecutor(&varc, es, logger),
         config:       varc,
-        iterMx:       &sync.Mutex{},
     }, nil
 }

@@ -177,7 +176,6 @@ type RampingArrivalRate struct {
     *BaseExecutor
     config RampingArrivalRateConfig
     et     *lib.ExecutionTuple
-    iterMx *sync.Mutex
     segIdx *lib.SegmentedIndex
 }

@@ -201,11 +199,9 @@ func (varr *RampingArrivalRate) Init(ctx context.Context) error {
 // Unlike the local iteration number returned by getNextLocalIter(), this
 // iteration number will be unique across k6 instances.
 func (varr *RampingArrivalRate) getNextGlobalIter() uint64 {
-    varr.iterMx.Lock()
-    defer varr.iterMx.Unlock()
-    varr.segIdx.Next()
+    res := varr.segIdx.Next()
     // iterations are 0-based
-    return uint64(varr.segIdx.GetUnscaled() - 1)
+    return uint64(res.Unscaled - 1)
 }

 // cal calculates the transtitions between stages and gives the next full value produced by the
20 changes: 11 additions & 9 deletions lib/executor/ramping_vus.go
@@ -195,7 +195,8 @@ func (vlvc RampingVUsConfig) getRawExecutionSteps(et *lib.ExecutionTuple, zeroEn
     )

     // Reserve the scaled StartVUs at the beginning
-    steps = append(steps, lib.ExecutionStep{TimeOffset: 0, PlannedVUs: uint64(index.GoTo(fromVUs))})
+    res := index.GoTo(fromVUs)
+    steps = append(steps, lib.ExecutionStep{TimeOffset: 0, PlannedVUs: uint64(res.Scaled)})
     addStep := func(timeOffset time.Duration, plannedVUs uint64) {
         if steps[len(steps)-1].PlannedVUs != plannedVUs {
             steps = append(steps, lib.ExecutionStep{TimeOffset: timeOffset, PlannedVUs: plannedVUs})
@@ -212,30 +213,31 @@
             continue
         }
         if stageDuration == 0 {
-            addStep(timeTillEnd, uint64(index.GoTo(stageEndVUs)))
+            res = index.GoTo(stageEndVUs)
+            addStep(timeTillEnd, uint64(res.Scaled))
             fromVUs = stageEndVUs
             continue
         }

         // VU reservation for gracefully ramping down is handled as a
         // separate method: reserveVUsForGracefulRampDowns()
-        if index.GetUnscaled() > stageEndVUs { // ramp down
+        if res.Unscaled > stageEndVUs { // ramp down
             // here we don't want to emit for the equal to stageEndVUs as it doesn't go below it
             // it will just go to it
-            for ; index.GetUnscaled() > stageEndVUs; index.Prev() {
+            for ; res.Unscaled > stageEndVUs; res = index.Prev() {
                 addStep(
                     // this is the time that we should go up 1 if we are ramping up
                     // but we are ramping down so we should go 1 down, but because we want to not
                     // stop VUs immediately we stop it on the next unscaled VU's time
-                    timeTillEnd-time.Duration(int64(stageDuration)*(stageEndVUs-index.GetUnscaled()+1)/stageVUDiff),
-                    uint64(index.GetScaled()-1),
+                    timeTillEnd-time.Duration(int64(stageDuration)*(stageEndVUs-res.Unscaled+1)/stageVUDiff),
+                    uint64(res.Scaled-1),
                 )
             }
         } else {
-            for ; index.GetUnscaled() <= stageEndVUs; index.Next() {
+            for ; res.Unscaled <= stageEndVUs; res = index.Next() {
                 addStep(
-                    timeTillEnd-time.Duration(int64(stageDuration)*(stageEndVUs-index.GetUnscaled())/stageVUDiff),
-                    uint64(index.GetScaled()),
+                    timeTillEnd-time.Duration(int64(stageDuration)*(stageEndVUs-res.Unscaled)/stageVUDiff),
+                    uint64(res.Scaled),
                 )
             }
         }
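
This file is the same refactor in loop form: getRawExecutionSteps() used to re-query index.GetUnscaled()/GetScaled() after every Next()/Prev(), and now threads the returned SegmentedIndexResult through res instead. Planning runs single-threaded, so the gain here is fewer calls and lock round-trips rather than a fixed race. A reduced sketch of the ramp-up walk (stage timing elided; offsetFor is a hypothetical stand-in for the timeTillEnd arithmetic above):

    res := index.GoTo(fromVUs)
    for ; res.Unscaled <= stageEndVUs; res = index.Next() {
        // one planned-VUs step per unscaled value this segment owns
        addStep(offsetFor(res.Unscaled), uint64(res.Scaled))
    }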
8 changes: 2 additions & 6 deletions lib/executor/shared_iterations.go
@@ -151,7 +151,6 @@ func (sic SharedIterationsConfig) NewExecutor(
     return &SharedIterations{
         BaseExecutor: NewBaseExecutor(sic, es, logger),
         config:       sic,
-        iterMx:       &sync.Mutex{},
     }, nil
 }

@@ -161,7 +160,6 @@ type SharedIterations struct {
     *BaseExecutor
     config SharedIterationsConfig
     et     *lib.ExecutionTuple
-    iterMx *sync.Mutex
     segIdx *lib.SegmentedIndex
 }

@@ -190,11 +188,9 @@ func (si *SharedIterations) Init(ctx context.Context) error {
 // Unlike the local iteration number returned by getNextLocalIter(), this
 // iteration number will be unique across k6 instances.
 func (si *SharedIterations) getNextGlobalIter() uint64 {
-    si.iterMx.Lock()
-    defer si.iterMx.Unlock()
-    si.segIdx.Next()
+    res := si.segIdx.Next()
     // iterations are 0-based
-    return uint64(si.segIdx.GetUnscaled() - 1)
+    return uint64(res.Unscaled - 1)
 }

 // Run executes a specific total number of iterations, which are all shared by
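
After this commit, the three getNextGlobalIter() bodies above are identical up to the receiver name. A hypothetical free-function version, purely to spell out the shared pattern (no such helper exists in the commit):

    // nextGlobalIter captures the two lines repeated verbatim in
    // ConstantArrivalRate, RampingArrivalRate and SharedIterations.
    func nextGlobalIter(segIdx *lib.SegmentedIndex) uint64 {
        res := segIdx.Next()
        return uint64(res.Unscaled - 1) // iterations are 0-based
    }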
