[RayCluster][Fix] leave .Status.State untouched when there is a reconcile error #2622

Merged
ray-operator/controllers/ray/raycluster_controller.go (7 changes: 5 additions & 2 deletions)
@@ -1284,8 +1284,11 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
newInstance.Status.DesiredGPU = sumGPUs(totalResources)
newInstance.Status.DesiredTPU = totalResources[corev1.ResourceName("google.com/tpu")]

- if utils.CheckAllPodsRunning(ctx, runtimePods) {
- newInstance.Status.State = rayv1.Ready //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
+ if reconcileErr == nil && len(runtimePods.Items) == int(newInstance.Status.DesiredWorkerReplicas)+1 { // workers + 1 head
+ if utils.CheckAllPodsRunning(ctx, runtimePods) {
+ newInstance.Status.State = rayv1.Ready //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
+ newInstance.Status.Reason = ""
+ }
}

// Check if the head node is running and ready by checking the head pod's status or if the cluster has been suspended.
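
In effect, calculateStatus now receives the reconcile error and only promotes the cluster to Ready when that error is nil and the observed pod count matches the expected head plus workers. A minimal caller-side sketch of the intended flow, assuming the reconciler aggregates its per-resource reconcile steps into a single error (the reconcileAll helper below is hypothetical; r.Status().Update is the standard controller-runtime status writer):

// Sketch only: how the reconcile error is expected to flow into status calculation.
func (r *RayClusterReconciler) updateStatusSketch(ctx context.Context, instance *rayv1.RayCluster) error {
    // Hypothetical aggregation of the per-resource reconcile steps (pods, services, ...).
    reconcileErr := r.reconcileAll(ctx, instance)

    // With this change:
    //   reconcileErr != nil: Status.State is left untouched, so an already-Ready cluster stays Ready.
    //   reconcileErr == nil and all expected pods are running: State becomes rayv1.Ready and Reason is cleared.
    newInstance, err := r.calculateStatus(ctx, instance, reconcileErr)
    if err != nil {
        return err
    }
    return r.Status().Update(ctx, newInstance)
}
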
ray-operator/controllers/ray/raycluster_controller_unit_test.go (199 changes: 185 additions & 14 deletions)
@@ -19,6 +19,7 @@ import (
"context"
"errors"
"os"
"strconv"
"strings"
"testing"
"time"
@@ -1703,27 +1704,43 @@ func TestCalculateStatus(t *testing.T) {
headService, err := common.BuildServiceForHeadPod(context.Background(), *testRayCluster, nil, nil)
assert.Nil(t, err, "Failed to build head service.")
headService.Spec.ClusterIP = headServiceIP
podReadyStatus := corev1.PodStatus{
PodIP: headNodeIP,
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
}
headLabel := map[string]string{
utils.RayClusterLabelKey: instanceName,
utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
}
workerLabel := map[string]string{
utils.RayClusterLabelKey: instanceName,
utils.RayNodeTypeLabelKey: string(rayv1.WorkerNode),
}
headPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "headNode",
Namespace: namespaceStr,
- Labels: map[string]string{
- utils.RayClusterLabelKey: instanceName,
- utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
- },
- },
- Status: corev1.PodStatus{
- PodIP: headNodeIP,
- Phase: corev1.PodRunning,
- Conditions: []corev1.PodCondition{
- {
- Type: corev1.PodReady,
- Status: corev1.ConditionTrue,
- },
- },
+ Labels: headLabel,
},
+ Status: podReadyStatus,
}
runtimeObjects := []runtime.Object{headPod, headService}
for i := int32(0); i < expectReplicaNum; i++ {
runtimeObjects = append(runtimeObjects, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "workerNode-" + strconv.Itoa(int(i)),
Namespace: namespaceStr,
Labels: workerLabel,
},
Status: podReadyStatus,
})
}

// Initialize a fake client with newScheme and runtimeObjects.
fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
@@ -1784,6 +1801,160 @@
assert.True(t, meta.IsStatusConditionPresentAndEqual(newInstance.Status.Conditions, string(rayv1.RayClusterReplicaFailure), metav1.ConditionTrue))
}

// TestCalculateStatusWithoutDesiredReplicas tests that the cluster CR should not be marked as Ready if
// DesiredWorkerReplicas > 0 and DesiredWorkerReplicas != ReadyWorkerReplicas
func TestCalculateStatusWithoutDesiredReplicas(t *testing.T) {
setupTest(t)

Member:

  1. Add comments to explain this test.
  2. Add assertions to ensure that testRayCluster fulfills the assumptions made by this test (i.e., the worker group's replicas should not be 0).

Contributor Author:

Done.

// Create a new scheme with CRDs, Pod, Service schemes.
newScheme := runtime.NewScheme()
_ = rayv1.AddToScheme(newScheme)
_ = corev1.AddToScheme(newScheme)

// Mock data
headServiceIP := "aaa.bbb.ccc.ddd"
headService, err := common.BuildServiceForHeadPod(context.Background(), *testRayCluster, nil, nil)
assert.Nil(t, err, "Failed to build head service.")
headService.Spec.ClusterIP = headServiceIP
headPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "headNode",
Namespace: namespaceStr,
Labels: map[string]string{
utils.RayClusterLabelKey: instanceName,
utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
},
},
Status: corev1.PodStatus{
PodIP: headNodeIP,
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
},
}
runtimeObjects := []runtime.Object{headPod, headService}

// Initialize a fake client with newScheme and runtimeObjects.
fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
ctx := context.Background()

// Initialize a RayCluster reconciler.
r := &RayClusterReconciler{
Client: fakeClient,
Recorder: &record.FakeRecorder{},
Scheme: scheme.Scheme,
}

newInstance, err := r.calculateStatus(ctx, testRayCluster, nil)
assert.Nil(t, err)
assert.NotEqual(t, newInstance.Status.DesiredWorkerReplicas, 0)
assert.NotEqual(t, newInstance.Status.DesiredWorkerReplicas, newInstance.Status.ReadyWorkerReplicas)
assert.Equal(t, newInstance.Status.State, rayv1.ClusterState("")) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
assert.Equal(t, newInstance.Status.Reason, "")
assert.Nil(t, newInstance.Status.StateTransitionTimes)
}
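
The review thread above asked for assertions that the fixture actually has a non-zero worker group; the merged test covers this through the DesiredWorkerReplicas checks. A direct spec-level guard would be a sketch along these lines, assuming testRayCluster keeps its replica count in the standard Spec.WorkerGroupSpecs[0].Replicas pointer field:

// Sketch: guard the test's fixture assumption at the spec level as well.
assert.NotEmpty(t, testRayCluster.Spec.WorkerGroupSpecs)
assert.NotEqual(t, int32(0), *testRayCluster.Spec.WorkerGroupSpecs[0].Replicas)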

// TestCalculateStatusWithReconcileErrorBackAndForth tests that the cluster CR should not be marked as Ready if reconcileErr != nil,
// and that the Ready state is not removed once the cluster is Ready, even if a later reconcile returns an error
func TestCalculateStatusWithReconcileErrorBackAndForth(t *testing.T) {
setupTest(t)

Member:

same

Contributor Author:

Done.

// Create a new scheme with CRDs, Pod, Service schemes.
newScheme := runtime.NewScheme()
_ = rayv1.AddToScheme(newScheme)
_ = corev1.AddToScheme(newScheme)

// Mock data
headServiceIP := "aaa.bbb.ccc.ddd"
headService, err := common.BuildServiceForHeadPod(context.Background(), *testRayCluster, nil, nil)
assert.Nil(t, err, "Failed to build head service.")
headService.Spec.ClusterIP = headServiceIP
podReadyStatus := corev1.PodStatus{
PodIP: headNodeIP,
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
}
headLabel := map[string]string{
utils.RayClusterLabelKey: instanceName,
utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
}
workerLabel := map[string]string{
utils.RayClusterLabelKey: instanceName,
utils.RayNodeTypeLabelKey: string(rayv1.WorkerNode),
}
headPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "headNode",
Namespace: namespaceStr,
Labels: headLabel,
},
Status: podReadyStatus,
}
runtimeObjects := []runtime.Object{headPod, headService}
for i := int32(0); i < expectReplicaNum; i++ {
runtimeObjects = append(runtimeObjects, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "workerNode-" + strconv.Itoa(int(i)),
Namespace: namespaceStr,
Labels: workerLabel,
},
Status: podReadyStatus,
})
}

// Initialize a fake client with newScheme and runtimeObjects.
fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
ctx := context.Background()

// Initialize a RayCluster reconciler.
r := &RayClusterReconciler{
Client: fakeClient,
Recorder: &record.FakeRecorder{},
Scheme: scheme.Scheme,
}

// Test head information with a reconcile error
newInstance, err := r.calculateStatus(ctx, testRayCluster, errors.New("invalid"))
assert.Nil(t, err)
assert.NotEqual(t, newInstance.Status.DesiredWorkerReplicas, 0)
// Note that even if all DesiredWorkerReplicas are ready, we don't mark the CR as Ready due to the reconcile error.
assert.Equal(t, newInstance.Status.DesiredWorkerReplicas, newInstance.Status.ReadyWorkerReplicas)
assert.Equal(t, newInstance.Status.State, rayv1.ClusterState("")) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
assert.Equal(t, newInstance.Status.Reason, "")
assert.Nil(t, newInstance.Status.StateTransitionTimes)

// Test head information without a reconcile error
newInstance, err = r.calculateStatus(ctx, newInstance, nil)
assert.Nil(t, err)
assert.NotEqual(t, newInstance.Status.DesiredWorkerReplicas, 0)
assert.Equal(t, newInstance.Status.DesiredWorkerReplicas, newInstance.Status.ReadyWorkerReplicas)
assert.Equal(t, newInstance.Status.State, rayv1.Ready) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
assert.Equal(t, newInstance.Status.Reason, "")
assert.NotNil(t, newInstance.Status.StateTransitionTimes)
assert.NotNil(t, newInstance.Status.StateTransitionTimes[rayv1.Ready])
t1 := newInstance.Status.StateTransitionTimes[rayv1.Ready]

// Test head information with a reconcile error again
newInstance, err = r.calculateStatus(ctx, newInstance, errors.New("invalid2"))
assert.Nil(t, err)
assert.NotEqual(t, newInstance.Status.DesiredWorkerReplicas, 0)
assert.Equal(t, newInstance.Status.DesiredWorkerReplicas, newInstance.Status.ReadyWorkerReplicas)
assert.Equal(t, newInstance.Status.State, rayv1.Ready) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
assert.Equal(t, newInstance.Status.Reason, "")
assert.NotNil(t, newInstance.Status.StateTransitionTimes)
assert.NotNil(t, newInstance.Status.StateTransitionTimes[rayv1.Ready])
assert.Equal(t, t1, newInstance.Status.StateTransitionTimes[rayv1.Ready]) // no change to StateTransitionTimes
}
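
Downstream consumers that gate on Status.State benefit from this behavior: a transient reconcile error no longer flips a Ready cluster back to an empty state. A hypothetical consumer-side check (a sketch only; State is the deprecated field referenced by the nolint comments above, with status conditions as the newer signal):

// Sketch: a consumer that relies on Status.State not flapping on transient reconcile errors.
func isClusterReady(cluster *rayv1.RayCluster) bool {
    return cluster.Status.State == rayv1.Ready //nolint:staticcheck // State is deprecated; see the links above.
}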

func TestRayClusterProvisionedCondition(t *testing.T) {
setupTest(t)
assert.True(t, features.Enabled(features.RayClusterStatusConditions))