diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 124f49db4f..cee733be4c 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -55,9 +55,11 @@ type RayServiceReconciler struct { client.Client Scheme *runtime.Scheme Recorder record.EventRecorder - // Currently, the Ray dashboard doesn't cache the Serve deployment config. + // Currently, the Ray dashboard doesn't cache the Serve application config. // To avoid reapplying the same config repeatedly, cache the config in this map. - ServeConfigs cmap.ConcurrentMap[string, string] + // Stores map of cacheKey to map of RayCluster name to Serve application config, + // where cacheKey is the combination of RayService namespace and name. + ServeConfigs cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, string]] RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time] dashboardClientFunc func() utils.RayDashboardClientInterface @@ -72,7 +74,7 @@ func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider ut Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor("rayservice-controller"), - ServeConfigs: cmap.New[string](), + ServeConfigs: cmap.New[cmap.ConcurrentMap[string, string]](), RayClusterDeletionTimestamps: cmap.New[time.Time](), dashboardClientFunc: dashboardClientFunc, @@ -526,24 +528,24 @@ func (r *RayServiceReconciler) getRayClusterByNamespacedName(ctx context.Context return rayCluster, nil } -// cleanUpServeConfigCache cleans up the unused serve deployments config in the cached map. +// cleanUpServeConfigCache cleans up the unused serve applications config in the cached map. func (r *RayServiceReconciler) cleanUpServeConfigCache(ctx context.Context, rayServiceInstance *rayv1.RayService) { logger := ctrl.LoggerFrom(ctx) - activeConfigKey := r.generateConfigKey(rayServiceInstance, rayServiceInstance.Status.ActiveServiceStatus.RayClusterName) - pendingConfigKey := r.generateConfigKey(rayServiceInstance, rayServiceInstance.Status.PendingServiceStatus.RayClusterName) - configPrefix := r.generateConfigKeyPrefix(rayServiceInstance) + activeRayClusterName := rayServiceInstance.Status.ActiveServiceStatus.RayClusterName + pendingRayClusterName := rayServiceInstance.Status.PendingServiceStatus.RayClusterName - // Clean up RayCluster serve deployment configs. - for key := range r.ServeConfigs.Items() { - if key == activeConfigKey || key == pendingConfigKey { - continue - } - if !strings.HasPrefix(key, configPrefix) { - // Skip configs owned by other RayService Instance. + cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name + serveConfigs, exist := r.ServeConfigs.Get(cacheKey) + if !exist { + return + } + + for key := range serveConfigs.Items() { + if key == activeRayClusterName || key == pendingRayClusterName { continue } - logger.Info("cleanUpServeConfigCache", "activeConfigKey", activeConfigKey, "pendingConfigKey", pendingConfigKey, "remove key", key) - r.ServeConfigs.Remove(key) + logger.Info("cleanUpServeConfigCache", "activeRayClusterName", activeRayClusterName, "pendingRayClusterName", pendingRayClusterName, "remove key", key) + serveConfigs.Remove(key) } } @@ -803,16 +805,13 @@ func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(ctx context.Cont logger := ctrl.LoggerFrom(ctx) // If the Serve config has not been cached, update the Serve config. - cacheKey := r.generateConfigKey(rayServiceInstance, rayClusterInstance.Name) - cachedServeConfigV2, exist := r.ServeConfigs.Get(cacheKey) - - if !exist { + cachedServeConfigV2 := r.getServeConfigFromCache(rayServiceInstance, rayClusterInstance.Name) + if cachedServeConfigV2 == "" { logger.Info( "shouldUpdate", "shouldUpdateServe", true, - "reason", "Nothing has been cached for the cluster with the key", + "reason", "Nothing has been cached for the cluster", "rayClusterName", rayClusterInstance.Name, - "cacheKey", cacheKey, ) return true } @@ -838,7 +837,7 @@ func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(ctx context.Cont if cachedServeConfigV2 != rayServiceInstance.Spec.ServeConfigV2 { shouldUpdate = true - reason = fmt.Sprintf("Current V2 Serve config doesn't match cached Serve config for cluster %s with key %s", rayClusterInstance.Name, cacheKey) + reason = fmt.Sprintf("Current V2 Serve config doesn't match cached Serve config for cluster %s", rayClusterInstance.Name) } logger.Info("shouldUpdate", "shouldUpdateServe", shouldUpdate, "reason", reason, "cachedServeConfig", cachedServeConfigV2, "current Serve config", rayServiceInstance.Spec.ServeConfigV2) @@ -868,9 +867,8 @@ func (r *RayServiceReconciler) updateServeDeployment(ctx context.Context, raySer return err } - cacheKey := r.generateConfigKey(rayServiceInstance, clusterName) - r.ServeConfigs.Set(cacheKey, rayServiceInstance.Spec.ServeConfigV2) - logger.Info("updateServeDeployment", "message", "Cached Serve config for Ray cluster with the key", "rayClusterName", clusterName, "cacheKey", cacheKey) + r.cacheServeConfig(rayServiceInstance, clusterName) + logger.Info("updateServeDeployment", "message", "Cached Serve config for Ray cluster with the key", "rayClusterName", clusterName) return nil } @@ -960,12 +958,31 @@ func (r *RayServiceReconciler) getAndCheckServeStatus(ctx context.Context, dashb return isReady, nil } -func (r *RayServiceReconciler) generateConfigKey(rayServiceInstance *rayv1.RayService, clusterName string) string { - return r.generateConfigKeyPrefix(rayServiceInstance) + clusterName +func (r *RayServiceReconciler) getServeConfigFromCache(rayServiceInstance *rayv1.RayService, clusterName string) string { + cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name + serveConfigs, exist := r.ServeConfigs.Get(cacheKey) + if !exist { + return "" + } + serveConfig, exist := serveConfigs.Get(clusterName) + if !exist { + return "" + } + return serveConfig } -func (r *RayServiceReconciler) generateConfigKeyPrefix(rayServiceInstance *rayv1.RayService) string { - return rayServiceInstance.Namespace + "/" + rayServiceInstance.Name + "/" +func (r *RayServiceReconciler) cacheServeConfig(rayServiceInstance *rayv1.RayService, clusterName string) { + serveConfig := rayServiceInstance.Spec.ServeConfigV2 + if serveConfig == "" { + return + } + cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name + rayServiceServeConfigs, exist := r.ServeConfigs.Get(cacheKey) + if !exist { + rayServiceServeConfigs = cmap.New[string]() + r.ServeConfigs.Set(cacheKey, rayServiceServeConfigs) + } + rayServiceServeConfigs.Set(clusterName, serveConfig) } func (r *RayServiceReconciler) markRestartAndAddPendingClusterName(ctx context.Context, rayServiceInstance *rayv1.RayService) { @@ -1114,7 +1131,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns if err != nil { logger.Error(err, "Failed to check if head Pod is running and ready!") } else { - logger.Info("Skipping the update of Serve deployments because the Ray head Pod is not ready.") + logger.Info("Skipping the update of Serve applications because the Ray head Pod is not ready.") } return false, err } @@ -1155,7 +1172,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns if err := r.Status().Update(ctx, rayServiceInstance); err != nil { return false, err } - logger.Info("Mark cluster as waiting for Serve deployments", "rayCluster", rayClusterInstance) + logger.Info("Mark cluster as waiting for Serve applications", "rayCluster", rayClusterInstance) } return isReady, nil diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 9c1aed7394..dec5cae8be 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -667,7 +667,7 @@ func TestCheckIfNeedSubmitServeDeployment(t *testing.T) { Client: fakeClient, Recorder: &record.FakeRecorder{}, Scheme: scheme.Scheme, - ServeConfigs: cmap.New[string](), + ServeConfigs: cmap.New[cmap.ConcurrentMap[string, string]](), } namespace := "ray" @@ -703,22 +703,21 @@ applications: // Test 1: The RayCluster is new, and this is the first reconciliation after the RayCluster becomes ready. // No Serve application has been created yet, so the RayService's serve configuration has not been cached in // `r.ServeConfigs`. - cacheKey := r.generateConfigKey(&rayService, cluster.Name) - _, exist := r.ServeConfigs.Get(cacheKey) - assert.False(t, exist) + serveConfig := r.getServeConfigFromCache(&rayService, cluster.Name) + assert.Empty(t, serveConfig) shouldCreate := r.checkIfNeedSubmitServeDeployment(ctx, &rayService, &cluster, &rayv1.RayServiceStatus{}) assert.True(t, shouldCreate) // Test 2: The RayCluster is not new, but the head Pod without GCS FT-enabled crashes and restarts. // Hence, the RayService's Serve application status is empty, but the KubeRay operator has cached the Serve // application's configuration. - r.ServeConfigs.Set(cacheKey, rayService.Spec.ServeConfigV2) // Simulate the Serve application's configuration has been cached. + r.cacheServeConfig(&rayService, cluster.Name) // Simulate the Serve application's configuration has been cached. shouldCreate = r.checkIfNeedSubmitServeDeployment(ctx, &rayService, &cluster, &rayv1.RayServiceStatus{}) assert.True(t, shouldCreate) // Test 3: The Serve application has been created, and the RayService's status has been updated. - _, exist = r.ServeConfigs.Get(cacheKey) - assert.True(t, exist) + serveConfig = r.getServeConfigFromCache(&rayService, cluster.Name) + assert.NotEmpty(t, serveConfig) serveStatus := rayv1.RayServiceStatus{ Applications: map[string]rayv1.AppStatus{ "myapp": {