Skip to content

Commit

Permalink
Fix informers on stale object deletion (#1420) (#1424)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac authored Dec 3, 2024
1 parent fb6dbef commit cd2d312
Showing 1 changed file with 116 additions and 60 deletions.
176 changes: 116 additions & 60 deletions pkg/kubecache/meta/informers_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,68 +225,13 @@ func (inf *Informers) initPodInformer(ctx context.Context, informerFactory infor
if pi, ok := i.(*indexableEntity); ok {
return pi, nil
}
return nil, fmt.Errorf("was expecting a *v1.Pod. Got: %T", i)
}
containers := make([]*informer.ContainerInfo, 0,
len(pod.Status.ContainerStatuses)+
len(pod.Status.InitContainerStatuses)+
len(pod.Status.EphemeralContainerStatuses))
for i := range pod.Status.ContainerStatuses {
containers = append(containers,
&informer.ContainerInfo{
Name: pod.Spec.Containers[i].Name,
Id: rmContainerIDSchema(pod.Status.ContainerStatuses[i].ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.Containers[i].Env),
},
)
}
for i := range pod.Status.InitContainerStatuses {
containers = append(containers,
&informer.ContainerInfo{
Name: pod.Spec.InitContainers[i].Name,
Id: rmContainerIDSchema(pod.Status.InitContainerStatuses[i].ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.InitContainers[i].Env),
},
)
}
for i := range pod.Status.EphemeralContainerStatuses {
containers = append(containers,
&informer.ContainerInfo{
Name: pod.Spec.EphemeralContainers[i].Name,
Id: rmContainerIDSchema(pod.Status.EphemeralContainerStatuses[i].ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.EphemeralContainers[i].Env),
},
)
}

ips := make([]string, 0, len(pod.Status.PodIPs))
for _, ip := range pod.Status.PodIPs {
// ignoring host-networked Pod IPs
// TODO: check towards all the Status.HostIPs slice
if ip.IP != pod.Status.HostIP {
ips = append(ips, ip.IP)
// let's forward the stale object to the event handler
if obj, stale := i.(cache.DeletedFinalStateUnknown); stale {
return obj, nil
}
return nil, fmt.Errorf("was expecting a *v1.Pod. Got: %T", i)
}

startTime := pod.GetCreationTimestamp().String()
return &indexableEntity{
ObjectMeta: minimalIndex(&pod.ObjectMeta),
EncodedMeta: &informer.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
Labels: pod.Labels,
Ips: ips,
Kind: typePod,
Pod: &informer.PodInfo{
Uid: string(pod.UID),
NodeName: pod.Spec.NodeName,
StartTimeStr: startTime,
Containers: containers,
Owners: ownersFrom(&pod.ObjectMeta),
HostIp: pod.Status.HostIP,
},
},
}, nil
return inf.podToIndexableEntity(pod)
}); err != nil {
return fmt.Errorf("can't set pods transform: %w", err)
}
Expand All @@ -302,6 +247,82 @@ func (inf *Informers) initPodInformer(ctx context.Context, informerFactory infor
return nil
}

func (inf *Informers) podToIndexableEntity(pod *v1.Pod) (interface{}, error) {
containers := make([]*informer.ContainerInfo, 0,
len(pod.Status.ContainerStatuses)+
len(pod.Status.InitContainerStatuses)+
len(pod.Status.EphemeralContainerStatuses))
for i := range pod.Status.ContainerStatuses {
cs := &pod.Status.ContainerStatuses[i]
envs := envsFromContainerSpec(cs.Name, pod.Spec.Containers)
containers = append(containers,
&informer.ContainerInfo{
Name: cs.Name,
Id: rmContainerIDSchema(cs.ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, envs),
},
)
}
for i := range pod.Status.InitContainerStatuses {
ics := &pod.Status.InitContainerStatuses[i]
envs := envsFromContainerSpec(ics.Name, pod.Spec.InitContainers)
containers = append(containers,
&informer.ContainerInfo{
Name: ics.Name,
Id: rmContainerIDSchema(ics.ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, envs),
},
)
}
for i := range pod.Status.EphemeralContainerStatuses {
ecs := &pod.Status.EphemeralContainerStatuses[i]
var envs []v1.EnvVar
for i := range pod.Spec.EphemeralContainers {
c := &pod.Spec.EphemeralContainers[i]
if c.Name == ecs.Name {
envs = c.Env
break
}
}
containers = append(containers,
&informer.ContainerInfo{
Name: ecs.Name,
Id: rmContainerIDSchema(ecs.ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, envs),
},
)
}

ips := make([]string, 0, len(pod.Status.PodIPs))
for _, ip := range pod.Status.PodIPs {
// ignoring host-networked Pod IPs
// TODO: check towards all the Status.HostIPs slice
if ip.IP != pod.Status.HostIP {
ips = append(ips, ip.IP)
}
}

startTime := pod.GetCreationTimestamp().String()
return &indexableEntity{
ObjectMeta: minimalIndex(&pod.ObjectMeta),
EncodedMeta: &informer.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
Labels: pod.Labels,
Ips: ips,
Kind: typePod,
Pod: &informer.PodInfo{
Uid: string(pod.UID),
NodeName: pod.Spec.NodeName,
StartTimeStr: startTime,
Containers: containers,
Owners: ownersFrom(&pod.ObjectMeta),
HostIp: pod.Status.HostIP,
},
},
}, nil
}

func envToMap(kc kubernetes.Interface, objMeta metav1.ObjectMeta, containerEnv []v1.EnvVar) map[string]string {
envMap := map[string]string{}
for _, envV := range containerEnv {
Expand All @@ -321,6 +342,18 @@ func envToMap(kc kubernetes.Interface, objMeta metav1.ObjectMeta, containerEnv [
return envMap
}

func envsFromContainerSpec(containerName string, containers []v1.Container) []v1.EnvVar {
var envs []v1.EnvVar
for i := range containers {
c := &containers[i]
if c.Name == containerName {
envs = c.Env
break
}
}
return envs
}

// rmContainerIDSchema extracts the hex ID of a container ID that is provided in the form:
// containerd://40c03570b6f4c30bc8d69923d37ee698f5cfcced92c7b7df1c47f6f7887378a9
func rmContainerIDSchema(containerID string) string {
Expand All @@ -336,12 +369,17 @@ func (inf *Informers) initNodeIPInformer(ctx context.Context, informerFactory in
// in the informer's cache
if err := nodes.SetTransform(func(i interface{}) (interface{}, error) {
node, ok := i.(*v1.Node)
// todo: move to generic function
if !ok {
// it's Ok. The K8s library just informed from an entity
// that has been previously transformed/stored
if pi, ok := i.(*indexableEntity); ok {
return pi, nil
}
// let's forward the stale object to the event handler
if obj, stale := i.(cache.DeletedFinalStateUnknown); stale {
return obj, nil
}
return nil, fmt.Errorf("was expecting a *v1.Node. Got: %T", i)
}
ips := make([]string, 0, len(node.Status.Addresses))
Expand Down Expand Up @@ -389,6 +427,10 @@ func (inf *Informers) initServiceIPInformer(ctx context.Context, informerFactory
if pi, ok := i.(*indexableEntity); ok {
return pi, nil
}
// let's forward the stale object to the event handler
if obj, stale := i.(cache.DeletedFinalStateUnknown); stale {
return obj, nil
}
return nil, fmt.Errorf("was expecting a *v1.Service. Got: %T", i)
}
var ips []string
Expand Down Expand Up @@ -456,6 +498,20 @@ func (inf *Informers) ipInfoEventHandler(ctx context.Context) *cache.ResourceEve
})
},
DeleteFunc: func(obj interface{}) {
// this type is received when an object was deleted but the watch deletion event was missed
// while disconnected from the API server. In this case we don't know the final "resting"
// state of the object, so there's a chance the included `Obj` is stale.
// We delete it anyway despite some data could be kept in the cache if the last snapshot we have
// don't contain all the IPs associated to that object
if stale, ok := obj.(cache.DeletedFinalStateUnknown); ok {
inf.log.Debug("stale object received in the informer. Deleting", "key", stale.Key)
if obj, ok = stale.Obj.(*indexableEntity); !ok {
inf.log.Warn("can't cast stale object to *indexableEntity",
"obj", stale.Obj, "type", fmt.Sprintf("%T", stale.Obj))
return
}
}

metrics.InformerDelete()
inf.Notify(&informer.Event{
Type: informer.EventType_DELETED,
Expand Down

0 comments on commit cd2d312

Please sign in to comment.