Skip to content

Commit

Permalink
feat: compress kept trace decision message (#1430)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

Due to the multiplication effect of pubsub messages through Redis, we
would like to bring down the amount of network traffic sent by Redis.

One of the biggest contributors is the `TraceDecision` message used to
communicate kept trace decisions.

This PR encodes the data using `gob` and then compresses it with `snappy`.

Here are the benchmark results compared to the original JSON encoding.
In the `BenchmarkCompressionSizes` test, I generated a batch of 1000
trace decisions as the input. The amount of decrease in data size is 90%
```
goos: darwin
goarch: arm64
pkg: github.com/honeycombio/refinery/collect
cpu: Apple M2 Max
BenchmarkDynamicJSONEncoding-12          	   3139	   353164 ns/op	 443032 B/op	      3 allocs/op
BenchmarkDynamicCompressedEncoding-12    	   3050	   383788 ns/op	 475333 B/op	     62 allocs/op
BenchmarkDynamicJSONDecoding-12          	    561	  2114997 ns/op	 590167 B/op	   5018 allocs/op
BenchmarkDynamicCompressedDecoding-12    	   3285	   360892 ns/op	 445667 B/op	   5244 allocs/op
BenchmarkCompressionSizes/JSON_Encoding-12         	   5000	   238046 ns/op	 164091 B/op	      2 allocs/op
--- BENCH: BenchmarkCompressionSizes/JSON_Encoding-12
    trace_decision_test.go:305: JSON Encoding: Total Batch: 1, Total Size: 160671 bytes, Average Size: 160671.00 bytes
    trace_decision_test.go:305: JSON Encoding: Total Batch: 100, Total Size: 16227771 bytes, Average Size: 162277.71 bytes
    trace_decision_test.go:305: JSON Encoding: Total Batch: 5000, Total Size: 819582771 bytes, Average Size: 163916.55 bytes
BenchmarkCompressionSizes/Snappy_Compression-12    	   5052	   236998 ns/op	 299547 B/op	     59 allocs/op
--- BENCH: BenchmarkCompressionSizes/Snappy_Compression-12
    trace_decision_test.go:319: Snappy Compression: Total Batch: 1, Total Size: 12115 bytes, Average Size: 12115.00 bytes
    trace_decision_test.go:319: Snappy Compression: Total Batch: 100, Total Size: 1223615 bytes, Average Size: 12236.15 bytes
    trace_decision_test.go:319: Snappy Compression: Total Batch: 5052, Total Size: 62428595 bytes, Average Size: 12357.20 bytes
```

I also used `sync.Pool` for both the compression and decompression buffers,
as well as for the `snappy.Writer`, to reduce allocations.

---------

Co-authored-by: Mike Goldsmith <[email protected]>
Co-authored-by: Kent Quirk <[email protected]>
  • Loading branch information
3 people authored Nov 15, 2024
1 parent ffb6e4c commit d2f6f2a
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 147 deletions.
10 changes: 8 additions & 2 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2254,7 +2254,10 @@ func TestSendDropDecisions(t *testing.T) {
coll.dropDecisionBuffer <- TraceDecision{TraceID: "trace1"}
close(coll.dropDecisionBuffer)
droppedMessage := <-messages
assert.Equal(t, "trace1", droppedMessage)

decompressedData, err := decompressDropDecisions([]byte(droppedMessage))
assert.NoError(t, err)
assert.Equal(t, "trace1", decompressedData)

<-closed

Expand All @@ -2278,7 +2281,10 @@ func TestSendDropDecisions(t *testing.T) {
}
close(coll.dropDecisionBuffer)
droppedMessage = <-messages
assert.Equal(t, "trace0,trace1,trace2,trace3,trace4", droppedMessage)

decompressedData, err = decompressDropDecisions([]byte(droppedMessage))
assert.NoError(t, err)
assert.Equal(t, "trace0,trace1,trace2,trace3,trace4", decompressedData)

<-closed
}
Expand Down
131 changes: 101 additions & 30 deletions collect/trace_decision.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package collect

import (
"encoding/json"
"bytes"
"encoding/gob"
"fmt"
"strings"
"sync"

"github.com/golang/snappy"
"github.com/honeycombio/refinery/collect/cache"
)

Expand Down Expand Up @@ -40,47 +43,46 @@ func newDroppedDecisionMessage(tds []TraceDecision) (string, error) {
}
}

if len(traceIDs) == 0 {
return "", fmt.Errorf("no valid trace IDs provided")
}

return strings.Join(traceIDs, ","), nil
}
func newKeptDecisionMessage(tds []TraceDecision) (string, error) {
if len(tds) == 0 {
return "", fmt.Errorf("no kept trace decisions provided")
}

data, err := json.Marshal(tds)
compressed, err := compress(strings.Join(traceIDs, ","))
if err != nil {
return "", err
}

return string(data), nil
return string(compressed), nil
}

func newDroppedTraceDecision(msg string) ([]TraceDecision, error) {
if msg == "" {
return nil, fmt.Errorf("empty drop message")
data, err := decompressDropDecisions([]byte(msg))
if err != nil {
return nil, err
}
var decisions []TraceDecision
for _, traceID := range strings.Split(msg, ",") {

traceIDs := strings.Split(data, ",")
decisions := make([]TraceDecision, 0, len(traceIDs))
for _, traceID := range traceIDs {
decisions = append(decisions, TraceDecision{
TraceID: traceID,
})
}

return decisions, nil
}

// newKeptDecisionMessage serializes a batch of kept trace decisions into a
// compressed pubsub payload. An empty batch is rejected so that callers never
// publish a message with nothing in it.
func newKeptDecisionMessage(tds []TraceDecision) (string, error) {
	if len(tds) == 0 {
		return "", fmt.Errorf("no kept trace decisions provided")
	}

	payload, err := compress(tds)
	if err != nil {
		return "", err
	}

	return string(payload), nil
}

// newKeptTraceDecision decodes a kept-decision pubsub message (produced by
// newKeptDecisionMessage) back into a slice of TraceDecision values.
//
// (The flat diff render interleaved the removed json.Unmarshal lines with the
// new implementation; this is the reconstructed post-change function. The
// result variable is renamed from the misleading `compressed` — the value it
// holds is the decompressed, decoded slice.)
func newKeptTraceDecision(msg string) ([]TraceDecision, error) {
	decisions, err := decompressKeptDecisions([]byte(msg))
	if err != nil {
		return nil, err
	}

	return decisions, nil
}

var _ cache.KeptTrace = &TraceDecision{}
Expand All @@ -92,16 +94,16 @@ type TraceDecision struct {
// keptDecision
Kept bool
Rate uint
SamplerKey string `json:",omitempty"`
SamplerSelector string `json:",omitempty"`
SamplerKey string
SamplerSelector string
SendReason string
HasRoot bool
Reason string
Count uint32 `json:",omitempty"` // number of spans in the trace
EventCount uint32 `json:",omitempty"` // number of span events in the trace
LinkCount uint32 `json:",omitempty"` // number of span links in the trace
Count uint32
EventCount uint32
LinkCount uint32

keptReasonIdx uint `json:",omitempty"`
keptReasonIdx uint
}

func (td *TraceDecision) DescendantCount() uint32 {
Expand Down Expand Up @@ -135,3 +137,72 @@ func (td *TraceDecision) KeptReason() uint {
// SetKeptReason records the index of this decision's kept reason, pairing
// with KeptReason as part of the cache.KeptTrace interface (asserted by the
// `var _ cache.KeptTrace` check above).
func (td *TraceDecision) SetKeptReason(reasonIdx uint) {
	td.keptReasonIdx = reasonIdx
}

// bufferPool recycles scratch byte buffers shared by the compression and
// decompression paths, so each pubsub message avoids allocating a fresh
// buffer. Callers must Reset the buffer before use.
var bufferPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// snappyWriterPool recycles snappy writers used by compress. Callers must
// Reset the writer onto their own destination before use and Close it to
// flush before reading the compressed result.
var snappyWriterPool = sync.Pool{
	New: func() any { return snappy.NewBufferedWriter(nil) },
}

// compress gob-encodes data through a snappy writer and returns the
// compressed bytes. Both the scratch buffer and the snappy writer come from
// pools; the returned slice is a copy, so it stays valid after the pooled
// buffer is reused.
func compress(data any) ([]byte, error) {
	// Get a buffer from the pool and reset it
	buf := bufferPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufferPool.Put(buf)

	// Get a snappy writer from the pool, set it to write to the buffer, and reset it
	compr := snappyWriterPool.Get().(*snappy.Writer)
	compr.Reset(buf)
	defer snappyWriterPool.Put(compr)

	// gob streams straight into the snappy writer, so data is compressed as
	// it is encoded rather than in a second pass over a full encoding.
	enc := gob.NewEncoder(compr)
	if err := enc.Encode(data); err != nil {
		return nil, err
	}

	// Flush snappy writer
	if err := compr.Close(); err != nil {
		return nil, err
	}

	// Copy the buffer’s bytes to avoid reuse issues when returning
	return bytes.Clone(buf.Bytes()), nil
}

// decompressKeptDecisions reverses compress for kept-decision payloads: it
// snappy-decompresses data and gob-decodes the result into TraceDecisions.
func decompressKeptDecisions(data []byte) ([]TraceDecision, error) {
	// Borrow a scratch buffer from the pool and load it with the payload.
	scratch := bufferPool.Get().(*bytes.Buffer)
	defer bufferPool.Put(scratch)
	scratch.Reset()
	scratch.Write(data)

	// Decode the gob stream through a snappy reader wrapped around the buffer.
	var decisions []TraceDecision
	if err := gob.NewDecoder(snappy.NewReader(scratch)).Decode(&decisions); err != nil {
		return nil, err
	}

	return decisions, nil
}

// decompressDropDecisions reverses compress for drop-decision payloads: it
// snappy-decompresses data and gob-decodes the comma-joined trace ID string.
func decompressDropDecisions(data []byte) (string, error) {
	// Borrow a scratch buffer from the pool and load it with the payload.
	scratch := bufferPool.Get().(*bytes.Buffer)
	defer bufferPool.Put(scratch)
	scratch.Reset()
	scratch.Write(data)

	// Decode the gob stream through a snappy reader wrapped around the buffer.
	var joined string
	if err := gob.NewDecoder(snappy.NewReader(scratch)).Decode(&joined); err != nil {
		return "", err
	}

	return joined, nil
}
Loading

0 comments on commit d2f6f2a

Please sign in to comment.