diff --git a/.github/workflows/pull_request_integration_tests.yml b/.github/workflows/pull_request_integration_tests.yml
index b79586f01..ff3bddfcc 100644
--- a/.github/workflows/pull_request_integration_tests.yml
+++ b/.github/workflows/pull_request_integration_tests.yml
@@ -9,7 +9,7 @@ on:
 jobs:
   test:
     name: test
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-latest-8-cores
     strategy:
       matrix:
         go: [ '1.23' ]
diff --git a/.github/workflows/pull_request_k8s_integration_tests.yml b/.github/workflows/pull_request_k8s_integration_tests.yml
index 0b01c6b39..1680083e4 100644
--- a/.github/workflows/pull_request_k8s_integration_tests.yml
+++ b/.github/workflows/pull_request_k8s_integration_tests.yml
@@ -9,7 +9,7 @@ on:
 jobs:
   test:
     name: test
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-latest-8-cores
     strategy:
       matrix:
         go: [ '1.23' ]
diff --git a/docs/sources/configure/options.md b/docs/sources/configure/options.md
index 5a175ee07..22b09347c 100644
--- a/docs/sources/configure/options.md
+++ b/docs/sources/configure/options.md
@@ -696,6 +696,17 @@ reduces the load of the Kubernetes API.
 
 The Pods informer can't be disabled. For that purpose, you should disable the whole Kubernetes metadata decoration.
 
+| YAML                       | Environment variable                  | Type    | Default |
+|----------------------------|---------------------------------------|---------|---------|
+| `meta_restrict_local_node` | `BEYLA_KUBE_META_RESTRICT_LOCAL_NODE` | boolean | false   |
+
+If true, Beyla stores Pod and Node metadata only from the node where the Beyla instance is running.
+
+This option decreases the memory used to store the metadata, but some metrics
+(such as network bytes or service graph metrics) will miss the metadata of destination
+pods that are located on a different node.
+
+
 | YAML                     | Environment variable                | Type     | Default |
 |--------------------------|-------------------------------------|----------|---------|
 | `informers_sync_timeout` | `BEYLA_KUBE_INFORMERS_SYNC_TIMEOUT` | Duration | 30s     |
diff --git a/pkg/components/beyla.go b/pkg/components/beyla.go
index 3eb7d11ae..e2ae152c0 100644
--- a/pkg/components/beyla.go
+++ b/pkg/components/beyla.go
@@ -114,6 +114,7 @@ func buildCommonContextInfo(
 			DisabledInformers: config.Attributes.Kubernetes.DisableInformers,
 			MetaCacheAddr:     config.Attributes.Kubernetes.MetaCacheAddress,
 			MetadataSources:   config.Attributes.Kubernetes.MetadataSources,
+			RestrictLocalNode: config.Attributes.Kubernetes.MetaRestrictLocalNode,
 		}),
 	}
 	switch {
diff --git a/pkg/internal/kube/informer_provider.go b/pkg/internal/kube/informer_provider.go
index 5e4d431bb..17f44753f 100644
--- a/pkg/internal/kube/informer_provider.go
+++ b/pkg/internal/kube/informer_provider.go
@@ -35,6 +35,7 @@ type MetadataConfig struct {
 	ResyncPeriod      time.Duration
 	MetaCacheAddr     string
 	MetadataSources   MetadataSources
+	RestrictLocalNode bool
 }
 
 type MetadataProvider struct {
@@ -43,6 +44,8 @@ type MetadataProvider struct {
 	metadata *Store
 	informer meta.Notifier
 
+	localNodeName string
+
 	cfg *MetadataConfig
 }
 
@@ -126,6 +129,9 @@ func (mp *MetadataProvider) getInformer(ctx context.Context) (meta.Notifier, err
 }
 
 func (mp *MetadataProvider) CurrentNodeName(ctx context.Context) (string, error) {
+	if mp.localNodeName != "" {
+		return mp.localNodeName, nil
+	}
 	log := klog().With("func", "NodeName")
 	kubeClient, err := mp.KubeClient()
 	if err != nil {
@@ -153,7 +159,8 @@ func (mp *MetadataProvider) CurrentNodeName(ctx context.Context) (string, error)
 			" host name as node name", "nodeName", currentPod, "namespace", currentNamespace, "error", err)
 		return currentPod, nil
 	}
-	return pods.Items[0].Spec.NodeName, nil
+	mp.localNodeName = pods.Items[0].Spec.NodeName
+	return mp.localNodeName, nil
 }
 
 // initLocalInformers initializes an informer client that directly connects to the Node Kube API
@@ -167,6 +174,13 @@ func (mp *MetadataProvider) initLocalInformers(ctx context.Context) (*meta.Infor
 		meta.WaitForCacheSync(),
 		meta.WithCacheSyncTimeout(mp.cfg.SyncTimeout),
 	)
+	if mp.cfg.RestrictLocalNode {
+		localNode, err := mp.CurrentNodeName(ctx)
+		if err != nil {
+			return nil, fmt.Errorf("getting local node name: %w", err)
+		}
+		opts = append(opts, meta.RestrictNode(localNode))
+	}
 	return meta.InitInformers(ctx, opts...)
 }
 
diff --git a/pkg/kubecache/meta/informers_init.go b/pkg/kubecache/meta/informers_init.go
index dc2e75957..c3f0d0bac 100644
--- a/pkg/kubecache/meta/informers_init.go
+++ b/pkg/kubecache/meta/informers_init.go
@@ -8,12 +8,14 @@ import (
 	"os"
 	"path"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/google/go-cmp/cmp"
 	"google.golang.org/protobuf/testing/protocmp"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
@@ -44,6 +46,8 @@ type informersConfig struct {
 	disableNodes    bool
 	disableServices bool
 
+	restrictNode string
+
 	// waits for cache synchronization at start
 	waitCacheSync    bool
 	cacheSyncTimeout time.Duration
@@ -80,6 +84,12 @@ func WithoutServices() InformerOption {
 	}
 }
 
+func RestrictNode(nodeName string) InformerOption {
+	return func(c *informersConfig) {
+		c.restrictNode = nodeName
+	}
+}
+
 func WithKubeClient(client kubernetes.Interface) InformerOption {
 	return func(c *informersConfig) {
 		c.kubeClient = client
@@ -119,27 +129,25 @@ func InitInformers(ctx context.Context, opts ...InformerOption) (*Informers, err
 		}
 	}
 
-	informerFactory := informers.NewSharedInformerFactory(svc.config.kubeClient, svc.config.resyncPeriod)
-
-	if err := svc.initPodInformer(ctx, informerFactory); err != nil {
+	createdFactories, err := svc.initInformers(ctx, config)
+	if err != nil {
 		return nil, err
 	}
-	if !svc.config.disableNodes {
-		if err := svc.initNodeIPInformer(ctx, informerFactory); err != nil {
-			return nil, err
-		}
-	}
-	if !svc.config.disableServices {
-		if err := svc.initServiceIPInformer(ctx, informerFactory); err != nil {
-			return nil, err
-		}
-	}
 
 	svc.log.Debug("starting kubernetes informers")
-	informerFactory.Start(ctx.Done())
+	allSynced := sync.WaitGroup{}
+	allSynced.Add(len(createdFactories))
+	for _, factory := range createdFactories {
+		factory.Start(ctx.Done())
+		go func() {
+			factory.WaitForCacheSync(ctx.Done())
+			allSynced.Done()
+		}()
+	}
+
 	go func() {
-		svc.log.Debug("waiting for informers' syncronization")
-		informerFactory.WaitForCacheSync(ctx.Done())
+		svc.log.Debug("waiting for informers' synchronization")
+		allSynced.Wait()
 		svc.log.Debug("informers synchronized")
 		close(svc.waitForSync)
 	}()
@@ -159,6 +167,49 @@ func InitInformers(ctx context.Context, opts ...InformerOption) (*Informers, err
 
 }
 
+func (inf *Informers) initInformers(ctx context.Context, config *informersConfig) ([]informers.SharedInformerFactory, error) {
+	var informerFactory informers.SharedInformerFactory
+	if config.restrictNode == "" {
+		informerFactory = informers.NewSharedInformerFactory(inf.config.kubeClient, inf.config.resyncPeriod)
+	} else {
+		informerFactory = informers.NewSharedInformerFactoryWithOptions(inf.config.kubeClient, inf.config.resyncPeriod,
+			informers.WithTweakListOptions(func(options *metav1.ListOptions) {
+				options.FieldSelector = fields.Set{"spec.nodeName": config.restrictNode}.String()
+			}))
+	}
+	createdFactories := []informers.SharedInformerFactory{informerFactory}
+	if err := inf.initPodInformer(ctx, informerFactory); err != nil {
+		return nil, err
+	}
+
+	if !inf.config.disableNodes {
+		nodeIFactory := informerFactory
+		if config.restrictNode != "" {
+			nodeIFactory = informers.NewSharedInformerFactoryWithOptions(inf.config.kubeClient, inf.config.resyncPeriod,
+				informers.WithTweakListOptions(func(options *metav1.ListOptions) {
+					options.FieldSelector = fields.Set{"metadata.name": config.restrictNode}.String()
+				}))
+			createdFactories = append(createdFactories, nodeIFactory)
+		} // else: use default, unfiltered informerFactory instance
+		if err := inf.initNodeIPInformer(ctx, nodeIFactory); err != nil {
+			return nil, err
+		}
+	}
+	if !inf.config.disableServices {
+		svcIFactory := informerFactory
+		if config.restrictNode != "" {
+			// informerFactory will be initially set to a "spec.nodeName"-filtered instance, so we need
+			// to create an unfiltered one for global services
+			svcIFactory = informers.NewSharedInformerFactory(inf.config.kubeClient, inf.config.resyncPeriod)
+			createdFactories = append(createdFactories, svcIFactory)
+		}
+		if err := inf.initServiceIPInformer(ctx, svcIFactory); err != nil {
+			return nil, err
+		}
+	}
+	return createdFactories, nil
+}
+
 func initConfigOpts(opts []InformerOption) *informersConfig {
 	config := &informersConfig{}
 	for _, opt := range opts {
@@ -467,32 +518,34 @@ func headlessService(om *informer.ObjectMeta) bool {
 
 func (inf *Informers) ipInfoEventHandler(ctx context.Context) *cache.ResourceEventHandlerFuncs {
 	metrics := instrument.FromContext(ctx)
+	log := inf.log.With("func", "ipInfoEventHandler")
 	return &cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			metrics.InformerNew()
+			em := obj.(*indexableEntity).EncodedMeta
+			log.Debug("AddFunc", "kind", em.Kind, "name", em.Name, "ips", em.Ips)
 			// ignore headless services from being added
 			if headlessService(obj.(*indexableEntity).EncodedMeta) {
 				return
 			}
 			inf.Notify(&informer.Event{
 				Type:     informer.EventType_CREATED,
-				Resource: obj.(*indexableEntity).EncodedMeta,
+				Resource: em,
 			})
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
 			metrics.InformerUpdate()
+			newEM := newObj.(*indexableEntity).EncodedMeta
+			oldEM := oldObj.(*indexableEntity).EncodedMeta
 			// ignore headless services from being added
-			if headlessService(newObj.(*indexableEntity).EncodedMeta) &&
-				headlessService(oldObj.(*indexableEntity).EncodedMeta) {
+			if headlessService(newEM) && headlessService(oldEM) {
 				return
 			}
-			if cmp.Equal(
-				oldObj.(*indexableEntity).EncodedMeta,
-				newObj.(*indexableEntity).EncodedMeta,
-				protoCmpTransform,
-			) {
+			if cmp.Equal(oldEM, newEM, protoCmpTransform) {
 				return
 			}
+			log.Debug("UpdateFunc", "kind", newEM.Kind, "name", newEM.Name,
+				"ips", newEM.Ips, "oldIps", oldEM.Ips)
 			inf.Notify(&informer.Event{
 				Type:     informer.EventType_UPDATED,
 				Resource: newObj.(*indexableEntity).EncodedMeta,
@@ -512,6 +565,8 @@ func (inf *Informers) ipInfoEventHandler(ctx context.Context) *cache.ResourceEve
 					return
 				}
 			}
 
+			em := obj.(*indexableEntity).EncodedMeta
+			log.Debug("DeleteFunc", "kind", em.Kind, "name", em.Name, "ips", em.Ips)
 			metrics.InformerDelete()
 			inf.Notify(&informer.Event{
diff --git a/pkg/transform/k8s.go b/pkg/transform/k8s.go
index df899a34a..00d109fa2 100644
--- a/pkg/transform/k8s.go
+++ b/pkg/transform/k8s.go
@@ -49,6 +49,10 @@ type KubernetesDecorator struct {
 	// MetaCacheAddress is the host:port address of the beyla-k8s-cache service instance
 	MetaCacheAddress string `yaml:"meta_cache_address" env:"BEYLA_KUBE_META_CACHE_ADDRESS"`
 
+	// MetaRestrictLocalNode will download only the metadata from the Pods that are located on the same
+	// node as the Beyla instance. It will also restrict the Node information to the local node.
+	MetaRestrictLocalNode bool `yaml:"meta_restrict_local_node" env:"BEYLA_KUBE_META_RESTRICT_LOCAL_NODE"`
+
 	// MetadataSources allows Beyla overriding the service name and namespace of an application from
 	// the given labels.
 	MetadataSources kube.MetadataSources `yaml:"meta_naming_sources"`
diff --git a/test/integration/k8s/manifests/02-prometheus-otelscrape.yml b/test/integration/k8s/manifests/02-prometheus-otelscrape.yml
index ba496b0f4..27ebdd82b 100644
--- a/test/integration/k8s/manifests/02-prometheus-otelscrape.yml
+++ b/test/integration/k8s/manifests/02-prometheus-otelscrape.yml
@@ -25,7 +25,7 @@ spec:
       - name: prometheus
         image: quay.io/prometheus/prometheus:v2.53.0
         args:
-          - --storage.tsdb.retention.time=1m
+          - --storage.tsdb.retention.time=10m
          - --config.file=/etc/prometheus/prometheus-config.yml
          - --storage.tsdb.path=/prometheus
          - --web.enable-lifecycle
diff --git a/test/integration/k8s/manifests/05-uninstrumented-server-client-different-nodes.yml b/test/integration/k8s/manifests/05-uninstrumented-server-client-different-nodes.yml
new file mode 100644
index 000000000..77a19f1a9
--- /dev/null
+++ b/test/integration/k8s/manifests/05-uninstrumented-server-client-different-nodes.yml
@@ -0,0 +1,112 @@
+# this file depends on the annotations and on 00-kind-multi-node.yml to deploy otherinstance
+# and a client on different nodes.
+# Beyla will instrument both, but restrict the metadata to the local node only,
+# so network flows between client and otherinstance will be incomplete
+apiVersion: v1
+kind: Pod
+metadata:
+  name: httppinger
+  labels:
+    component: httppinger
+    # this label will trigger a deletion of beyla pods before tearing down
+    # kind, to force Beyla writing the coverage data
+    teardown: delete
+spec:
+  affinity:
+    nodeAffinity:
+      requiredDuringSchedulingIgnoredDuringExecution:
+        nodeSelectorTerms:
+          - matchExpressions:
+              - key: deployment/zone
+                operator: In
+                values:
+                  - other-progs
+  volumes:
+    - name: configs
+      persistentVolumeClaim:
+        claimName: configs
+    - name: testoutput
+      persistentVolumeClaim:
+        claimName: testoutput
+    - name: maincode
+      configMap:
+        name: maincode
+  containers:
+    - name: httppinger
+      image: httppinger:dev
+      env:
+        - name: TARGET_URL
+          value: "http://otherinstance:8080"
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: otherinstance
+spec:
+  selector:
+    app: otherinstance
+  ports:
+    - port: 8080
+      name: http0
+      targetPort: http0
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: otherinstance
+  labels:
+    app: otherinstance
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: otherinstance
+  template:
+    metadata:
+      name: otherinstance
+      labels:
+        app: otherinstance
+        # this label will trigger a deletion of beyla pods before tearing down
+        # kind, to force Beyla writing the coverage data
+        teardown: delete
+    spec:
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+              - matchExpressions:
+                  - key: deployment/zone
+                    operator: In
+                    values:
+                      - otel
+      volumes:
+        - name: configs
+          persistentVolumeClaim:
+            claimName: configs
+        - name: testoutput
+          persistentVolumeClaim:
+            claimName: testoutput
+      containers:
+        - name: otherinstance
+          image: testserver:dev
+          imagePullPolicy: Never # loaded into Kind from localhost
+          ports:
+            # exposing hostports to enable operation from tests
+            - containerPort: 8080
+              hostPort: 8080
+              name: http0
+            - containerPort: 8081
+              hostPort: 8081
+              name: http1
+            - containerPort: 8082
+              hostPort: 8082
+              name: http2
+            - containerPort: 8083
+              hostPort: 8083
+              name: http3
+            - containerPort: 5051
+              hostPort: 5051
+              name: grpc
+          env:
+            - name: LOG_LEVEL
+              value: "DEBUG"
diff --git a/test/integration/k8s/manifests/06-beyla-netolly.yml b/test/integration/k8s/manifests/06-beyla-netolly.yml
index 5c338ce20..6cf27654c 100644
--- a/test/integration/k8s/manifests/06-beyla-netolly.yml
+++ b/test/integration/k8s/manifests/06-beyla-netolly.yml
@@ -80,3 +80,7 @@ spec:
            value: "10ms"
          - name: BEYLA_BPF_BATCH_TIMEOUT
            value: "10ms"
+          # in tests not running on multi-node Kind setups, this property
+          # should have no effect
+          - name: BEYLA_KUBE_META_RESTRICT_LOCAL_NODE
+            value: "true"
\ No newline at end of file
diff --git a/test/integration/k8s/netolly_promexport/k8s_netolly_prom_main_test.go b/test/integration/k8s/netolly_promexport/k8s_netolly_prom_main_test.go
index 5f62caadb..a7e4c29c4 100644
--- a/test/integration/k8s/netolly_promexport/k8s_netolly_prom_main_test.go
+++ b/test/integration/k8s/netolly_promexport/k8s_netolly_prom_main_test.go
@@ -22,7 +22,6 @@ func TestMain(m *testing.M) {
 		docker.ImageBuild{Tag: "beyla:dev", Dockerfile: k8s.DockerfileBeyla},
 		docker.ImageBuild{Tag: "httppinger:dev", Dockerfile: k8s.DockerfileHTTPPinger},
 		docker.ImageBuild{Tag: "quay.io/prometheus/prometheus:v2.53.0"},
-		docker.ImageBuild{Tag: "otel/opentelemetry-collector-contrib:0.103.0"},
 	); err != nil {
 		slog.Error("can't build docker images", "error", err)
 		os.Exit(-1)
@@ -35,7 +34,6 @@ func TestMain(m *testing.M) {
 		kube.LocalImage("beyla:dev"),
 		kube.LocalImage("httppinger:dev"),
 		kube.LocalImage("quay.io/prometheus/prometheus:v2.53.0"),
-		kube.LocalImage("otel/opentelemetry-collector-contrib:0.103.0"),
 		kube.Deploy(k8s.PathManifests+"/01-volumes.yml"),
 		kube.Deploy(k8s.PathManifests+"/01-serviceaccount.yml"),
 		kube.Deploy(k8s.PathManifests+"/02-prometheus-promscrape.yml"),
diff --git a/test/integration/k8s/restrict_local_node/restrict_local_node_main_test.go b/test/integration/k8s/restrict_local_node/restrict_local_node_main_test.go
new file mode 100644
index 000000000..19a42567a
--- /dev/null
+++ b/test/integration/k8s/restrict_local_node/restrict_local_node_main_test.go
@@ -0,0 +1,88 @@
+//go:build integration_k8s
+
+package otel
+
+import (
+	"log/slog"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/mariomac/guara/pkg/test"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/beyla/test/integration/components/docker"
+	"github.com/grafana/beyla/test/integration/components/kube"
+	"github.com/grafana/beyla/test/integration/components/prom"
+	k8s "github.com/grafana/beyla/test/integration/k8s/common"
+	"github.com/grafana/beyla/test/tools"
+)
+
+const (
+	prometheusHostPort = "localhost:39090"
+	testTimeout        = 3 * time.Minute
+)
+
+var cluster *kube.Kind
+
+func TestMain(m *testing.M) {
+	if err := docker.Build(os.Stdout, tools.ProjectDir(),
+		docker.ImageBuild{Tag: "testserver:dev", Dockerfile: k8s.DockerfileTestServer},
+		docker.ImageBuild{Tag: "httppinger:dev", Dockerfile: k8s.DockerfileHTTPPinger},
+		docker.ImageBuild{Tag: "beyla:dev", Dockerfile: k8s.DockerfileBeyla},
+		docker.ImageBuild{Tag: "quay.io/prometheus/prometheus:v2.53.0"},
+		docker.ImageBuild{Tag: "otel/opentelemetry-collector-contrib:0.103.0"},
+	); err != nil {
+		slog.Error("can't build docker images", "error", err)
+		os.Exit(-1)
+	}
+
+	cluster = kube.NewKind("test-kind-cluster-otel-multi",
+		kube.ExportLogs(k8s.PathKindLogs),
+		kube.KindConfig(k8s.PathManifests+"/00-kind-multi-node.yml"),
+		kube.LocalImage("testserver:dev"),
+		kube.LocalImage("httppinger:dev"),
+		kube.LocalImage("beyla:dev"),
+		kube.LocalImage("quay.io/prometheus/prometheus:v2.53.0"),
+		kube.LocalImage("otel/opentelemetry-collector-contrib:0.103.0"),
+		kube.Deploy(k8s.PathManifests+"/01-volumes.yml"),
+		kube.Deploy(k8s.PathManifests+"/01-serviceaccount.yml"),
+		kube.Deploy(k8s.PathManifests+"/02-prometheus-otelscrape-multi-node.yml"),
+		kube.Deploy(k8s.PathManifests+"/03-otelcol.yml"),
+		kube.Deploy(k8s.PathManifests+"/05-uninstrumented-server-client-different-nodes.yml"),
+		kube.Deploy(k8s.PathManifests+"/06-beyla-netolly.yml"),
+	)
+
+	cluster.Run(m)
+}
+
+func TestNoSourceAndDestAvailable(t *testing.T) {
+	// Wait for some metrics to be available at Prometheus
+	pq := prom.Client{HostPort: prometheusHostPort}
+	for _, args := range []string{
+		`k8s_dst_name="httppinger"`,
+		`k8s_src_name="httppinger"`,
+		`k8s_dst_name=~"otherinstance.*"`,
+		`k8s_src_name=~"otherinstance.*"`,
+	} {
+		t.Run("check "+args, func(t *testing.T) {
+			test.Eventually(t, testTimeout, func(t require.TestingT) {
+				var err error
+				results, err := pq.Query(`beyla_network_flow_bytes_total{` + args + `}`)
+				require.NoError(t, err)
+				require.NotEmpty(t, results)
+			})
+		})
+	}
+
+	// Verify that HTTP pinger/testserver metrics can't have both source and destination labels,
+	// as the test client and server are on different nodes, and Beyla only gets metadata
+	// from its local node
+	results, err := pq.Query(`beyla_network_flow_bytes_total{k8s_dst_name="httppinger",k8s_src_name=~"otherinstance.*",k8s_src_kind="Pod"}`)
+	require.NoError(t, err)
+	require.Empty(t, results)
+
+	results, err = pq.Query(`beyla_network_flow_bytes_total{k8s_src_name="httppinger",k8s_dst_name=~"otherinstance.*",k8s_dst_kind="Pod"}`)
+	require.NoError(t, err)
+	require.Empty(t, results)
+}
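
A minimal YAML sketch of how the option added by this patch could be enabled. The `meta_restrict_local_node` key and the `BEYLA_KUBE_META_RESTRICT_LOCAL_NODE` environment variable come from the change above; the `attributes.kubernetes` nesting and the `enable` switch are assumed here to follow Beyla's existing configuration layout, so adjust them to your deployment:

```yaml
# Sketch: keep only Pod/Node metadata from the node where this Beyla instance runs.
# Setting BEYLA_KUBE_META_RESTRICT_LOCAL_NODE=true is assumed to be equivalent.
attributes:
  kubernetes:
    enable: true                     # existing switch for Kubernetes metadata decoration (assumed layout)
    meta_restrict_local_node: true   # option introduced in this patch
```

As the comment in `initInformers` notes, only the Pod and Node informers are filtered with field selectors (`spec.nodeName` and `metadata.name`); the Service informer stays unfiltered because Services are not tied to a single node.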