Commit

Merge pull request kubernetes#6132 from atwamahmoud/status-taints-support

Adds support for startup taints and extends support for status taints
k8s-ci-robot authored and Panic Stevenson committed Aug 24, 2024
1 parent 06465a3 commit d9c7e26
Showing 7 changed files with 270 additions and 60 deletions.
6 changes: 3 additions & 3 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -194,10 +194,10 @@ type AutoscalingOptions struct {
MaxBulkSoftTaintTime time.Duration
// MaxPodEvictionTime sets the maximum time CA tries to evict a pod before giving up.
MaxPodEvictionTime time.Duration
// IgnoredTaints is a list of taints CA considers to reflect transient node
// StartupTaints is a list of taints CA considers to reflect transient node
// status that should be removed when creating a node template for scheduling.
// The ignored taints are expected to appear during node startup.
IgnoredTaints []string
// The startup taints are expected to appear during node startup.
StartupTaints []string
// StatusTaints is a list of taints CA considers to reflect transient node
// status that should be removed when creating a node template for scheduling.
// The status taints are expected to appear during node lifetime, after startup.
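
To make the renamed option concrete, here is a minimal, hedged sketch of how the two fields might be populated; the field names come from the struct above, while the taint keys and the surrounding program are purely illustrative:

package main

import (
    "fmt"

    "k8s.io/autoscaler/cluster-autoscaler/config"
)

func main() {
    opts := config.AutoscalingOptions{
        // Expected only while a node boots; stripped from node templates and
        // nodes carrying them are treated as still upcoming (hypothetical key).
        StartupTaints: []string{"example.com/booting"},
        // May appear at any point after startup; also stripped from node
        // templates, but the node is not treated as unready (hypothetical key).
        StatusTaints: []string{"example.com/under-maintenance"},
    }
    fmt.Println(opts.StartupTaints, opts.StatusTaints)
}
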
161 changes: 156 additions & 5 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -788,11 +788,162 @@ func TestStartDeletion(t *testing.T) {
return true, nil, nil
})

// Hook node deletion at the level of cloud provider, to gather which nodes were deleted, and to fail the deletion for
// certain nodes to simulate errors.
provider := testprovider.NewTestCloudProvider(nil, func(nodeGroup string, node string) error {
if tc.failedNodeDeletion[node] {
return fmt.Errorf("SIMULATED ERROR: won't remove node")
// Set up other needed structures and options.
opts := config.AutoscalingOptions{
MaxScaleDownParallelism: 10,
MaxDrainParallelism: 5,
MaxPodEvictionTime: 0,
DaemonSetEvictionForEmptyNodes: true,
}

allPods := []*apiv1.Pod{}

for _, pods := range tc.pods {
allPods = append(allPods, pods...)
}

podLister := kube_util.NewTestPodLister(allPods)
pdbLister := kube_util.NewTestPodDisruptionBudgetLister([]*policyv1.PodDisruptionBudget{})
dsLister, err := kube_util.NewTestDaemonSetLister([]*appsv1.DaemonSet{ds})
if err != nil {
t.Fatalf("Couldn't create daemonset lister")
}

registry := kube_util.NewListerRegistry(nil, nil, podLister, pdbLister, dsLister, nil, nil, nil, nil)
ctx, err := NewScaleTestAutoscalingContext(opts, fakeClient, registry, provider, nil, nil)
if err != nil {
t.Fatalf("Couldn't set up autoscaling context: %v", err)
}
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}))
for _, bucket := range emptyNodeGroupViews {
for _, node := range bucket.Nodes {
err := ctx.ClusterSnapshot.AddNodeWithPods(node, tc.pods[node.Name])
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
}
}
}
for _, bucket := range drainNodeGroupViews {
for _, node := range bucket.Nodes {
pods, found := tc.pods[node.Name]
if !found {
t.Fatalf("Drain node %q doesn't have pods defined in the test case.", node.Name)
}
err := ctx.ClusterSnapshot.AddNodeWithPods(node, pods)
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
}
}
}

wantScaleDownStatus := &status.ScaleDownStatus{
Result: tc.wantStatus.result,
}
for _, scaleDownNodeInfo := range tc.wantStatus.scaledDownNodes {
statusScaledDownNode := &status.ScaleDownNode{
Node: generateNode(scaleDownNodeInfo.name),
NodeGroup: tc.nodeGroups[scaleDownNodeInfo.nodeGroup],
EvictedPods: scaleDownNodeInfo.evictedPods,
UtilInfo: scaleDownNodeInfo.utilInfo,
}
wantScaleDownStatus.ScaledDownNodes = append(wantScaleDownStatus.ScaledDownNodes, statusScaledDownNode)
}

// Create Actuator, run StartDeletion, and verify the error.
ndt := deletiontracker.NewNodeDeletionTracker(0)
ndb := NewNodeDeletionBatcher(&ctx, csr, ndt, 0*time.Second)
evictor := Evictor{EvictionRetryTime: 0, DsEvictionRetryTime: 0, DsEvictionEmptyNodeTimeout: 0, PodEvictionHeadroom: DefaultPodEvictionHeadroom}
actuator := Actuator{
ctx: &ctx, clusterState: csr, nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(&ctx, ndt, ndb, evictor),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx),
configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults),
}
gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes)
if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
t.Errorf("StartDeletion error diff (-want +got):\n%s", diff)
}

// Verify ScaleDownStatus looks as expected.
ignoreSdNodeOrder := cmpopts.SortSlices(func(a, b *status.ScaleDownNode) bool { return a.Node.Name < b.Node.Name })
ignoreTimestamps := cmpopts.IgnoreFields(status.ScaleDownStatus{}, "NodeDeleteResultsAsOf")
cmpNg := cmp.Comparer(func(a, b *testprovider.TestNodeGroup) bool { return a.Id() == b.Id() })
statusCmpOpts := cmp.Options{ignoreSdNodeOrder, ignoreTimestamps, cmpNg, cmpopts.EquateEmpty()}
if diff := cmp.Diff(wantScaleDownStatus, gotStatus, statusCmpOpts); diff != "" {
t.Errorf("StartDeletion status diff (-want +got):\n%s", diff)
}

// Verify that all expected nodes were deleted using the cloud provider hook.
var gotDeletedNodes []string
nodesLoop:
for i := 0; i < len(tc.wantDeletedNodes); i++ {
select {
case deletedNode := <-deletedNodes:
gotDeletedNodes = append(gotDeletedNodes, deletedNode)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for deleted nodes.")
break nodesLoop
}
}
ignoreStrOrder := cmpopts.SortSlices(func(a, b string) bool { return a < b })
if diff := cmp.Diff(tc.wantDeletedNodes, gotDeletedNodes, ignoreStrOrder); diff != "" {
t.Errorf("deletedNodes diff (-want +got):\n%s", diff)
}

// Verify that all expected pods were deleted using the fake k8s client hook.
var gotDeletedPods []string
podsLoop:
for i := 0; i < len(tc.wantDeletedPods); i++ {
select {
case deletedPod := <-deletedPods:
gotDeletedPods = append(gotDeletedPods, deletedPod)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for deleted pods.")
break podsLoop
}
}
if diff := cmp.Diff(tc.wantDeletedPods, gotDeletedPods, ignoreStrOrder); diff != "" {
t.Errorf("deletedPods diff (-want +got):\n%s", diff)
}

// Verify that all expected taint updates happened using the fake k8s client hook.
allUpdatesCount := 0
for _, updates := range tc.wantTaintUpdates {
allUpdatesCount += len(updates)
}
gotTaintUpdates := make(map[string][][]apiv1.Taint)
taintsLoop:
for i := 0; i < allUpdatesCount; i++ {
select {
case taintUpdate := <-taintUpdates:
gotTaintUpdates[taintUpdate.nodeName] = append(gotTaintUpdates[taintUpdate.nodeName], taintUpdate.taints)
case <-time.After(3 * time.Second):
t.Errorf("Timeout while waiting for taint updates.")
break taintsLoop
}
}
startupTaintValue := cmpopts.IgnoreFields(apiv1.Taint{}, "Value")
if diff := cmp.Diff(tc.wantTaintUpdates, gotTaintUpdates, startupTaintValue, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("taintUpdates diff (-want +got):\n%s", diff)
}

// Wait for all expected deletions to be reported in NodeDeletionTracker. Reporting happens shortly after the deletion
// in cloud provider we sync to above and so this will usually not wait at all. However, it can still happen
// that there is a delay between cloud provider deletion and reporting, in which case the results are not there yet
// and we need to wait for them before asserting.
err = waitForDeletionResultsCount(actuator.nodeDeletionTracker, len(tc.wantNodeDeleteResults), 3*time.Second, 200*time.Millisecond)
if err != nil {
t.Errorf("Timeout while waiting for node deletion results")
}

// Run StartDeletion again to gather node deletion results for deletions started in the previous call, and verify
// that they look as expected.
gotNextStatus, gotNextErr := actuator.StartDeletion(nil, nil)
if gotNextErr != nil {
t.Errorf("StartDeletion unexpected error: %v", gotNextErr)
}
if diff := cmp.Diff(tc.wantNodeDeleteResults, gotNextStatus.NodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" {
t.Errorf("NodeDeleteResults diff (-want +got):\n%s", diff)
}
deletedNodes <- node
return nil
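
The verification loops above all follow the same pattern: drain exactly the expected number of asynchronous events from a channel, failing fast on a timeout instead of letting the test hang. A stripped-down sketch of that pattern, with names that are illustrative rather than taken from the test:

package main

import (
    "fmt"
    "time"
)

// collect reads up to want events from ch, giving up after timeout per event,
// mirroring the select/time.After loops in the test above.
func collect(ch <-chan string, want int, timeout time.Duration) ([]string, error) {
    var got []string
    for i := 0; i < want; i++ {
        select {
        case v := <-ch:
            got = append(got, v)
        case <-time.After(timeout):
            return got, fmt.Errorf("timed out after %d of %d events", len(got), want)
        }
    }
    return got, nil
}

func main() {
    ch := make(chan string, 2)
    ch <- "node-1"
    ch <- "node-2"
    got, err := collect(ch, 2, 100*time.Millisecond)
    fmt.Println(got, err) // [node-1 node-2] <nil>
}
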
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
@@ -894,7 +894,7 @@ func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*a
// our normal handling for booting up nodes deal with this.
// TODO: Remove this call when we handle dynamically provisioned resources.
allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes)
allNodes, readyNodes = taints.FilterOutNodesWithIgnoredTaints(a.taintConfig.IgnoredTaints, allNodes, readyNodes)
allNodes, readyNodes = taints.FilterOutNodesWithStartupTaints(a.taintConfig, allNodes, readyNodes)
return allNodes, readyNodes, nil
}

7 changes: 5 additions & 2 deletions cluster-autoscaler/main.go
@@ -190,7 +190,9 @@ var (
regional = flag.Bool("regional", false, "Cluster is regional.")
newPodScaleUpDelay = flag.Duration("new-pod-scale-up-delay", 0*time.Second, "Pods less than this old will not be considered for scale-up. Can be increased for individual pods through annotation 'cluster-autoscaler.kubernetes.io/pod-scale-up-delay'.")

ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group")
ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taint instead)")
startupTaintsFlag = multiStringFlag("startup-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Equivalent to ignore-taint)")
statusTaintsFlag = multiStringFlag("status-taint", "Specifies a taint to ignore in node templates when considering to scale a node group but nodes will not be treated as unready")
balancingIgnoreLabelsFlag = multiStringFlag("balancing-ignore-label", "Specifies a label to ignore in addition to the basic and cloud-provider set of labels when comparing if two node groups are similar")
balancingLabelsFlag = multiStringFlag("balancing-label", "Specifies a label to use for comparing if two node groups are similar, rather than the built in heuristics. Setting this flag disables all other comparison logic, and cannot be combined with --balancing-ignore-label.")
awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only")
@@ -327,7 +329,8 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ExpendablePodsPriorityCutoff: *expendablePodsPriorityCutoff,
Regional: *regional,
NewPodScaleUpDelay: *newPodScaleUpDelay,
IgnoredTaints: *ignoreTaintsFlag,
StartupTaints: append(*ignoreTaintsFlag, *startupTaintsFlag...),
StatusTaints: *statusTaintsFlag,
BalancingExtraIgnoredLabels: *balancingIgnoreLabelsFlag,
BalancingLabels: *balancingLabelsFlag,
KubeConfigPath: *kubeConfigFile,
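
The wiring above folds the deprecated --ignore-taint values into the same StartupTaints list, so both flags keep identical behavior. A minimal sketch of that merge, with hypothetical taint keys:

package main

import "fmt"

func main() {
    // Values as repeated --ignore-taint / --startup-taint flags might supply them.
    ignoreTaints := []string{"example.com/legacy-startup"}
    startupTaints := []string{"example.com/booting"}

    // Deprecated values are appended into the same StartupTaints list,
    // preserving backwards compatibility for existing deployments.
    merged := append(ignoreTaints, startupTaints...)
    fmt.Println(merged) // [example.com/legacy-startup example.com/booting]
}
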
6 changes: 3 additions & 3 deletions cluster-autoscaler/utils/kubernetes/ready.go
@@ -35,10 +35,10 @@ const (
// still upcoming due to a missing resource (e.g. GPU).
ResourceUnready NodeNotReadyReason = "cluster-autoscaler.kubernetes.io/resource-not-ready"

// IgnoreTaint is a fake identifier used internally by Cluster Autoscaler
// StartupNodes is a fake identifier used internally by Cluster Autoscaler
// to indicate nodes that appear Ready in the API, but are treated as
// still upcoming due to applied ignore taint.
IgnoreTaint NodeNotReadyReason = "cluster-autoscaler.kubernetes.io/ignore-taint"
// still upcoming due to applied startup taint.
StartupNodes NodeNotReadyReason = "cluster-autoscaler.kubernetes.io/startup-taint"
)

// IsNodeReadyAndSchedulable returns true if the node is ready and schedulable.
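
This reason is consumed later in this diff via kubernetes.GetUnreadyNodeCopy (see taints.go below). A rough sketch of that call, assuming the function takes a node and a NodeNotReadyReason and returns a copy with an overridden Ready condition, as its usage below suggests:

package main

import (
    "fmt"

    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)

func main() {
    node := &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
    // Produce a copy whose Ready condition is overridden and tagged with the
    // StartupNodes reason introduced above (call shape taken from taints.go).
    unready := kubernetes.GetUnreadyNodeCopy(node, kubernetes.StartupNodes)
    fmt.Println(unready.Name)
}
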
59 changes: 39 additions & 20 deletions cluster-autoscaler/utils/taints/taints.go
@@ -46,6 +46,12 @@ const (
// IgnoreTaintPrefix any taint starting with it will be filtered out from autoscaler template node.
IgnoreTaintPrefix = "ignore-taint.cluster-autoscaler.kubernetes.io/"

// StartupTaintPrefix (Same as IgnoreTaintPrefix) any taint starting with it will be filtered out from autoscaler template node.
StartupTaintPrefix = "startup-taint.cluster-autoscaler.kubernetes.io/"

// StatusTaintPrefix any taint starting with it will be filtered out from the autoscaler template node, but unlike IgnoreTaintPrefix & StartupTaintPrefix the node should not be treated as unready.
StatusTaintPrefix = "status-taint.cluster-autoscaler.kubernetes.io/"

gkeNodeTerminationHandlerTaint = "cloud.google.com/impending-node-termination"

// AWS: Indicates that a node has volumes stuck in attaching state and hence it is not fit for scheduling more pods
@@ -57,16 +63,18 @@ type TaintKeySet map[string]bool

// TaintConfig is a config of taints that require special handling
type TaintConfig struct {
IgnoredTaints TaintKeySet
StatusTaints TaintKeySet
StartupTaints TaintKeySet
StatusTaints TaintKeySet
StartupTaintPrefixes []string
StatusTaintPrefixes []string
}

// NewTaintConfig returns the taint config extracted from options
func NewTaintConfig(opts config.AutoscalingOptions) TaintConfig {
ignoredTaints := make(TaintKeySet)
for _, taintKey := range opts.IgnoredTaints {
klog.V(4).Infof("Ignoring taint %s on all NodeGroups", taintKey)
ignoredTaints[taintKey] = true
startupTaints := make(TaintKeySet)
for _, taintKey := range opts.StartupTaints {
klog.V(4).Infof("Startup taint %s on all NodeGroups", taintKey)
startupTaints[taintKey] = true
}

statusTaints := make(TaintKeySet)
@@ -76,8 +84,10 @@ func NewTaintConfig(opts config.AutoscalingOptions) TaintConfig {
}

return TaintConfig{
IgnoredTaints: ignoredTaints,
StatusTaints: statusTaints,
StartupTaints: startupTaints,
StatusTaints: statusTaints,
StartupTaintPrefixes: []string{IgnoreTaintPrefix, StartupTaintPrefix},
StatusTaintPrefixes: []string{StatusTaintPrefix},
}
}
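
As a quick illustration of what NewTaintConfig now produces, a hedged sketch; the taint keys are made up, while the field and constant names come from the hunks above:

package main

import (
    "fmt"

    "k8s.io/autoscaler/cluster-autoscaler/config"
    "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
)

func main() {
    cfg := taints.NewTaintConfig(config.AutoscalingOptions{
        StartupTaints: []string{"example.com/booting"},           // hypothetical key
        StatusTaints:  []string{"example.com/under-maintenance"}, // hypothetical key
    })
    // Explicit keys land in the key sets; any taint whose key starts with the
    // ignore-taint, startup-taint or status-taint prefix is matched via the prefix lists.
    fmt.Println(cfg.StartupTaints, cfg.StatusTaints)
    fmt.Println(cfg.StartupTaintPrefixes, cfg.StatusTaintPrefixes)
}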

@@ -319,6 +329,15 @@ func CleanAllTaints(nodes []*apiv1.Node, client kube_client.Interface, recorder
}
}

func matchesAnyPrefix(prefixes []string, key string) bool {
for _, prefix := range prefixes {
if strings.HasPrefix(key, prefix) {
return true
}
}
return false
}

// SanitizeTaints returns filtered taints
func SanitizeTaints(taints []apiv1.Taint, taintConfig TaintConfig) []apiv1.Taint {
var newTaints []apiv1.Taint
@@ -344,12 +363,12 @@ func SanitizeTaints(taints []apiv1.Taint, taintConfig TaintConfig) []apiv1.Taint
continue
}

if _, exists := taintConfig.IgnoredTaints[taint.Key]; exists {
klog.V(4).Infof("Removing ignored taint %s, when creating template from node", taint.Key)
if _, exists := taintConfig.StartupTaints[taint.Key]; exists {
klog.V(4).Infof("Removing startup taint %s, when creating template from node", taint.Key)
continue
}

if strings.HasPrefix(taint.Key, IgnoreTaintPrefix) {
shouldRemoveBasedOnPrefix := matchesAnyPrefix(taintConfig.StartupTaintPrefixes, taint.Key) || matchesAnyPrefix(taintConfig.StatusTaintPrefixes, taint.Key)
if shouldRemoveBasedOnPrefix {
klog.V(4).Infof("Removing taint %s based on prefix, when creation template from node", taint.Key)
continue
}
@@ -364,24 +383,24 @@ func SanitizeTaints(taints []apiv1.Taint, taintConfig TaintConfig) []apiv1.Taint
return newTaints
}

// FilterOutNodesWithIgnoredTaints override the condition status of the given nodes to mark them as NotReady when they have
// FilterOutNodesWithStartupTaints override the condition status of the given nodes to mark them as NotReady when they have
// filtered taints.
func FilterOutNodesWithIgnoredTaints(ignoredTaints TaintKeySet, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
func FilterOutNodesWithStartupTaints(taintConfig TaintConfig, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
newAllNodes := make([]*apiv1.Node, 0)
newReadyNodes := make([]*apiv1.Node, 0)
nodesWithIgnoredTaints := make(map[string]*apiv1.Node)
nodesWithStartupTaints := make(map[string]*apiv1.Node)
for _, node := range readyNodes {
if len(node.Spec.Taints) == 0 {
newReadyNodes = append(newReadyNodes, node)
continue
}
ready := true
for _, t := range node.Spec.Taints {
_, hasIgnoredTaint := ignoredTaints[t.Key]
if hasIgnoredTaint || strings.HasPrefix(t.Key, IgnoreTaintPrefix) {
_, hasStartupTaint := taintConfig.StartupTaints[t.Key]
if hasStartupTaint || matchesAnyPrefix(taintConfig.StartupTaintPrefixes, t.Key) {
ready = false
nodesWithIgnoredTaints[node.Name] = kubernetes.GetUnreadyNodeCopy(node, kubernetes.IgnoreTaint)
klog.V(3).Infof("Overriding status of node %v, which seems to have ignored taint %q", node.Name, t.Key)
nodesWithStartupTaints[node.Name] = kubernetes.GetUnreadyNodeCopy(node, kubernetes.StartupNodes)
klog.V(3).Infof("Overriding status of node %v, which seems to have startup taint %q", node.Name, t.Key)
break
}
}
@@ -391,7 +410,7 @@ func FilterOutNodesWithIgnoredTaints(ignoredTaints TaintKeySet, allNodes, readyN
}
// Override any node with ignored taint with its "unready" copy
for _, node := range allNodes {
if newNode, found := nodesWithIgnoredTaints[node.Name]; found {
if newNode, found := nodesWithStartupTaints[node.Name]; found {
newAllNodes = append(newAllNodes, newNode)
} else {
newAllNodes = append(newAllNodes, node)
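
Taken together, a hedged end-to-end sketch of how the new prefixes flow through SanitizeTaints and FilterOutNodesWithStartupTaints; the node and taint values are invented, while the function signatures and prefix strings are the ones shown in the hunks above:

package main

import (
    "fmt"

    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/autoscaler/cluster-autoscaler/config"
    "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
)

func main() {
    cfg := taints.NewTaintConfig(config.AutoscalingOptions{})

    node := &apiv1.Node{
        ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
        Spec: apiv1.NodeSpec{Taints: []apiv1.Taint{
            // Filtered from templates; the node is treated as still upcoming.
            {Key: "startup-taint.cluster-autoscaler.kubernetes.io/warming-up", Effect: apiv1.TaintEffectNoSchedule},
            // Filtered from templates, but readiness is untouched.
            {Key: "status-taint.cluster-autoscaler.kubernetes.io/degraded", Effect: apiv1.TaintEffectNoSchedule},
        }},
    }

    // Both taints are stripped when building a template node.
    fmt.Println(taints.SanitizeTaints(node.Spec.Taints, cfg))

    // Only the startup taint causes the node to be reported as unready.
    all, ready := taints.FilterOutNodesWithStartupTaints(cfg, []*apiv1.Node{node}, []*apiv1.Node{node})
    fmt.Println(len(all), len(ready)) // 1 0
}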
