From b20bdfaa2dd6865dac873b37d55ebff9ca5524ff Mon Sep 17 00:00:00 2001
From: Chi-Sheng Liu
Date: Mon, 2 Dec 2024 23:52:57 +0800
Subject: [PATCH 1/4] [RayService][Refactor] Change the ServeConfigs to nested map

Closes: ray-project/kuberay#2550
Signed-off-by: Chi-Sheng Liu
---
 .../controllers/ray/rayservice_controller.go | 67 +++++++++++--------
 .../ray/rayservice_controller_unit_test.go   | 13 ++--
 2 files changed, 45 insertions(+), 35 deletions(-)

diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go
index 124f49db4f..ad5b6f4736 100644
--- a/ray-operator/controllers/ray/rayservice_controller.go
+++ b/ray-operator/controllers/ray/rayservice_controller.go
@@ -57,7 +57,7 @@ type RayServiceReconciler struct {
 	Recorder record.EventRecorder
 	// Currently, the Ray dashboard doesn't cache the Serve deployment config.
 	// To avoid reapplying the same config repeatedly, cache the config in this map.
-	ServeConfigs                 cmap.ConcurrentMap[string, string]
+	ServeConfigs                 cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, string]]
 	RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]
 
 	dashboardClientFunc func() utils.RayDashboardClientInterface
@@ -72,7 +72,7 @@ func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider ut
 		Client:                       mgr.GetClient(),
 		Scheme:                       mgr.GetScheme(),
 		Recorder:                     mgr.GetEventRecorderFor("rayservice-controller"),
-		ServeConfigs:                 cmap.New[string](),
+		ServeConfigs:                 cmap.New[cmap.ConcurrentMap[string, string]](),
 		RayClusterDeletionTimestamps: cmap.New[time.Time](),
 
 		dashboardClientFunc: dashboardClientFunc,
@@ -529,21 +529,21 @@ func (r *RayServiceReconciler) getRayClusterByNamespacedName(ctx context.Context
 // cleanUpServeConfigCache cleans up the unused serve deployments config in the cached map.
 func (r *RayServiceReconciler) cleanUpServeConfigCache(ctx context.Context, rayServiceInstance *rayv1.RayService) {
 	logger := ctrl.LoggerFrom(ctx)
-	activeConfigKey := r.generateConfigKey(rayServiceInstance, rayServiceInstance.Status.ActiveServiceStatus.RayClusterName)
-	pendingConfigKey := r.generateConfigKey(rayServiceInstance, rayServiceInstance.Status.PendingServiceStatus.RayClusterName)
-	configPrefix := r.generateConfigKeyPrefix(rayServiceInstance)
+	activeRayClusterName := rayServiceInstance.Status.ActiveServiceStatus.RayClusterName
+	pendingRayClusterName := rayServiceInstance.Status.PendingServiceStatus.RayClusterName
 
-	// Clean up RayCluster serve deployment configs.
-	for key := range r.ServeConfigs.Items() {
-		if key == activeConfigKey || key == pendingConfigKey {
-			continue
-		}
-		if !strings.HasPrefix(key, configPrefix) {
-			// Skip configs owned by other RayService Instance.
+	cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
+	serveConfigs, exist := r.ServeConfigs.Get(cacheKey)
+	if !exist {
+		return
+	}
+
+	for key := range serveConfigs.Items() {
+		if key == activeRayClusterName || key == pendingRayClusterName {
 			continue
 		}
-		logger.Info("cleanUpServeConfigCache", "activeConfigKey", activeConfigKey, "pendingConfigKey", pendingConfigKey, "remove key", key)
-		r.ServeConfigs.Remove(key)
+		logger.Info("cleanUpServeConfigCache", "activeRayClusterName", activeRayClusterName, "pendingRayClusterName", pendingRayClusterName, "remove key", key)
+		serveConfigs.Remove(key)
 	}
 }
 
@@ -803,16 +803,13 @@ func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(ctx context.Cont
 	logger := ctrl.LoggerFrom(ctx)
 
 	// If the Serve config has not been cached, update the Serve config.
-	cacheKey := r.generateConfigKey(rayServiceInstance, rayClusterInstance.Name)
-	cachedServeConfigV2, exist := r.ServeConfigs.Get(cacheKey)
-
-	if !exist {
+	cachedServeConfigV2 := r.getServeConfigFromCache(rayServiceInstance, rayClusterInstance.Name)
+	if cachedServeConfigV2 == "" {
 		logger.Info(
 			"shouldUpdate",
 			"shouldUpdateServe", true,
-			"reason", "Nothing has been cached for the cluster with the key",
+			"reason", "Nothing has been cached for the cluster",
 			"rayClusterName", rayClusterInstance.Name,
-			"cacheKey", cacheKey,
 		)
 		return true
 	}
@@ -838,7 +835,7 @@ func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(ctx context.Cont
 
 	if cachedServeConfigV2 != rayServiceInstance.Spec.ServeConfigV2 {
 		shouldUpdate = true
-		reason = fmt.Sprintf("Current V2 Serve config doesn't match cached Serve config for cluster %s with key %s", rayClusterInstance.Name, cacheKey)
+		reason = fmt.Sprintf("Current V2 Serve config doesn't match cached Serve config for cluster %s", rayClusterInstance.Name)
 	}
 
 	logger.Info("shouldUpdate", "shouldUpdateServe", shouldUpdate, "reason", reason, "cachedServeConfig", cachedServeConfigV2, "current Serve config", rayServiceInstance.Spec.ServeConfigV2)
@@ -868,9 +865,8 @@ func (r *RayServiceReconciler) updateServeDeployment(ctx context.Context, raySer
 		return err
 	}
 
-	cacheKey := r.generateConfigKey(rayServiceInstance, clusterName)
-	r.ServeConfigs.Set(cacheKey, rayServiceInstance.Spec.ServeConfigV2)
-	logger.Info("updateServeDeployment", "message", "Cached Serve config for Ray cluster with the key", "rayClusterName", clusterName, "cacheKey", cacheKey)
+	r.cacheServeConfig(rayServiceInstance, clusterName)
+	logger.Info("updateServeDeployment", "message", "Cached Serve config for Ray cluster with the key", "rayClusterName", clusterName)
 
 	return nil
 }
@@ -960,12 +956,27 @@ func (r *RayServiceReconciler) getAndCheckServeStatus(ctx context.Context, dashb
 	return isReady, nil
 }
 
-func (r *RayServiceReconciler) generateConfigKey(rayServiceInstance *rayv1.RayService, clusterName string) string {
-	return r.generateConfigKeyPrefix(rayServiceInstance) + clusterName
+func (r *RayServiceReconciler) getServeConfigFromCache(rayServiceInstance *rayv1.RayService, clusterName string) string {
+	cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
+	serveConfigs, exist := r.ServeConfigs.Get(cacheKey)
+	if !exist {
+		return ""
+	}
+	serveConfig, exist := serveConfigs.Get(clusterName)
+	if !exist {
+		return ""
+	}
+	return serveConfig
 }
 
-func (r *RayServiceReconciler) generateConfigKeyPrefix(rayServiceInstance *rayv1.RayService) string {
-	return rayServiceInstance.Namespace + "/" + rayServiceInstance.Name + "/"
+func (r *RayServiceReconciler) cacheServeConfig(rayServiceInstance *rayv1.RayService, clusterName string) {
+	cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
+	rayServiceServeConfigs, exist := r.ServeConfigs.Get(cacheKey)
+	if !exist {
+		rayServiceServeConfigs = cmap.New[string]()
+		r.ServeConfigs.Set(cacheKey, rayServiceServeConfigs)
+	}
+	rayServiceServeConfigs.Set(clusterName, rayServiceInstance.Spec.ServeConfigV2)
 }
 
 func (r *RayServiceReconciler) markRestartAndAddPendingClusterName(ctx context.Context, rayServiceInstance *rayv1.RayService) {
diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go
index 9c1aed7394..dec5cae8be 100644
--- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go
+++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go
@@ -667,7 +667,7 @@ func TestCheckIfNeedSubmitServeDeployment(t *testing.T) {
 		Client:       fakeClient,
 		Recorder:     &record.FakeRecorder{},
 		Scheme:       scheme.Scheme,
-		ServeConfigs: cmap.New[string](),
+		ServeConfigs: cmap.New[cmap.ConcurrentMap[string, string]](),
 	}
 
 	namespace := "ray"
@@ -703,22 +703,21 @@ applications:
 	// Test 1: The RayCluster is new, and this is the first reconciliation after the RayCluster becomes ready.
 	// No Serve application has been created yet, so the RayService's serve configuration has not been cached in
 	// `r.ServeConfigs`.
-	cacheKey := r.generateConfigKey(&rayService, cluster.Name)
-	_, exist := r.ServeConfigs.Get(cacheKey)
-	assert.False(t, exist)
+	serveConfig := r.getServeConfigFromCache(&rayService, cluster.Name)
+	assert.Empty(t, serveConfig)
 	shouldCreate := r.checkIfNeedSubmitServeDeployment(ctx, &rayService, &cluster, &rayv1.RayServiceStatus{})
 	assert.True(t, shouldCreate)
 
 	// Test 2: The RayCluster is not new, but the head Pod without GCS FT-enabled crashes and restarts.
 	// Hence, the RayService's Serve application status is empty, but the KubeRay operator has cached the Serve
 	// application's configuration.
-	r.ServeConfigs.Set(cacheKey, rayService.Spec.ServeConfigV2) // Simulate the Serve application's configuration has been cached.
+	r.cacheServeConfig(&rayService, cluster.Name) // Simulate the Serve application's configuration has been cached.
 	shouldCreate = r.checkIfNeedSubmitServeDeployment(ctx, &rayService, &cluster, &rayv1.RayServiceStatus{})
 	assert.True(t, shouldCreate)
 
 	// Test 3: The Serve application has been created, and the RayService's status has been updated.
-	_, exist = r.ServeConfigs.Get(cacheKey)
-	assert.True(t, exist)
+	serveConfig = r.getServeConfigFromCache(&rayService, cluster.Name)
+	assert.NotEmpty(t, serveConfig)
 	serveStatus := rayv1.RayServiceStatus{
 		Applications: map[string]rayv1.AppStatus{
 			"myapp": {

From 68471d67addf754324fc604630df37a6130392b8 Mon Sep 17 00:00:00 2001
From: Chi-Sheng Liu
Date: Tue, 3 Dec 2024 14:02:52 +0800
Subject: [PATCH 2/4] Address comment: Prevent cache empty serveConfig

Closes: ray-project/kuberay#2550
Signed-off-by: Chi-Sheng Liu
---
 ray-operator/controllers/ray/rayservice_controller.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go
index ad5b6f4736..fe4c6dbcb0 100644
--- a/ray-operator/controllers/ray/rayservice_controller.go
+++ b/ray-operator/controllers/ray/rayservice_controller.go
@@ -970,13 +970,17 @@ func (r *RayServiceReconciler) getServeConfigFromCache(rayServiceInstance *rayv1
 }
 
 func (r *RayServiceReconciler) cacheServeConfig(rayServiceInstance *rayv1.RayService, clusterName string) {
+	serveConfig := rayServiceInstance.Spec.ServeConfigV2
+	if serveConfig == "" {
+		return
+	}
 	cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
 	rayServiceServeConfigs, exist := r.ServeConfigs.Get(cacheKey)
 	if !exist {
 		rayServiceServeConfigs = cmap.New[string]()
 		r.ServeConfigs.Set(cacheKey, rayServiceServeConfigs)
 	}
-	rayServiceServeConfigs.Set(clusterName, rayServiceInstance.Spec.ServeConfigV2)
+	rayServiceServeConfigs.Set(clusterName, serveConfig)
 }
 
 func (r *RayServiceReconciler) markRestartAndAddPendingClusterName(ctx context.Context, rayServiceInstance *rayv1.RayService) {

From b2cb82a34bc9bc6cf3daaeebd5362575def4c231 Mon Sep 17 00:00:00 2001
From: Chi-Sheng Liu
Date: Wed, 4 Dec 2024 09:36:40 +0800
Subject: [PATCH 3/4] Address comment: Add comment for ServeConfigs

Closes: ray-project/kuberay#2550
Signed-off-by: Chi-Sheng Liu
---
 ray-operator/controllers/ray/rayservice_controller.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go
index fe4c6dbcb0..3772e5e225 100644
--- a/ray-operator/controllers/ray/rayservice_controller.go
+++ b/ray-operator/controllers/ray/rayservice_controller.go
@@ -57,6 +57,8 @@ type RayServiceReconciler struct {
 	Recorder record.EventRecorder
 	// Currently, the Ray dashboard doesn't cache the Serve deployment config.
 	// To avoid reapplying the same config repeatedly, cache the config in this map.
+	// Stores map of cacheKey to map of RayCluster name to Serve deployment config,
+	// where cacheKey is the combination of RayService namespace and name.
 	ServeConfigs                 cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, string]]
 	RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]
 

From 27271d57aa8bccafcef2468433220c2a2e82a5f3 Mon Sep 17 00:00:00 2001
From: Chi-Sheng Liu
Date: Wed, 4 Dec 2024 09:50:29 +0800
Subject: [PATCH 4/4] Address comment: Change comments

Closes: ray-project/kuberay#2550
Signed-off-by: Chi-Sheng Liu
---
 ray-operator/controllers/ray/rayservice_controller.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go
index 3772e5e225..cee733be4c 100644
--- a/ray-operator/controllers/ray/rayservice_controller.go
+++ b/ray-operator/controllers/ray/rayservice_controller.go
@@ -55,9 +55,9 @@ type RayServiceReconciler struct {
 	client.Client
 	Scheme   *runtime.Scheme
 	Recorder record.EventRecorder
-	// Currently, the Ray dashboard doesn't cache the Serve deployment config.
+	// Currently, the Ray dashboard doesn't cache the Serve application config.
 	// To avoid reapplying the same config repeatedly, cache the config in this map.
-	// Stores map of cacheKey to map of RayCluster name to Serve deployment config,
+	// Stores map of cacheKey to map of RayCluster name to Serve application config,
 	// where cacheKey is the combination of RayService namespace and name.
 	ServeConfigs                 cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, string]]
 	RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]
@@ -528,7 +528,7 @@ func (r *RayServiceReconciler) getRayClusterByNamespacedName(ctx context.Context
 	return rayCluster, nil
 }
 
-// cleanUpServeConfigCache cleans up the unused serve deployments config in the cached map.
+// cleanUpServeConfigCache cleans up the unused serve applications config in the cached map.
 func (r *RayServiceReconciler) cleanUpServeConfigCache(ctx context.Context, rayServiceInstance *rayv1.RayService) {
 	logger := ctrl.LoggerFrom(ctx)
 	activeRayClusterName := rayServiceInstance.Status.ActiveServiceStatus.RayClusterName
 	pendingRayClusterName := rayServiceInstance.Status.PendingServiceStatus.RayClusterName
@@ -1131,7 +1131,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
 		if err != nil {
 			logger.Error(err, "Failed to check if head Pod is running and ready!")
 		} else {
-			logger.Info("Skipping the update of Serve deployments because the Ray head Pod is not ready.")
+			logger.Info("Skipping the update of Serve applications because the Ray head Pod is not ready.")
 		}
 		return false, err
 	}
@@ -1172,7 +1172,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
 		if err := r.Status().Update(ctx, rayServiceInstance); err != nil {
 			return false, err
 		}
-		logger.Info("Mark cluster as waiting for Serve deployments", "rayCluster", rayClusterInstance)
+		logger.Info("Mark cluster as waiting for Serve applications", "rayCluster", rayClusterInstance)
 	}
 
 	return isReady, nil
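
A note for readers outside the KubeRay codebase: the caching scheme these four patches converge on can be seen in isolation with a small standalone program. The sketch below uses the same github.com/orcaman/concurrent-map/v2 library the operator imports as cmap; the serveConfigCache type, its set/get/cleanUp method names, and the sample values are illustrative stand-ins for RayServiceReconciler.ServeConfigs and its cacheServeConfig, getServeConfigFromCache, and cleanUpServeConfigCache helpers, not actual KubeRay API.

package main

import (
	"fmt"

	cmap "github.com/orcaman/concurrent-map/v2"
)

// serveConfigCache mimics the nested ServeConfigs field: the outer map is
// keyed by "<namespace>/<name>" of the RayService, the inner map by
// RayCluster name.
type serveConfigCache struct {
	configs cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, string]]
}

// set mirrors cacheServeConfig: skip empty configs (patch 2), lazily create
// the inner map, then record the config under the RayCluster name.
func (c *serveConfigCache) set(namespace, name, clusterName, config string) {
	if config == "" {
		return
	}
	cacheKey := namespace + "/" + name
	inner, ok := c.configs.Get(cacheKey)
	if !ok {
		inner = cmap.New[string]()
		c.configs.Set(cacheKey, inner)
	}
	inner.Set(clusterName, config)
}

// get mirrors getServeConfigFromCache: a miss at either level yields "".
func (c *serveConfigCache) get(namespace, name, clusterName string) string {
	inner, ok := c.configs.Get(namespace + "/" + name)
	if !ok {
		return ""
	}
	config, _ := inner.Get(clusterName) // zero value "" on a miss
	return config
}

// cleanUp mirrors cleanUpServeConfigCache: drop every cached entry for this
// RayService except those of the active and pending RayClusters. Because the
// outer key already scopes the inner map to one RayService, no prefix
// matching is needed to skip other RayServices' entries, which is what the
// refactor removes.
func (c *serveConfigCache) cleanUp(namespace, name, activeCluster, pendingCluster string) {
	inner, ok := c.configs.Get(namespace + "/" + name)
	if !ok {
		return
	}
	for clusterName := range inner.Items() {
		if clusterName != activeCluster && clusterName != pendingCluster {
			inner.Remove(clusterName)
		}
	}
}

func main() {
	cache := &serveConfigCache{configs: cmap.New[cmap.ConcurrentMap[string, string]]()}
	cache.set("ray", "example-rayservice", "cluster-old", "applications: ...")
	cache.set("ray", "example-rayservice", "cluster-new", "applications: ...")
	cache.cleanUp("ray", "example-rayservice", "cluster-new", "")
	fmt.Printf("old evicted: %v\n", cache.get("ray", "example-rayservice", "cluster-old") == "")
	fmt.Printf("new kept:    %v\n", cache.get("ray", "example-rayservice", "cluster-new") != "")
}

One design note: the get-or-create of the inner map is check-then-set rather than atomic. That matches the patch, and appears safe here because controller-runtime processes a given RayService on one worker at a time, so two goroutines should not race to create the same inner map; cmap's SetIfAbsent would be the stricter alternative if that assumption ever changed.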