Skip to content

Commit

Permalink
Introduce Log processor (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
pellared authored Mar 11, 2024
1 parent 8b70d18 commit 876ddd6
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 77 deletions.
43 changes: 19 additions & 24 deletions sdk/log/batcher.go → sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ const (
maxBatchSizeDefault = 512
)

var _ Exporter = (*Batcher)(nil)
var _ Processor = (*BatchingProcessor)(nil)

// Batcher is an exporter decorator
// that asynchronously exports batches of log records.
type Batcher struct {
// BatchingProcessor is a processor that asynchronously exports batches of log records.
type BatchingProcessor struct {
exporter Exporter
cfg batcherConfig

Expand All @@ -54,9 +53,9 @@ type exportRequest struct {
Result chan error
}

// NewBatchingExporter decorates the provided exporter
// NewBatchingProcessor decorates the provided exporter
// so that the log records are batched before exporting.
func NewBatchingExporter(exporter Exporter, opts ...BatchingOption) *Batcher {
func NewBatchingProcessor(exporter Exporter, opts ...BatchingOption) *BatchingProcessor {
cfg := batcherConfig{
queueSize: queueSizeDefault,
interval: intervalDefault,
Expand Down Expand Up @@ -113,7 +112,7 @@ func NewBatchingExporter(exporter Exporter, opts ...BatchingOption) *Batcher {
cfg.maxBatchSize = maxBatchSizeDefault
}

b := &Batcher{
b := &BatchingProcessor{
exporter: exporter,
cfg: cfg,
flush: make(chan exportRequest),
Expand All @@ -128,27 +127,25 @@ func NewBatchingExporter(exporter Exporter, opts ...BatchingOption) *Batcher {
return b
}

// Export batches provided log records.
func (b *Batcher) Export(ctx context.Context, records []Record) error {
// OnEmit batches provided log record.
func (b *BatchingProcessor) OnEmit(ctx context.Context, r Record) error {
if b.isShutdown.Load() {
return nil
}

defer b.mu.Unlock()
b.mu.Lock()

for _, r := range records {
if len(b.queue) == b.cfg.queueSize {
// Queue is full.
return nil
}
b.queue = append(b.queue, r)
if len(b.queue) == b.cfg.queueSize {
// Queue is full.
return nil
}
b.queue = append(b.queue, r)
return nil
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *Batcher) Shutdown(ctx context.Context) error {
func (b *BatchingProcessor) Shutdown(ctx context.Context) error {
wasShutdown := b.isShutdown.Swap(true)
if wasShutdown {
return nil
Expand All @@ -159,15 +156,11 @@ func (b *Batcher) Shutdown(ctx context.Context) error {
Result: make(chan error, 1), // Heap allocation.
}
b.stop <- req
err := <-req.Result

err = errors.Join(err, b.exporter.Shutdown(ctx))

return err
return <-req.Result
}

// ForceFlush flushes queued log records and flushes the decorated exporter.
func (b *Batcher) ForceFlush(ctx context.Context) error {
func (b *BatchingProcessor) ForceFlush(ctx context.Context) error {
if b.isShutdown.Load() {
return nil
}
Expand All @@ -186,7 +179,7 @@ func (b *Batcher) ForceFlush(ctx context.Context) error {
}
}

func (b *Batcher) run() {
func (b *BatchingProcessor) run() {
defer close(b.done)

ticker := time.NewTicker(b.cfg.interval)
Expand All @@ -201,17 +194,19 @@ func (b *Batcher) run() {
}
case req := <-b.flush:
err := b.export(req.Context)
err = errors.Join(err, b.exporter.ForceFlush(req.Context))
req.Result <- err
ticker.Reset(b.cfg.interval)
case req := <-b.stop:
err := b.export(req.Context)
err = errors.Join(err, b.exporter.Shutdown(req.Context))
req.Result <- err
return
}
}
}

func (b *Batcher) export(ctx context.Context) error {
func (b *BatchingProcessor) export(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, b.cfg.timeout) // 5 heap allocations.
defer cancel()

Expand Down
41 changes: 11 additions & 30 deletions sdk/log/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"log/slog"
"sync"
"testing"
"time"

Expand All @@ -30,7 +29,7 @@ var (
var runs = 5

func TestZeroAllocsSimple(t *testing.T) {
provider := NewLoggerProvider(WithExporter(noopExporter{}))
provider := NewLoggerProvider(WithProcessor(NewSimpleProcessor(noopExporter{})))
t.Cleanup(func() { assert.NoError(t, provider.Shutdown(context.Background())) })
logger := slog.New(&slogHandler{provider.Logger("log/slog")})

Expand All @@ -46,7 +45,7 @@ func TestZeroAllocsSimple(t *testing.T) {
}

func TestZeroAllocsModifyProcessor(t *testing.T) {
provider := NewLoggerProvider(WithExporter(timestampDecorator{noopExporter{}}))
provider := NewLoggerProvider(WithProcessor(timestampDecorator{NewSimpleProcessor(noopExporter{})}))
t.Cleanup(func() { assert.NoError(t, provider.Shutdown(context.Background())) })
logger := slog.New(&slogHandler{provider.Logger("log/slog")})

Expand All @@ -62,7 +61,7 @@ func TestZeroAllocsModifyProcessor(t *testing.T) {
}

func TestZeroAllocsBatch(t *testing.T) {
provider := NewLoggerProvider(WithExporter(NewBatchingExporter(noopExporter{})))
provider := NewLoggerProvider(WithProcessor(NewBatchingProcessor(noopExporter{})))
t.Cleanup(func() { assert.NoError(t, provider.Shutdown(context.Background())) })
logger := slog.New(&slogHandler{provider.Logger("log/slog")})

Expand All @@ -78,7 +77,7 @@ func TestZeroAllocsBatch(t *testing.T) {
}

func TestZeroAllocsNoSpan(t *testing.T) {
provider := NewLoggerProvider(WithExporter(noopExporter{}))
provider := NewLoggerProvider(WithProcessor(NewSimpleProcessor(noopExporter{})))
t.Cleanup(func() { assert.NoError(t, provider.Shutdown(context.Background())) })
logger := slog.New(&slogHandler{provider.Logger("log/slog")})

Expand Down Expand Up @@ -193,7 +192,7 @@ func Benchmark(b *testing.B) {
} {
b.Run(call.name, func(b *testing.B) {
b.Run("Simple", func(b *testing.B) {
provider := NewLoggerProvider(WithExporter(noopExporter{}))
provider := NewLoggerProvider(WithProcessor(NewSimpleProcessor(noopExporter{})))
logger := slog.New(&slogHandler{provider.Logger("log/slog")})

b.ReportAllocs()
Expand All @@ -203,7 +202,7 @@ func Benchmark(b *testing.B) {
_ = provider.Shutdown(context.Background())
})
b.Run("Batch", func(b *testing.B) {
provider := NewLoggerProvider(WithExporter(NewBatchingExporter(noopExporter{})))
provider := NewLoggerProvider(WithProcessor(NewBatchingProcessor(noopExporter{})))
logger := slog.New(&slogHandler{provider.Logger("log/slog")})

b.ReportAllocs()
Expand Down Expand Up @@ -302,30 +301,12 @@ func (e noopExporter) ForceFlush(_ context.Context) error {
return nil
}

var pool = sync.Pool{
New: func() any {
b := make([]Record, 0, 1)
return &b
},
}

type timestampDecorator struct {
Exporter
Processor
}

func (e timestampDecorator) Export(ctx context.Context, records []Record) error {
bPtr := pool.Get().(*[]Record)
defer func() {
*bPtr = (*bPtr)[:0]
pool.Put(bPtr)
}()
b := *bPtr

for _, r := range records {
r = r.Clone()
r.SetObservedTimestamp(testTimestamp)
b = append(b, r)
}

return e.Exporter.Export(ctx, b)
func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
r.SetObservedTimestamp(testTimestamp)
return e.Processor.OnEmit(ctx, r)
}
4 changes: 2 additions & 2 deletions sdk/log/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type Exporter interface {
//
// Implementations must not retain the records slice.
//
// Implementations should clone the records before modifying
// them to avoid possible data races.
// Before modifying a Record, the implementation must use Record.Clone
// to create a copy that shares no state with the original.
Export(ctx context.Context, records []Record) error
// DO NOT CHANGE: any modification will not be backwards compatible and
// must never be done outside of a new major release.
Expand Down
15 changes: 2 additions & 13 deletions sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"context"
"sync"
"time"

"go.opentelemetry.io/otel"
Expand All @@ -15,13 +14,6 @@ import (
"go.opentelemetry.io/otel/trace"
)

var recordsPool = sync.Pool{
New: func() any {
b := make([]Record, 1)
return &b
},
}

// Compile-time check logger implements metric.log.Logger.
var _ log.Logger = (*logger)(nil)

Expand Down Expand Up @@ -62,12 +54,9 @@ func (l *logger) Emit(ctx context.Context, r log.Record) {
return true
})

records := recordsPool.Get().(*[]Record)
(*records)[0] = record
for _, exporter := range l.provider.cfg.exporters {
if err := exporter.Export(ctx, *records); err != nil {
for _, processor := range l.provider.cfg.processors {
if err := processor.OnEmit(ctx, record); err != nil {
otel.Handle(err)
}
}
recordsPool.Put(records)
}
55 changes: 55 additions & 0 deletions sdk/log/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"context"
)

// Processor handles the processing of log records.
//
// Any of the Processor's methods may be called concurrently with itself
// or with other methods. It is the responsibility of the Processor to manage
// this concurrency.
type Processor interface {
	// DO NOT CHANGE: any modification will not be backwards compatible and
	// must never be done outside of a new major release.

	// OnEmit is called when a Record is emitted.
	//
	// The deadline or cancellation of the passed context must be honored. An
	// appropriate error should be returned in these situations.
	//
	// All retry logic must be contained in this function. The SDK does not
	// implement any retry logic. All errors returned by this function are
	// considered unrecoverable and will be reported to a configured error
	// Handler.
	//
	// Before modifying a Record, the implementation must use Record.Clone
	// to create a copy that shares no state with the original.
	OnEmit(ctx context.Context, record Record) error
	// DO NOT CHANGE: any modification will not be backwards compatible and
	// must never be done outside of a new major release.

	// Shutdown is called when the SDK shuts down. Any cleanup or release of
	// resources held by the processor should be done in this call.
	//
	// The deadline or cancellation of the passed context must be honored. An
	// appropriate error should be returned in these situations.
	//
	// After Shutdown is called, calls to OnEmit, Shutdown, or ForceFlush
	// should perform no operation and return nil error.
	Shutdown(ctx context.Context) error
	// DO NOT CHANGE: any modification will not be backwards compatible and
	// must never be done outside of a new major release.

	// ForceFlush exports log records to the configured Exporter that have not yet
	// been exported.
	//
	// The deadline or cancellation of the passed context must be honored. An
	// appropriate error should be returned in these situations.
	ForceFlush(ctx context.Context) error
	// DO NOT CHANGE: any modification will not be backwards compatible and
	// must never be done outside of a new major release.
}
20 changes: 12 additions & 8 deletions sdk/log/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type LoggerProvider struct {

type providerConfig struct {
resource *resource.Resource
exporters []Exporter
processors []Processor
attributeCountLimit int
attributeValueLengthLimit int
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func (p *LoggerProvider) Shutdown(ctx context.Context) error {
}

var err error
for _, exporter := range p.cfg.exporters {
for _, exporter := range p.cfg.processors {
err = exporter.Shutdown(ctx)
}
return err
Expand All @@ -137,7 +137,7 @@ func (p *LoggerProvider) ForceFlush(ctx context.Context) error {
}

var err error
for _, exporter := range p.cfg.exporters {
for _, exporter := range p.cfg.processors {
err = exporter.ForceFlush(ctx)
}
return err
Expand Down Expand Up @@ -167,15 +167,19 @@ func WithResource(res *resource.Resource) LoggerProviderOption {
})
}

// WithExporter associates Exporter with a LoggerProvider.
// WithProcessor associates Processor with a LoggerProvider.
//
// By default, if this option is not used, the LoggerProvider will perform no
// operations; no data will be exported without an Exporter.
// operations; no data will be exported without a processor.
//
// Use NewBatchingExporter to batch log records before they are exported.
func WithExporter(exporter Exporter) LoggerProviderOption {
// Each WithProcessor creates a separate pipeline. Use custom decotarators
// for advanced scenarios such as enriching with attributes.
//
// Use NewBatchingProcessor to batch log records before they are exported.
// Use NewSimpleProcessor to synchronously export log records.
func WithProcessor(processor Processor) LoggerProviderOption {
return loggerProviderOptionFunc(func(cfg providerConfig) providerConfig {
cfg.exporters = append(cfg.exporters, exporter)
cfg.processors = append(cfg.processors, processor)
return cfg
})
}
Expand Down
Loading

0 comments on commit 876ddd6

Please sign in to comment.