Skip to content

Commit

Permalink
feat: Rename EnableTraceLocality to DisableTraceLocality (#1442)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

To allow users to safely opt in to disabling trace locality, we're
renaming the option from `EnableTraceLocality` to `DisableTraceLocality`.

## Short description of the changes
- Rename `EnableTraceLocality` to `DisableTraceLocality`

---------

Co-authored-by: Yingrong Zhao <[email protected]>
Co-authored-by: Kent Quirk <[email protected]>
  • Loading branch information
3 people authored Nov 20, 2024
1 parent 48fcac0 commit 3d430fe
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 69 deletions.
9 changes: 4 additions & 5 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ func defaultConfig(basePort int, redisDB int) *config.MockConfig {
GetPeerListenAddrVal: "127.0.0.1:" + strconv.Itoa(basePort+1),
GetHoneycombAPIVal: "http://api.honeycomb.io",
GetCollectionConfigVal: config.CollectionConfig{
CacheCapacity: 10000,
ShutdownDelay: config.Duration(1 * time.Second),
HealthCheckTimeout: config.Duration(3 * time.Second),
EnableTraceLocality: true,
CacheCapacity: 10000,
ShutdownDelay: config.Duration(1 * time.Second),
HealthCheckTimeout: config.Duration(3 * time.Second),
},
TraceIdFieldNames: []string{"trace.trace_id"},
ParentIdFieldNames: []string{"trace.parent_id"},
Expand Down Expand Up @@ -767,7 +766,7 @@ func TestPeerRouting_TraceLocalityDisabled(t *testing.T) {
redisDB := 12 + i
cfg := defaultConfig(basePort, redisDB)
collectionCfg := cfg.GetCollectionConfig()
collectionCfg.EnableTraceLocality = false
collectionCfg.DisableTraceLocality = true
cfg.GetCollectionConfigVal = collectionCfg

apps[i], graph = newStartedApp(t, senders[i], nil, peers, cfg)
Expand Down
53 changes: 32 additions & 21 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (i *InMemCollector) Start() error {
i.Peers.RegisterUpdatedPeersCallback(i.redistributeTimer.Reset)
}

if !i.Config.GetCollectionConfig().EnableTraceLocality {
if i.Config.GetCollectionConfig().DisableTraceLocality {
i.keptDecisionMessages = make(chan string, decisionMessageBufferSize)
i.dropDecisionMessages = make(chan string, decisionMessageBufferSize)
i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions)
Expand Down Expand Up @@ -524,7 +524,7 @@ func (i *InMemCollector) redistributeTraces(ctx context.Context) {

for _, sp := range trace.GetSpans() {

if !i.Config.GetCollectionConfig().EnableTraceLocality {
if i.Config.GetCollectionConfig().DisableTraceLocality {
dc := i.createDecisionSpan(sp, trace, newTarget)
i.PeerTransmission.EnqueueEvent(dc)
continue
Expand Down Expand Up @@ -664,17 +664,26 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span, source
span.End()
}()

targetShard, isMyTrace := i.IsMyTrace(sp.TraceID)
// if the span is a decision span and the trace no longer belong to us, we should not forward it to the peer
if !isMyTrace && sp.IsDecisionSpan() {
return
}

var (
targetShard sharder.Shard
isMyTrace bool
)
// if trace locality is enabled, we should forward all spans to its correct peer
if i.Config.GetCollectionConfig().EnableTraceLocality && !targetShard.Equals(i.Sharder.MyShard()) {
sp.APIHost = targetShard.GetAddress()
i.PeerTransmission.EnqueueSpan(sp)
return
if i.Config.GetCollectionConfig().DisableTraceLocality {
targetShard, isMyTrace = i.IsMyTrace(sp.TraceID)
// if the span is a decision span and the trace no longer belong to us, we should not forward it to the peer
if !isMyTrace && sp.IsDecisionSpan() {
return
}

} else {
targetShard = i.Sharder.WhichShard(sp.TraceID)
isMyTrace = true
if !targetShard.Equals(i.Sharder.MyShard()) {
sp.APIHost = targetShard.GetAddress()
i.PeerTransmission.EnqueueSpan(sp)
return
}
}

tcfg := i.Config.GetTracesConfig()
Expand Down Expand Up @@ -1060,7 +1069,7 @@ func (i *InMemCollector) Stop() error {
close(i.fromPeer)
close(i.outgoingTraces)

if !i.Config.GetCollectionConfig().EnableTraceLocality {
if i.Config.GetCollectionConfig().DisableTraceLocality {
close(i.dropDecisionBuffer)
close(i.keptDecisionBuffer)
}
Expand Down Expand Up @@ -1518,22 +1527,24 @@ func (i *InMemCollector) makeDecision(ctx context.Context, trace *types.Trace, s
HasRoot: hasRoot,
}

if !i.Config.GetCollectionConfig().EnableTraceLocality {
if i.Config.GetCollectionConfig().DisableTraceLocality {
i.publishTraceDecision(ctx, td)
}

return &td, nil
}

func (i *InMemCollector) IsMyTrace(traceID string) (sharder.Shard, bool) {
// if trace locality is enabled, we should always process the trace
if i.Config.GetCollectionConfig().EnableTraceLocality {
return i.Sharder.MyShard(), true
// if trace locality is disabled, we should only process
// traces that belong to the current refinery
if i.Config.GetCollectionConfig().DisableTraceLocality {
targeShard := i.Sharder.WhichShard(traceID)

return targeShard, i.Sharder.MyShard().Equals(targeShard)
}

targeShard := i.Sharder.WhichShard(traceID)
return i.Sharder.MyShard(), true

return targeShard, i.Sharder.MyShard().Equals(targeShard)
}

func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecision) {
Expand Down Expand Up @@ -1567,7 +1578,7 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis
}

func (i *InMemCollector) sendKeptDecisions() {
if i.Config.GetCollectionConfig().EnableTraceLocality {
if !i.Config.GetCollectionConfig().DisableTraceLocality {
return
}
interval := time.Duration(i.Config.GetCollectionConfig().KeptDecisionSendInterval)
Expand All @@ -1578,7 +1589,7 @@ func (i *InMemCollector) sendKeptDecisions() {
}

func (i *InMemCollector) sendDropDecisions() {
if i.Config.GetCollectionConfig().EnableTraceLocality {
if !i.Config.GetCollectionConfig().DisableTraceLocality {
return
}
interval := time.Duration(i.Config.GetCollectionConfig().DropDecisionSendInterval)
Expand Down
63 changes: 38 additions & 25 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"math/rand"
"runtime"
"slices"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -99,7 +98,7 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission, pe
redistributeTimer: redistributeNotifier,
}

if !conf.GetCollectionConfig().EnableTraceLocality {
if conf.GetCollectionConfig().DisableTraceLocality {
localPubSub.Subscribe(context.Background(), keptTraceDecisionTopic, c.signalKeptTraceDecisions)
}

Expand Down Expand Up @@ -195,6 +194,12 @@ func TestAddRootSpan(t *testing.T) {

assert.Nil(t, coll.getFromCache(traceID1), "after sending the span, it should be removed from the cache")

conf.Mux.Lock()
collectionCfg := conf.GetCollectionConfigVal
collectionCfg.DisableTraceLocality = true
conf.GetCollectionConfigVal = collectionCfg
conf.Mux.Unlock()

decisionSpanTraceID := "decision_root_span"
span = &types.Span{
TraceID: decisionSpanTraceID,
Expand Down Expand Up @@ -522,8 +527,8 @@ func TestDryRunMode(t *testing.T) {
DryRun: true,
ParentIdFieldNames: []string{"trace.parent_id", "parentId"},
GetCollectionConfigVal: config.CollectionConfig{
ShutdownDelay: config.Duration(1 * time.Millisecond),
EnableTraceLocality: true,
ShutdownDelay: config.Duration(1 * time.Millisecond),
DisableTraceLocality: true,
},
}
transmission := &transmit.MockTransmission{}
Expand Down Expand Up @@ -793,13 +798,14 @@ func TestStableMaxAlloc(t *testing.T) {
},
}

if i < 3 {
// add some spans that belongs to peer
span.TraceID = peerTraceIDs[i]
// add extrac data so that the peer traces have bigger
// cache impact, which will get evicted first
span.Data["extra_data"] = strings.Repeat("abc", 100)
}
// TODO: enable this once we want to turn on DisableTraceLocality
// if i < 3 {
// // add some spans that belongs to peer
// span.TraceID = peerTraceIDs[i]
// // add extrac data so that the peer traces have bigger
// // cache impact, which will get evicted first
// span.Data["extra_data"] = strings.Repeat("abc", 100)
// }

coll.AddSpan(span)
}
Expand All @@ -819,13 +825,15 @@ func TestStableMaxAlloc(t *testing.T) {
runtime.ReadMemStats(&mem)
// Set MaxAlloc, which should cause cache evictions.
conf.GetCollectionConfigVal.MaxAlloc = config.MemorySize(mem.Alloc * 99 / 100)
orphanPeerTrace := coll.cache.Get(peerTraceIDs[0])

orphanPeerTrace.SendBy = coll.Clock.Now().Add(-conf.GetTracesConfig().GetTraceTimeout() * 5)
peerSpan := orphanPeerTrace.GetSpans()[0]
// cache impact is also calculated based on the arrival time
peerSpan.ArrivalTime = coll.Clock.Now().Add(-conf.GetTracesConfig().GetTraceTimeout())
assert.True(t, orphanPeerTrace.IsOrphan(conf.GetTracesConfig().GetTraceTimeout(), coll.Clock.Now()))
// TODO: enable this once we want to turn on DisableTraceLocality
// orphanPeerTrace := coll.cache.Get(peerTraceIDs[0])
//
// orphanPeerTrace.SendBy = coll.Clock.Now().Add(-conf.GetTracesConfig().GetTraceTimeout() * 5)
// peerSpan := orphanPeerTrace.GetSpans()[0]
// // cache impact is also calculated based on the arrival time
// peerSpan.ArrivalTime = coll.Clock.Now().Add(-conf.GetTracesConfig().GetTraceTimeout())
// assert.True(t, orphanPeerTrace.IsOrphan(conf.GetTracesConfig().GetTraceTimeout(), coll.Clock.Now()))

coll.mutex.Unlock()
// wait for the cache to take some action
Expand All @@ -844,13 +852,14 @@ func TestStableMaxAlloc(t *testing.T) {
tracesLeft := len(traces)
assert.Less(t, tracesLeft, 480, "should have sent some traces")
assert.Greater(t, tracesLeft, 100, "should have NOT sent some traces")
peerTracesLeft := 0
for _, trace := range traces {
if slices.Contains(peerTraceIDs, trace.TraceID) {
peerTracesLeft++
}
}
assert.Equal(t, 2, peerTracesLeft, "should have kept the peer traces")
// TODO: enable this once we want to turn on DisableTraceLocality
// peerTracesLeft := 0
// for _, trace := range traces {
// if slices.Contains(peerTraceIDs, trace.TraceID) {
// peerTracesLeft++
// }
// }
// assert.Equal(t, 2, peerTracesLeft, "should have kept the peer traces")
coll.mutex.Unlock()

// We discarded the most costly spans, and sent them.
Expand Down Expand Up @@ -2224,7 +2233,8 @@ func TestSendDropDecisions(t *testing.T) {
GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1},
ParentIdFieldNames: []string{"trace.parent_id", "parentId"},
GetCollectionConfigVal: config.CollectionConfig{
ShutdownDelay: config.Duration(1 * time.Millisecond),
ShutdownDelay: config.Duration(1 * time.Millisecond),
DisableTraceLocality: true,
},
}
transmission := &transmit.MockTransmission{}
Expand Down Expand Up @@ -2307,6 +2317,9 @@ func TestExpiredTracesCleanup(t *testing.T) {
TraceTimeout: config.Duration(500 * time.Millisecond),
MaxBatchSize: 1500,
},
GetCollectionConfigVal: config.CollectionConfig{
DisableTraceLocality: true,
},
GetSamplerTypeVal: &config.DeterministicSamplerConfig{SampleRate: 1},
AddSpanCountToRoot: true,
AddCountsToRoot: true,
Expand Down
10 changes: 5 additions & 5 deletions collect/stressRelief.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,12 @@ func (s *StressRelief) Recalc() uint {
s.lock.Lock()
defer s.lock.Unlock()

overallStressLevel := clusterStressLevel
// The overall stress level is the max of the individual and cluster stress levels
// If a single node is under significant stress, it can activate stress relief mode
overallStressLevel := uint(math.Max(float64(clusterStressLevel), float64(localLevel)))

if s.Config.GetCollectionConfig().EnableTraceLocality {
// The overall stress level is the max of the individual and cluster stress levels
// If a single node is under significant stress, it can activate stress relief mode
overallStressLevel = uint(math.Max(float64(clusterStressLevel), float64(localLevel)))
if s.Config.GetCollectionConfig().DisableTraceLocality {
overallStressLevel = clusterStressLevel
}
s.overallStressLevel = overallStressLevel
s.RefineryMetrics.Gauge("stress_level", s.overallStressLevel)
Expand Down
8 changes: 4 additions & 4 deletions collect/stress_relief_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestStressRelief_Peer(t *testing.T) {
}, 2*time.Second, 100*time.Millisecond, "stress relief should be false")
}

func TestStressRelief_OverallStressLevel(t *testing.T) {
func TestStressRelief_OverallStressLevel_DisableTraceLocality(t *testing.T) {
clock := clockwork.NewFakeClock()
sr, stop := newStressRelief(t, clock, nil)
defer stop()
Expand All @@ -157,6 +157,9 @@ func TestStressRelief_OverallStressLevel(t *testing.T) {
DeactivationLevel: 65,
MinimumActivationDuration: config.Duration(5 * time.Second),
},
GetCollectionConfigVal: config.CollectionConfig{
DisableTraceLocality: true,
},
}
// On startup, the stress relief should not be active
sr.UpdateFromConfig()
Expand Down Expand Up @@ -239,9 +242,6 @@ func TestStressRelief_OverallStressLevel_EnableTraceLocality(t *testing.T) {
DeactivationLevel: 65,
MinimumActivationDuration: config.Duration(5 * time.Second),
},
GetCollectionConfigVal: config.CollectionConfig{
EnableTraceLocality: true,
},
}
// On startup, the stress relief should not be active
sr.UpdateFromConfig()
Expand Down
4 changes: 2 additions & 2 deletions config/file_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ type CollectionConfig struct {
DisableRedistribution bool `yaml:"DisableRedistribution"`
RedistributionDelay Duration `yaml:"RedistributionDelay" default:"30s"`

ShutdownDelay Duration `yaml:"ShutdownDelay" default:"15s"`
EnableTraceLocality bool `yaml:"EnableTraceLocality"`
ShutdownDelay Duration `yaml:"ShutdownDelay" default:"15s"`
DisableTraceLocality bool `yaml:"DisableTraceLocality"`

MaxDropDecisionBatchSize int `yaml:"MaxDropDecisionBatchSize" default:"1000"`
DropDecisionSendInterval Duration `yaml:"DropDecisionSendInterval" default:"1s"`
Expand Down
21 changes: 15 additions & 6 deletions config/metadata/configMeta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,9 @@ groups:
`meta.refinery.dryrun.sample_rate` will be set to the sample rate
that would have been used.
NOTE: This settng is not compatible with `EnableTraceLocality` set to
false because drop trace decisions shared among peers does not contain
all relevant information to send traces to Honeycomb.
NOTE: This setting is not compatible with `DisableTraceLocality=true`,
because drop trace decisions shared among peers do not contain all
the relevant information needed to send traces to Honeycomb.
- name: Logger
title: "Refinery Logger"
Expand Down Expand Up @@ -1340,17 +1340,26 @@ groups:
This value should be set to a bit less than the normal timeout period
for shutting down without forcibly terminating the process.
- name: EnableTraceLocality
- name: DisableTraceLocality
type: bool
valuetype: nondefault
firstversion: v2.9
default: false
reload: false
summary: controls whether all spans that belongs to the same trace are sent to a single Refinery for processing.
description: >
If `true`, Refinery's will route all spans that belongs to the same trace to a single peer.
When `false`, Refinery will route all spans that belong to the same trace to a single peer. This is the
default behavior ("Trace Locality") and the way Refinery has worked in the past. When `true`, Refinery
will instead keep spans on the node where they were received, and forward proxy spans that contain only
the key information needed to make a trace decision. This can reduce the amount of traffic between peers
in most cases, and can help avoid a situation where a single large trace can cause a memory overrun on
a single node.
NOTE: This setting is not compatible with `DryRun` when set to false. See `DryRun` for more information.
If `true`, the amount of traffic between peers will be reduced, but the amount of traffic between Refinery
and Redis will significantly increase, because Refinery uses Redis to distribute the trace decisions to all
nodes in the cluster. It is important to adjust the size of the Redis cluster in this case.
NOTE: This setting is not compatible with `DryRun` when set to true. See `DryRun` for more information.
- name: HealthCheckTimeout
type: duration
Expand Down
2 changes: 1 addition & 1 deletion route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
}
}

if r.Config.GetCollectionConfig().EnableTraceLocality {
if !r.Config.GetCollectionConfig().DisableTraceLocality {
// Figure out if we should handle this span locally or pass on to a peer
targetShard := r.Sharder.WhichShard(traceID)
if !targetShard.Equals(r.Sharder.MyShard()) {
Expand Down

0 comments on commit 3d430fe

Please sign in to comment.