Skip to content

Commit

Permalink
[exporter/otlphttp] added support for configurable telemetry encoding (
Browse files Browse the repository at this point in the history
…#9276)

**Description:**
This PR adds support for encoding configuration in the `otlphttp`
exporter.

**Link to tracking Issue:** 
#6945

**Testing:** 
Updated existing tests, and added relevant tests 

**Documentation:** 
Updated the `otlphttp` docs to include the new configuration option.

---------

Co-authored-by: Yang Song <[email protected]>
  • Loading branch information
tvaintrob and songy23 authored Feb 7, 2024
1 parent ca0eab2 commit e874866
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 27 deletions.
25 changes: 25 additions & 0 deletions .chloggen/otlphttp-json-encoding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlphttpexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for json content encoding when exporting telemetry

# One or more tracking issues or pull requests related to the change
issues: [6945]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 10 additions & 0 deletions exporter/otlphttpexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ The following settings can be optionally configured:
- `timeout` (default = 30s): HTTP request time limit. For details see https://golang.org/pkg/net/http/#Client
- `read_buffer_size` (default = 0): ReadBufferSize for HTTP client.
- `write_buffer_size` (default = 512 * 1024): WriteBufferSize for HTTP client.
- `encoding` (default = proto): The encoding to use for the messages (valid options: `proto`, `json`)

Example:

Expand All @@ -55,5 +56,14 @@ exporters:
compression: none
```

By default `proto` encoding is used, to change the content encoding of the message configure it as follows:

```yaml
exporters:
otlphttp:
...
encoding: json
```

The full list of settings exposed for this exporter are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).
34 changes: 34 additions & 0 deletions exporter/otlphttpexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,45 @@
package otlphttpexporter // import "go.opentelemetry.io/collector/exporter/otlphttpexporter"

import (
"encoding"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// EncodingType defines the type for content encoding
type EncodingType string

const (
EncodingProto EncodingType = "proto"
EncodingJSON EncodingType = "json"
)

var _ encoding.TextUnmarshaler = (*EncodingType)(nil)

// UnmarshalText unmarshalls text to an EncodingType.
func (e *EncodingType) UnmarshalText(text []byte) error {
if e == nil {
return errors.New("cannot unmarshal to a nil *EncodingType")
}

str := string(text)
switch str {
case string(EncodingProto):
*e = EncodingProto
case string(EncodingJSON):
*e = EncodingJSON
default:
return fmt.Errorf("invalid encoding type: %s", str)
}

return nil
}

// Config defines configuration for OTLP/HTTP exporter.
type Config struct {
confighttp.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
Expand All @@ -26,6 +57,9 @@ type Config struct {

// The URL to send logs to. If omitted the Endpoint + "/v1/logs" will be used.
LogsEndpoint string `mapstructure:"logs_endpoint"`

// The encoding to export telemetry (default: "proto")
Encoding EncodingType `mapstructure:"encoding"`
}

var _ component.Config = (*Config)(nil)
Expand Down
55 changes: 55 additions & 0 deletions exporter/otlphttpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func TestUnmarshalConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Encoding: EncodingProto,
ClientConfig: confighttp.ClientConfig{
Headers: map[string]configopaque.String{
"can you have a . here?": "F0000000-0000-0000-0000-000000000000",
Expand All @@ -73,3 +74,57 @@ func TestUnmarshalConfig(t *testing.T) {
},
}, cfg)
}

func TestUnmarshalConfigInvalidEncoding(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "bad_invalid_encoding.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.Error(t, component.UnmarshalConfig(cm, cfg))
}

func TestUnmarshalEncoding(t *testing.T) {
tests := []struct {
name string
encodingBytes []byte
expected EncodingType
shouldError bool
}{
{
name: "UnmarshalEncodingProto",
encodingBytes: []byte("proto"),
expected: EncodingProto,
shouldError: false,
},
{
name: "UnmarshalEncodingJson",
encodingBytes: []byte("json"),
expected: EncodingJSON,
shouldError: false,
},
{
name: "UnmarshalEmptyEncoding",
encodingBytes: []byte(""),
shouldError: true,
},
{
name: "UnmarshalInvalidEncoding",
encodingBytes: []byte("invalid"),
shouldError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var encoding EncodingType
err := encoding.UnmarshalText(tt.encodingBytes)

if tt.shouldError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.expected, encoding)
}
})
}
}
1 change: 1 addition & 0 deletions exporter/otlphttpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func createDefaultConfig() component.Config {
return &Config{
RetryConfig: configretry.NewDefaultBackOffConfig(),
QueueConfig: exporterhelper.NewDefaultQueueSettings(),
Encoding: EncodingProto,
ClientConfig: confighttp.ClientConfig{
Endpoint: "",
Timeout: 30 * time.Second,
Expand Down
15 changes: 15 additions & 0 deletions exporter/otlphttpexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestCreateDefaultConfig(t *testing.T) {
assert.Equal(t, ocfg.RetryConfig.InitialInterval, 5*time.Second, "default retry InitialInterval")
assert.Equal(t, ocfg.RetryConfig.MaxInterval, 30*time.Second, "default retry MaxInterval")
assert.Equal(t, ocfg.QueueConfig.Enabled, true, "default sending queue is enabled")
assert.Equal(t, ocfg.Encoding, EncodingProto)
assert.Equal(t, ocfg.Compression, configcompression.TypeGzip)
}

Expand Down Expand Up @@ -154,6 +155,20 @@ func TestCreateTracesExporter(t *testing.T) {
},
},
},
{
name: "ProtoEncoding",
config: &Config{
Encoding: EncodingProto,
ClientConfig: confighttp.ClientConfig{Endpoint: endpoint},
},
},
{
name: "JSONEncoding",
config: &Config{
Encoding: EncodingJSON,
ClientConfig: confighttp.ClientConfig{Endpoint: endpoint},
},
},
}

for _, tt := range tests {
Expand Down
113 changes: 89 additions & 24 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
headerRetryAfter = "Retry-After"
maxHTTPResponseReadBytes = 64 * 1024

jsonContentType = "application/json"
protobufContentType = "application/x-protobuf"
)

Expand Down Expand Up @@ -87,7 +88,18 @@ func (e *baseExporter) start(_ context.Context, host component.Host) error {

func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
tr := ptraceotlp.NewExportRequestFromTraces(td)
request, err := tr.MarshalProto()

var err error
var request []byte
switch e.config.Encoding {
case EncodingJSON:
request, err = tr.MarshalJSON()
case EncodingProto:
request, err = tr.MarshalProto()
default:
err = fmt.Errorf("invalid encoding: %s", e.config.Encoding)
}

if err != nil {
return consumererror.NewPermanent(err)
}
Expand All @@ -97,7 +109,18 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {

func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
tr := pmetricotlp.NewExportRequestFromMetrics(md)
request, err := tr.MarshalProto()

var err error
var request []byte
switch e.config.Encoding {
case EncodingJSON:
request, err = tr.MarshalJSON()
case EncodingProto:
request, err = tr.MarshalProto()
default:
err = fmt.Errorf("invalid encoding: %s", e.config.Encoding)
}

if err != nil {
return consumererror.NewPermanent(err)
}
Expand All @@ -106,7 +129,18 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro

func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
tr := plogotlp.NewExportRequestFromLogs(ld)
request, err := tr.MarshalProto()

var err error
var request []byte
switch e.config.Encoding {
case EncodingJSON:
request, err = tr.MarshalJSON()
case EncodingProto:
request, err = tr.MarshalProto()
default:
err = fmt.Errorf("invalid encoding: %s", e.config.Encoding)
}

if err != nil {
return consumererror.NewPermanent(err)
}
Expand All @@ -120,7 +154,16 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p
if err != nil {
return consumererror.NewPermanent(err)
}
req.Header.Set("Content-Type", protobufContentType)

switch e.config.Encoding {
case EncodingJSON:
req.Header.Set("Content-Type", jsonContentType)
case EncodingProto:
req.Header.Set("Content-Type", protobufContentType)
default:
return fmt.Errorf("invalid encoding: %s", e.config.Encoding)
}

req.Header.Set("User-Agent", e.userAgent)

resp, err := e.client.Do(req)
Expand Down Expand Up @@ -231,7 +274,6 @@ func readResponseStatus(resp *http.Response) *status.Status {
// "Response body for all HTTP 4xx and HTTP 5xx responses MUST be a
// Protobuf-encoded Status message that describes the problem."
respBytes, err := readResponseBody(resp)

if err != nil {
return nil
}
Expand All @@ -249,7 +291,6 @@ func readResponseStatus(resp *http.Response) *status.Status {

func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler partialSuccessHandler) error {
bodyBytes, err := readResponseBody(resp)

if err != nil {
return err
}
Expand All @@ -260,14 +301,22 @@ func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler par
type partialSuccessHandler func(bytes []byte, contentType string) error

func (e *baseExporter) tracesPartialSuccessHandler(protoBytes []byte, contentType string) error {
if contentType != protobufContentType {
return nil
}
exportResponse := ptraceotlp.NewExportResponse()
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
return fmt.Errorf("error parsing protobuf response: %w", err)
switch contentType {
case protobufContentType:
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
return fmt.Errorf("error parsing protobuf response: %w", err)
}
case jsonContentType:
err := exportResponse.UnmarshalJSON(protoBytes)
if err != nil {
return fmt.Errorf("error parsing json response: %w", err)
}
default:
return nil
}

partialSuccess := exportResponse.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) {
e.logger.Warn("Partial success response",
Expand All @@ -279,14 +328,22 @@ func (e *baseExporter) tracesPartialSuccessHandler(protoBytes []byte, contentTyp
}

func (e *baseExporter) metricsPartialSuccessHandler(protoBytes []byte, contentType string) error {
if contentType != protobufContentType {
return nil
}
exportResponse := pmetricotlp.NewExportResponse()
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
return fmt.Errorf("error parsing protobuf response: %w", err)
switch contentType {
case protobufContentType:
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
return fmt.Errorf("error parsing protobuf response: %w", err)
}
case jsonContentType:
err := exportResponse.UnmarshalJSON(protoBytes)
if err != nil {
return fmt.Errorf("error parsing json response: %w", err)
}
default:
return nil
}

partialSuccess := exportResponse.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) {
e.logger.Warn("Partial success response",
Expand All @@ -298,14 +355,22 @@ func (e *baseExporter) metricsPartialSuccessHandler(protoBytes []byte, contentTy
}

func (e *baseExporter) logsPartialSuccessHandler(protoBytes []byte, contentType string) error {
if contentType != protobufContentType {
return nil
}
exportResponse := plogotlp.NewExportResponse()
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
return fmt.Errorf("error parsing protobuf response: %w", err)
switch contentType {
case protobufContentType:
err := exportResponse.UnmarshalProto(protoBytes)
if err != nil {
return fmt.Errorf("error parsing protobuf response: %w", err)
}
case jsonContentType:
err := exportResponse.UnmarshalJSON(protoBytes)
if err != nil {
return fmt.Errorf("error parsing json response: %w", err)
}
default:
return nil
}

partialSuccess := exportResponse.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) {
e.logger.Warn("Partial success response",
Expand Down
Loading

0 comments on commit e874866

Please sign in to comment.