Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix informers on stale object deletion [1.9 backport] #1424

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 116 additions & 60 deletions pkg/kubecache/meta/informers_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,68 +225,13 @@ func (inf *Informers) initPodInformer(ctx context.Context, informerFactory infor
if pi, ok := i.(*indexableEntity); ok {
return pi, nil
}
return nil, fmt.Errorf("was expecting a *v1.Pod. Got: %T", i)
}
containers := make([]*informer.ContainerInfo, 0,
len(pod.Status.ContainerStatuses)+
len(pod.Status.InitContainerStatuses)+
len(pod.Status.EphemeralContainerStatuses))
for i := range pod.Status.ContainerStatuses {
containers = append(containers,
&informer.ContainerInfo{
Name: pod.Spec.Containers[i].Name,
Id: rmContainerIDSchema(pod.Status.ContainerStatuses[i].ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.Containers[i].Env),
},
)
}
for i := range pod.Status.InitContainerStatuses {
containers = append(containers,
&informer.ContainerInfo{
Name: pod.Spec.InitContainers[i].Name,
Id: rmContainerIDSchema(pod.Status.InitContainerStatuses[i].ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.InitContainers[i].Env),
},
)
}
for i := range pod.Status.EphemeralContainerStatuses {
containers = append(containers,
&informer.ContainerInfo{
Name: pod.Spec.EphemeralContainers[i].Name,
Id: rmContainerIDSchema(pod.Status.EphemeralContainerStatuses[i].ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, pod.Spec.EphemeralContainers[i].Env),
},
)
}

ips := make([]string, 0, len(pod.Status.PodIPs))
for _, ip := range pod.Status.PodIPs {
// ignoring host-networked Pod IPs
// TODO: check towards all the Status.HostIPs slice
if ip.IP != pod.Status.HostIP {
ips = append(ips, ip.IP)
// let's forward the stale object to the event handler
if obj, stale := i.(cache.DeletedFinalStateUnknown); stale {
return obj, nil
}
return nil, fmt.Errorf("was expecting a *v1.Pod. Got: %T", i)
}

startTime := pod.GetCreationTimestamp().String()
return &indexableEntity{
ObjectMeta: minimalIndex(&pod.ObjectMeta),
EncodedMeta: &informer.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
Labels: pod.Labels,
Ips: ips,
Kind: typePod,
Pod: &informer.PodInfo{
Uid: string(pod.UID),
NodeName: pod.Spec.NodeName,
StartTimeStr: startTime,
Containers: containers,
Owners: ownersFrom(&pod.ObjectMeta),
HostIp: pod.Status.HostIP,
},
},
}, nil
return inf.podToIndexableEntity(pod)
}); err != nil {
return fmt.Errorf("can't set pods transform: %w", err)
}
Expand All @@ -302,6 +247,82 @@ func (inf *Informers) initPodInformer(ctx context.Context, informerFactory infor
return nil
}

func (inf *Informers) podToIndexableEntity(pod *v1.Pod) (interface{}, error) {
containers := make([]*informer.ContainerInfo, 0,
len(pod.Status.ContainerStatuses)+
len(pod.Status.InitContainerStatuses)+
len(pod.Status.EphemeralContainerStatuses))
for i := range pod.Status.ContainerStatuses {
cs := &pod.Status.ContainerStatuses[i]
envs := envsFromContainerSpec(cs.Name, pod.Spec.Containers)
containers = append(containers,
&informer.ContainerInfo{
Name: cs.Name,
Id: rmContainerIDSchema(cs.ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, envs),
},
)
}
for i := range pod.Status.InitContainerStatuses {
ics := &pod.Status.InitContainerStatuses[i]
envs := envsFromContainerSpec(ics.Name, pod.Spec.InitContainers)
containers = append(containers,
&informer.ContainerInfo{
Name: ics.Name,
Id: rmContainerIDSchema(ics.ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, envs),
},
)
}
for i := range pod.Status.EphemeralContainerStatuses {
ecs := &pod.Status.EphemeralContainerStatuses[i]
var envs []v1.EnvVar
for i := range pod.Spec.EphemeralContainers {
c := &pod.Spec.EphemeralContainers[i]
if c.Name == ecs.Name {
envs = c.Env
break
}
}
containers = append(containers,
&informer.ContainerInfo{
Name: ecs.Name,
Id: rmContainerIDSchema(ecs.ContainerID),
Env: envToMap(inf.config.kubeClient, pod.ObjectMeta, envs),
},
)
}

ips := make([]string, 0, len(pod.Status.PodIPs))
for _, ip := range pod.Status.PodIPs {
// ignoring host-networked Pod IPs
// TODO: check towards all the Status.HostIPs slice
if ip.IP != pod.Status.HostIP {
ips = append(ips, ip.IP)
}
}

startTime := pod.GetCreationTimestamp().String()
return &indexableEntity{
ObjectMeta: minimalIndex(&pod.ObjectMeta),
EncodedMeta: &informer.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
Labels: pod.Labels,
Ips: ips,
Kind: typePod,
Pod: &informer.PodInfo{
Uid: string(pod.UID),
NodeName: pod.Spec.NodeName,
StartTimeStr: startTime,
Containers: containers,
Owners: ownersFrom(&pod.ObjectMeta),
HostIp: pod.Status.HostIP,
},
},
}, nil
}

func envToMap(kc kubernetes.Interface, objMeta metav1.ObjectMeta, containerEnv []v1.EnvVar) map[string]string {
envMap := map[string]string{}
for _, envV := range containerEnv {
Expand All @@ -321,6 +342,18 @@ func envToMap(kc kubernetes.Interface, objMeta metav1.ObjectMeta, containerEnv [
return envMap
}

func envsFromContainerSpec(containerName string, containers []v1.Container) []v1.EnvVar {
var envs []v1.EnvVar
for i := range containers {
c := &containers[i]
if c.Name == containerName {
envs = c.Env
break
}
}
return envs
}

// rmContainerIDSchema extracts the hex ID of a container ID that is provided in the form:
// containerd://40c03570b6f4c30bc8d69923d37ee698f5cfcced92c7b7df1c47f6f7887378a9
func rmContainerIDSchema(containerID string) string {
Expand All @@ -336,12 +369,17 @@ func (inf *Informers) initNodeIPInformer(ctx context.Context, informerFactory in
// in the informer's cache
if err := nodes.SetTransform(func(i interface{}) (interface{}, error) {
node, ok := i.(*v1.Node)
// todo: move to generic function
if !ok {
// it's Ok. The K8s library just informed from an entity
// that has been previously transformed/stored
if pi, ok := i.(*indexableEntity); ok {
return pi, nil
}
// let's forward the stale object to the event handler
if obj, stale := i.(cache.DeletedFinalStateUnknown); stale {
return obj, nil
}
return nil, fmt.Errorf("was expecting a *v1.Node. Got: %T", i)
}
ips := make([]string, 0, len(node.Status.Addresses))
Expand Down Expand Up @@ -389,6 +427,10 @@ func (inf *Informers) initServiceIPInformer(ctx context.Context, informerFactory
if pi, ok := i.(*indexableEntity); ok {
return pi, nil
}
// let's forward the stale object to the event handler
if obj, stale := i.(cache.DeletedFinalStateUnknown); stale {
return obj, nil
}
return nil, fmt.Errorf("was expecting a *v1.Service. Got: %T", i)
}
var ips []string
Expand Down Expand Up @@ -456,6 +498,20 @@ func (inf *Informers) ipInfoEventHandler(ctx context.Context) *cache.ResourceEve
})
},
DeleteFunc: func(obj interface{}) {
// this type is received when an object was deleted but the watch deletion event was missed
// while disconnected from the API server. In this case we don't know the final "resting"
// state of the object, so there's a chance the included `Obj` is stale.
// We delete it anyway despite some data could be kept in the cache if the last snapshot we have
// don't contain all the IPs associated to that object
if stale, ok := obj.(cache.DeletedFinalStateUnknown); ok {
inf.log.Debug("stale object received in the informer. Deleting", "key", stale.Key)
if obj, ok = stale.Obj.(*indexableEntity); !ok {
inf.log.Warn("can't cast stale object to *indexableEntity",
"obj", stale.Obj, "type", fmt.Sprintf("%T", stale.Obj))
return
}
}

metrics.InformerDelete()
inf.Notify(&informer.Event{
Type: informer.EventType_DELETED,
Expand Down
Loading