feat: batch kept decisions (#1419)
## Which problem is this PR solving?

Redis pub/sub throughput degrades as the number of messages published per
unit of time grows. To improve performance and sustain a higher throughput
of kept trace decisions, we need to cut down the number of individual kept
messages we send.
This PR implements a batching strategy similar to the one already used for
dropped decisions, so that many kept decisions travel in a single pub/sub
message instead of one message each.

## Short description of the changes

- Introduced a batching strategy for kept messages, similar to the existing
strategy for dropped messages (a simplified sketch of the pattern follows
this list).
- Refactored kept and dropped decision processing into a shared code path to
reduce duplicated code.
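
For orientation, here is a minimal, self-contained sketch of the flush-on-size-or-interval pattern this PR applies. All names here (`decision`, `batchLoop`, `publish`) are hypothetical; the real implementation is in `collect/collect.go` below and additionally publishes asynchronously via an errgroup.

```go
package main

import (
	"fmt"
	"time"
)

type decision struct{ TraceID string }

// batchLoop drains decisions from in and publishes them in batches, flushing
// when the batch reaches maxBatchSize or when interval elapses, whichever
// comes first.
func batchLoop(in <-chan decision, maxBatchSize int, interval time.Duration, publish func([]decision)) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	batch := make([]decision, 0, maxBatchSize)
	for {
		flush := false
		select {
		case d, ok := <-in:
			if !ok {
				if len(batch) > 0 {
					publish(batch) // flush whatever is left on shutdown
				}
				return
			}
			batch = append(batch, d)
			flush = len(batch) >= maxBatchSize
		case <-timer.C:
			flush = true
		}
		if flush && len(batch) > 0 {
			out := make([]decision, len(batch))
			copy(out, batch) // copy so the batch slice can be reused immediately
			batch = batch[:0]
			publish(out) // one pub/sub message now carries a whole batch

			// rearm the timer; drain it first if it already fired
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(interval)
		}
	}
}

func main() {
	in := make(chan decision)
	go func() {
		for n := 0; n < 5; n++ {
			in <- decision{TraceID: fmt.Sprintf("trace-%d", n)}
		}
		close(in)
	}()
	batchLoop(in, 3, 100*time.Millisecond, func(b []decision) {
		fmt.Printf("published batch of %d decisions\n", len(b))
	})
}
```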
VinozzZ authored Nov 8, 2024
1 parent 313f4bf commit 20896ee
Showing 6 changed files with 257 additions and 199 deletions.
220 changes: 106 additions & 114 deletions collect/collect.go
@@ -32,10 +32,11 @@ import (
 )
 
 const (
-	keptTraceDecisionTopic    = "trace_decision_kept"
-	droppedTraceDecisionTopic = "trace_decision_dropped"
-	decisionMessageBufferSize = 10_000
-	defaultDropDecisionTicker = 1 * time.Second
+	keptTraceDecisionTopic            = "trace_decision_kept"
+	dropTraceDecisionTopic            = "trace_decision_dropped"
+	decisionMessageBufferSize         = 10_000
+	defaultDropDecisionTickerInterval = 1 * time.Second
+	defaultKeptDecisionTickerInterval = 100 * time.Millisecond
 )
 
 var ErrWouldBlock = errors.New("Dropping span as channel buffer is full. Span will not be processed and will be lost.")
@@ -113,8 +114,8 @@ type InMemCollector struct {
 	dropDecisionMessages chan string
 	keptDecisionMessages chan string
 
-	dropDecisionBatch  chan string
-	keptDecisionBuffer chan string
+	dropDecisionBuffer chan TraceDecision
+	keptDecisionBuffer chan TraceDecision
 	hostname           string
 }

@@ -213,10 +214,10 @@ func (i *InMemCollector) Start() error {
 		i.keptDecisionMessages = make(chan string, decisionMessageBufferSize)
 		i.dropDecisionMessages = make(chan string, decisionMessageBufferSize)
 		i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions)
-		i.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, i.signalDroppedTraceDecisions)
+		i.PubSub.Subscribe(context.Background(), dropTraceDecisionTopic, i.signalDroppedTraceDecisions)
 
-		i.dropDecisionBatch = make(chan string, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize*5)
-		i.keptDecisionBuffer = make(chan string, 100_000)
+		i.dropDecisionBuffer = make(chan TraceDecision, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize*5)
+		i.keptDecisionBuffer = make(chan TraceDecision, i.Config.GetCollectionConfig().MaxKeptDecisionBatchSize*5)
 	}
 
 	// spin up one collector because this is a single threaded collector
@@ -425,14 +426,14 @@ func (i *InMemCollector) collect() {
 				span.End()
 				return
 			}
-			i.processDropDecisions(msg)
+			i.processTraceDecisions(msg, dropDecision)
 		case msg, ok := <-i.keptDecisionMessages:
 			if !ok {
 				// channel's been closed; we should shut down.
 				span.End()
 				return
 			}
-			i.processKeptDecision(msg)
+			i.processTraceDecisions(msg, keptDecision)
 		case <-ticker.C:
 			i.sendExpiredTracesInCache(ctx, i.Clock.Now())
 			i.checkAlloc(ctx)
@@ -1065,7 +1066,7 @@ func (i *InMemCollector) Stop() error {
 	close(i.outgoingTraces)
 
 	if !i.Config.GetCollectionConfig().EnableTraceLocality {
-		close(i.dropDecisionBatch)
+		close(i.dropDecisionBuffer)
 		close(i.keptDecisionBuffer)
 	}
 
@@ -1371,62 +1372,58 @@ func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg st
 		i.Logger.Warn().Logf("dropped trace decision channel is full. Dropping message")
 	}
 }
+func (i *InMemCollector) processTraceDecisions(msg string, decisionType decisionType) {
+	i.Metrics.Increment(fmt.Sprintf("%s_decision_batches_received", decisionType.String()))
+	if len(msg) == 0 {
+		return
+	}
 
-func (i *InMemCollector) processDropDecisions(msg string) {
-	i.Metrics.Increment("drop_decision_batches_received")
+	// Deserialize the message into trace decisions
+	decisions := make([]TraceDecision, 0)
+	var err error
+	switch decisionType {
+	case keptDecision:
+		decisions, err = newKeptTraceDecision(msg)
+		if err != nil {
+			i.Logger.Error().Logf("Failed to unmarshal kept trace decision message. %s", err)
+			return
+		}
+	case dropDecision:
+		decisions, err = newDroppedTraceDecision(msg)
+		if err != nil {
+			i.Logger.Error().Logf("Failed to unmarshal drop trace decision message. %s", err)
+			return
+		}
+	default:
+		i.Logger.Error().Logf("unknown decision type %s while processing trace decisions", decisionType)
+		return
+	}
 
-	ids := newDroppedTraceDecision(msg)
+	i.Metrics.Count(fmt.Sprintf("%s_decisions_received", decisionType.String()), len(decisions))
 
-	if len(ids) == 0 {
+	if len(decisions) == 0 {
 		return
 	}
 
-	i.Metrics.Count("drop_decisions_received", len(ids))
-
 	toDelete := generics.NewSet[string]()
-	for _, id := range ids {
-
-		trace := i.cache.Get(id)
-		// if we don't have the trace in the cache, we don't need to do anything
+	for _, decision := range decisions {
+		// Assume TraceDecision implements a common interface like TraceID
+		trace := i.cache.Get(decision.TraceID)
 		if trace == nil {
-			i.Logger.Debug().Logf("trace not found in cache for trace decision")
+			i.Logger.Debug().Logf("trace not found in cache for %s decision", decisionType.String())
 			continue
 		}
-		toDelete.Add(id)
-
-		i.sampleTraceCache.Record(trace, false, "")
-
-	}
-
-	i.cache.RemoveTraces(toDelete)
-}
-
-func (i *InMemCollector) processKeptDecision(msg string) {
-	i.Metrics.Increment("kept_decision_batches_received")
+		toDelete.Add(decision.TraceID)
 
-	td, err := newKeptTraceDecision(msg)
-	if err != nil {
-		i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err)
-		return
-	}
+		if decisionType == keptDecision {
+			trace.SetSampleRate(decision.SampleRate)
+			trace.KeepSample = decision.Kept
+		}
 
-	// TODO: when we batch keep decisions too, we should count the number of traces in the batch, eg:
-	// i.Metrics.Count("kept_decisions_received", int64(len(td)))
+		i.sampleTraceCache.Record(trace, decision.Kept, decision.KeptReason)
 
-	toDelete := generics.NewSet[string]()
-	trace := i.cache.Get(td.TraceID)
-	// if we don't have the trace in the cache, we don't need to do anything
-	if trace == nil {
-		i.Logger.Debug().Logf("trace not found in cache for trace decision")
-		return
+		i.send(context.Background(), trace, &decision)
 	}
-	toDelete.Add(td.TraceID)
-	trace.SetSampleRate(td.SampleRate)
-	trace.KeepSample = td.Kept
-
-	i.sampleTraceCache.Record(trace, td.Kept, td.KeptReason)
-
-	i.send(context.Background(), trace, td)
-
 	i.cache.RemoveTraces(toDelete)
 }
@@ -1528,35 +1525,17 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis
 	_, span := otelutil.StartSpanWith(ctx, i.Tracer, "publishTraceDecision", "decision", td.Kept)
 	defer span.End()
 
-	var (
-		decisionMsg string
-		err         error
-	)
-
 	if td.Kept {
-		decisionMsg, err = newKeptDecisionMessage(td)
-		if err != nil {
-			i.Logger.Error().WithFields(map[string]interface{}{
-				"trace_id": td.TraceID,
-				"kept":     td.Kept,
-				"reason":   td.KeptReason,
-				"sampler":  td.SamplerKey,
-				"selector": td.SamplerSelector,
-				"error":    err.Error(),
-			}).Logf("Failed to create trace decision message")
-			return
-		}
-
 		select {
-		case i.keptDecisionBuffer <- decisionMsg:
+		case i.keptDecisionBuffer <- td:
 		default:
 			i.Metrics.Increment("collector_kept_decisions_queue_full")
 			i.Logger.Warn().Logf("kept trace decision buffer is full. Dropping message")
 		}
 		return
 	} else {
 		select {
-		case i.dropDecisionBatch <- td.TraceID:
+		case i.dropDecisionBuffer <- td:
 		default:
 			i.Metrics.Increment("collector_drop_decisions_queue_full")
 			i.Logger.Warn().Logf("drop trace decision buffer is full. Dropping message")
@@ -1569,93 +1548,106 @@ func (i *InMemCollector) sendKeptDecisions() {
 	if i.Config.GetCollectionConfig().EnableTraceLocality {
 		return
 	}
 
-	ctx := context.Background()
-	for msg := range i.keptDecisionBuffer {
-		err := i.PubSub.Publish(ctx, keptTraceDecisionTopic, msg)
-		if err != nil {
-			i.Logger.Error().WithFields(map[string]interface{}{
-				"error": err.Error(),
-			}).Logf("Failed to publish trace decision")
-		}
-	}
+	interval := time.Duration(i.Config.GetCollectionConfig().KeptDecisionSendInterval)
+	if interval == 0 {
+		interval = defaultKeptDecisionTickerInterval
+	}
+	i.sendDecisions(i.keptDecisionBuffer, interval, i.Config.GetCollectionConfig().MaxKeptDecisionBatchSize, keptDecision)
 }
 
 func (i *InMemCollector) sendDropDecisions() {
 	if i.Config.GetCollectionConfig().EnableTraceLocality {
 		return
 	}
 
-	timerInterval := time.Duration(i.Config.GetCollectionConfig().DropDecisionSendInterval)
-	if i.Config.GetCollectionConfig().DropDecisionSendInterval == 0 {
-		timerInterval = defaultDropDecisionTicker
+	interval := time.Duration(i.Config.GetCollectionConfig().DropDecisionSendInterval)
+	if interval == 0 {
+		interval = defaultDropDecisionTickerInterval
 	}
+	i.sendDecisions(i.dropDecisionBuffer, interval, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize, dropDecision)
+}
 
-	// use a timer here so that we don't send a batch immediately after
-	// reaching the max batch size
-	timer := i.Clock.NewTimer(timerInterval)
+// Unified sendDecisions function for batching and processing TraceDecisions
+func (i *InMemCollector) sendDecisions(decisionChan <-chan TraceDecision, interval time.Duration, maxBatchSize int, decisionType decisionType) {
+	timer := i.Clock.NewTimer(interval)
 	defer timer.Stop()
-	traceIDs := make([]string, 0, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize)
+	decisions := make([]TraceDecision, 0, maxBatchSize)
 	send := false
 	eg := &errgroup.Group{}
+	ctx := context.Background()
+	var createDecisionMessage newDecisionMessage
+	var metricName, topic string
+	switch decisionType {
+	case keptDecision:
+		metricName = "collector_kept_decisions_batch_size"
+		topic = keptTraceDecisionTopic
+		createDecisionMessage = newKeptDecisionMessage
+	case dropDecision:
+		metricName = "collector_drop_decisions_batch_size"
+		topic = dropTraceDecisionTopic
+		createDecisionMessage = newDroppedDecisionMessage
+	default:
+		i.Logger.Error().Logf("Invalid decision type")
+		return // invalid decision type
+	}
+
 	for {
 		select {
 		case <-i.done:
 			eg.Wait()
 			return
-		case id, ok := <-i.dropDecisionBatch:
+		case td, ok := <-decisionChan:
 			if !ok {
 				eg.Wait()
 				return
 			}
-			// if we get a trace ID, add it to the list
-			traceIDs = append(traceIDs, id)
-			// if we exceeded the max count, we need to send
-			if len(traceIDs) >= i.Config.GetCollectionConfig().MaxDropDecisionBatchSize {
+			// Add TraceDecision to the batch
+			decisions = append(decisions, td)
+			if len(decisions) >= maxBatchSize {
 				send = true
 			}
 		case <-timer.Chan():
 			// timer fired, so send what we have
 			send = true
 		}
 
-		// if we need to send, do so
-		if send && len(traceIDs) > 0 {
-			i.Metrics.Histogram("collector_drop_decision_batch_count", len(traceIDs))
+		// Send the batch if ready
+		if send && len(decisions) > 0 {
+			i.Metrics.Histogram(metricName, len(decisions))
 
-			// copy the traceIDs so we can clear the list
-			idsToProcess := make([]string, len(traceIDs))
-			copy(idsToProcess, traceIDs)
-			// clear the list
-			traceIDs = traceIDs[:0]
+			// Copy current batch to process
+			decisionsToProcess := make([]TraceDecision, len(decisions))
+			copy(decisionsToProcess, decisions)
+			decisions = decisions[:0] // Reset the batch
 
 			// now process the result in a goroutine so we can keep listening
 			eg.Go(func() error {
 				select {
 				case <-i.done:
 					return nil
 				default:
-					msg, err := newDroppedDecisionMessage(idsToProcess...)
+					msg, err := createDecisionMessage(decisionsToProcess)
 					if err != nil {
-						i.Logger.Error().Logf("Failed to marshal dropped trace decision")
+						i.Logger.Error().WithFields(map[string]interface{}{
+							"error": err.Error(),
+						}).Logf("Failed to create trace decision message")
+						return nil
 					}
-					err = i.PubSub.Publish(context.Background(), droppedTraceDecisionTopic, msg)
+					err = i.PubSub.Publish(ctx, topic, msg)
 					if err != nil {
-						i.Logger.Error().Logf("Failed to publish dropped trace decision")
+						i.Logger.Error().WithFields(map[string]interface{}{
+							"error": err.Error(),
+						}).Logf("Failed to publish trace decision")
 					}
 				}
 
 				return nil
 			})
 
 			// Reset timer after send
 			if !timer.Stop() {
 				select {
 				case <-timer.Chan():
 				default:
 				}
 			}
 
-			timer.Reset(timerInterval)
+			timer.Reset(interval)
 			send = false
 		}
 	}
 }
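
A note on the dispatch in `sendDecisions` above: the serializer is chosen once, outside the hot loop, via the function-typed variable `createDecisionMessage`. The type `newDecisionMessage` is not shown in this diff; based on its call sites (`createDecisionMessage(decisionsToProcess)` feeding `PubSub.Publish`), it is assumed to look roughly like the sketch below, with a pared-down stand-in for `TraceDecision` and a hypothetical serializer.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TraceDecision is a pared-down stand-in for the repository's type.
type TraceDecision struct {
	TraceID string `json:"trace_id"`
	Kept    bool   `json:"kept"`
}

// newDecisionMessage mirrors the function type sendDecisions dispatches on;
// the exact signature in the repository is an assumption.
type newDecisionMessage func(tds []TraceDecision) (string, error)

// newKeptDecisionMessage is a hypothetical serializer: it packs an entire
// batch of kept decisions into one pub/sub payload.
func newKeptDecisionMessage(tds []TraceDecision) (string, error) {
	b, err := json.Marshal(tds)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	var create newDecisionMessage = newKeptDecisionMessage
	msg, err := create([]TraceDecision{{TraceID: "abc123", Kept: true}})
	if err != nil {
		panic(err)
	}
	fmt.Println(msg) // a single message for the whole batch
}
```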