diff --git a/README.md b/README.md index 794782e8..1af09c59 100644 --- a/README.md +++ b/README.md @@ -124,6 +124,37 @@ receivers: serverName: # optional, the domain, the certificate was issued for, in case it doesn't match the hostname used for the connection caFile: # optional, path to the CA file of the trusted authority the cert was signed with ``` +### OpenSsearch + +[OpenSearch](https://opensearch.org/) is a community-driven, open source search and analytics suite derived from Apache 2.0 licensed Elasticsearch 7.10.2 & Kibana 7.10.2. +OpenSearch enables people to easily ingest, secure, search, aggregate, view, and analyze data. These capabilities are popular for use cases such as application search, log analytics, and more. +You may decide to push all events to OpenSearch and do some interesting queries over time to find out +which images are pulled, how often pod schedules happen etc. + +```yaml +# ... +receivers: + - name: "dump" + opensearch: + hosts: + - http://localhost:9200 + index: kube-events + # Ca be used optionally for time based indices, accepts Go time formatting directives + indexFormat: "kube-events-{2006-01-02}" + username: # optional + password: # optional + # If set to true, it allows updating the same document in ES (might be useful handling count) + useEventID: true|false + # Type should be only used for clusters Version 6 and lower. + # type: kube-event + # If set to true, all dots in labels and annotation keys are replaced by underscores. Defaults false + deDot: true|false + layout: # Optional + tls: # optional, advanced options for tls + insecureSkipVerify: true|false # optional, if set to true, the tls cert won't be verified + serverName: # optional, the domain, the certificate was issued for, in case it doesn't match the hostname used for the connection + caFile: # optional, path to the CA file of the trusted authority the cert was signed with +``` ### Slack diff --git a/config.example.yaml b/config.example.yaml index a2815627..43eaf02c 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -27,6 +27,11 @@ receivers: hosts: - "http://localhost:9200" indexFormat: "kube-events-{2006-01-02}" + - name: "opensearch-dump" + opensearch: + hosts: + - "http://localhost:9200" + indexFormat: "kube-events-{2006-01-02}" - name: "alert" opsgenie: apiKey: "" diff --git a/go.mod b/go.mod index 9bae91a6..e5bc7f1a 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/elastic/go-elasticsearch/v7 v7.4.1 github.com/hashicorp/golang-lru v0.5.3 github.com/linkedin/goavro/v2 v2.10.1 + github.com/opensearch-project/opensearch-go v1.0.0 github.com/opsgenie/opsgenie-go-sdk-v2 v1.0.3 github.com/rs/zerolog v1.16.0 github.com/slack-go/slack v0.9.1 diff --git a/go.sum b/go.sum index 412aac27..93e3bbc7 100644 --- a/go.sum +++ b/go.sum @@ -245,6 +245,8 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/opensearch-project/opensearch-go v1.0.0 h1:8Gh7B7Un5BxuxWAgmzleEF7lpOtC71pCgPp7lKr3ca8= +github.com/opensearch-project/opensearch-go v1.0.0/go.mod h1:FrUl/52DBegRYvK7ISF278AXmjDV647lyTnsLGBR7J4= github.com/opsgenie/opsgenie-go-sdk-v2 v1.0.3 h1:XAqJ0IIb/Q/mss3OMrXInA6KOQzGOrZRtJIY7qWpSxQ= github.com/opsgenie/opsgenie-go-sdk-v2 v1.0.3/go.mod h1:f0ezb0R/mrB9Hpm5RrIS6EX3ydjsR2nAB88nYYXZcNY= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= diff --git a/pkg/sinks/opensearch.go b/pkg/sinks/opensearch.go new file mode 100644 index 00000000..83942176 --- /dev/null +++ b/pkg/sinks/opensearch.go @@ -0,0 +1,146 @@ +package sinks + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + opensearch "github.com/opensearch-project/opensearch-go" + opensearchapi "github.com/opensearch-project/opensearch-go/opensearchapi" + "github.com/opsgenie/kubernetes-event-exporter/pkg/kube" + "github.com/rs/zerolog/log" + "io/ioutil" + "net/http" + "regexp" + "strings" + "time" +) + +type OpenSearchConfig struct { + // Connection specific + Hosts []string `yaml:"hosts"` + Username string `yaml:"username"` + Password string `yaml:"password"` + // Indexing preferences + UseEventID bool `yaml:"useEventID"` + // DeDot all labels and annotations in the event. For both the event and the involvedObject + DeDot bool `yaml:"deDot"` + Index string `yaml:"index"` + IndexFormat string `yaml:"indexFormat"` + Type string `yaml:"type"` + TLS TLS `yaml:"tls"` + Layout map[string]interface{} `yaml:"layout"` +} + +func NewOpenSearch(cfg *OpenSearchConfig) (*OpenSearch, error) { + + tlsClientConfig, err := setupTLS(&cfg.TLS) + if err != nil { + return nil, fmt.Errorf("failed to setup TLS: %w", err) + } + + client, err := opensearch.NewClient(opensearch.Config{ + Addresses: cfg.Hosts, + Username: cfg.Username, + Password: cfg.Password, + Transport: &http.Transport{ + TLSClientConfig: tlsClientConfig, + }, + }) + if err != nil { + return nil, err + } + + return &OpenSearch{ + client: client, + cfg: cfg, + }, nil +} + +type OpenSearch struct { + client *opensearch.Client + cfg *OpenSearchConfig +} + +var osRegex = regexp.MustCompile(`(?s){(.*)}`) + +func osFormatIndexName(pattern string, when time.Time) string { + m := osRegex.FindAllStringSubmatchIndex(pattern, -1) + current := 0 + var builder strings.Builder + + for i := 0; i < len(m); i++ { + pair := m[i] + + builder.WriteString(pattern[current:pair[0]]) + builder.WriteString(when.Format(pattern[pair[0]+1 : pair[1]-1])) + current = pair[1] + } + + builder.WriteString(pattern[current:]) + + return builder.String() +} + +func (e *OpenSearch) Send(ctx context.Context, ev *kube.EnhancedEvent) error { + var toSend []byte + + if e.cfg.DeDot { + de := ev.DeDot() + ev = &de + } + if e.cfg.Layout != nil { + res, err := convertLayoutTemplate(e.cfg.Layout, ev) + if err != nil { + return err + } + + toSend, err = json.Marshal(res) + if err != nil { + return err + } + } else { + toSend = ev.ToJSON() + } + + var index string + if len(e.cfg.IndexFormat) > 0 { + now := time.Now() + index = osFormatIndexName(e.cfg.IndexFormat, now) + } else { + index = e.cfg.Index + } + + req := opensearchapi.IndexRequest{ + Body: bytes.NewBuffer(toSend), + Index: index, + } + + // This should not be used for clusters with ES8.0+. + if len(e.cfg.Type) > 0 { + req.DocumentType = e.cfg.Type + } + + if e.cfg.UseEventID { + req.DocumentID = string(ev.UID) + } + + resp, err := req.Do(ctx, e.client) + if err != nil { + return err + } + + defer resp.Body.Close() + if resp.StatusCode > 399 { + rb, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + log.Error().Msgf("Indexing failed: %s", string(rb)) + } + return nil +} + +func (e *OpenSearch) Close() { + // No-op +} diff --git a/pkg/sinks/receiver.go b/pkg/sinks/receiver.go index 508441ea..2bb49048 100644 --- a/pkg/sinks/receiver.go +++ b/pkg/sinks/receiver.go @@ -13,6 +13,7 @@ type ReceiverConfig struct { Elasticsearch *ElasticsearchConfig `yaml:"elasticsearch"` Kinesis *KinesisConfig `yaml:"kinesis"` Firehose *FirehoseConfig `yaml:"firehose"` + OpenSearch *OpenSearchConfig `yaml:"opensearch"` Opsgenie *OpsgenieConfig `yaml:"opsgenie"` SQS *SQSConfig `yaml:"sqs"` SNS *SNSConfig `yaml:"sns"` @@ -72,6 +73,10 @@ func (r *ReceiverConfig) GetSink() (Sink, error) { return NewFirehoseSink(r.Firehose) } + if r.OpenSearch != nil { + return NewOpenSearch(r.OpenSearch) + } + if r.Opsgenie != nil { return NewOpsgenieSink(r.Opsgenie) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 160fb002..484e1c4c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -206,6 +206,12 @@ github.com/modern-go/concurrent # github.com/modern-go/reflect2 v1.0.1 ## explicit github.com/modern-go/reflect2 +# github.com/opensearch-project/opensearch-go v1.0.0 +## explicit; go 1.11 +github.com/opensearch-project/opensearch-go +github.com/opensearch-project/opensearch-go/internal/version +github.com/opensearch-project/opensearch-go/opensearchapi +github.com/opensearch-project/opensearch-go/opensearchtransport # github.com/opsgenie/opsgenie-go-sdk-v2 v1.0.3 ## explicit; go 1.12 github.com/opsgenie/opsgenie-go-sdk-v2/alert