[RayService][Refactor] Change the ServeConfigs to nested map (#2591)
MortalHappiness authored Dec 4, 2024
1 parent df0bf2b commit 3c8904c
Showing 2 changed files with 55 additions and 39 deletions.
81 changes: 49 additions & 32 deletions ray-operator/controllers/ray/rayservice_controller.go
@@ -55,9 +55,11 @@ type RayServiceReconciler struct {
 	client.Client
 	Scheme   *runtime.Scheme
 	Recorder record.EventRecorder
-	// Currently, the Ray dashboard doesn't cache the Serve deployment config.
+	// Currently, the Ray dashboard doesn't cache the Serve application config.
 	// To avoid reapplying the same config repeatedly, cache the config in this map.
-	ServeConfigs                 cmap.ConcurrentMap[string, string]
+	// Stores map of cacheKey to map of RayCluster name to Serve application config,
+	// where cacheKey is the combination of RayService namespace and name.
+	ServeConfigs                 cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, string]]
 	RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]

 	dashboardClientFunc func() utils.RayDashboardClientInterface
@@ -72,7 +74,7 @@ func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider ut
 		Client:                       mgr.GetClient(),
 		Scheme:                       mgr.GetScheme(),
 		Recorder:                     mgr.GetEventRecorderFor("rayservice-controller"),
-		ServeConfigs:                 cmap.New[string](),
+		ServeConfigs:                 cmap.New[cmap.ConcurrentMap[string, string]](),
 		RayClusterDeletionTimestamps: cmap.New[time.Time](),

 		dashboardClientFunc: dashboardClientFunc,
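The essential change: the cache goes from a single flat map keyed by "namespace/name/clusterName" strings to a two-level map whose outer key is the RayService's "namespace/name" and whose inner key is the RayCluster name. A minimal standalone sketch of the new shape, not part of the commit; the default/my-service and raycluster-abc values are hypothetical:

package main

import (
	"fmt"

	cmap "github.com/orcaman/concurrent-map/v2"
)

func main() {
	// Outer key: "<namespace>/<RayService name>"; inner key: RayCluster name.
	serveConfigs := cmap.New[cmap.ConcurrentMap[string, string]]()

	inner := cmap.New[string]()
	inner.Set("raycluster-abc", "applications: ...") // hypothetical ServeConfigV2 YAML
	serveConfigs.Set("default/my-service", inner)

	if configs, ok := serveConfigs.Get("default/my-service"); ok {
		if cfg, ok := configs.Get("raycluster-abc"); ok {
			fmt.Println(cfg) // prints "applications: ..."
		}
	}
}

Entries for different RayService instances can no longer collide or require prefix filtering, which is what the cleanUpServeConfigCache change below relies on.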
@@ -526,24 +528,24 @@ func (r *RayServiceReconciler) getRayClusterByNamespacedName(ctx context.Context
 	return rayCluster, nil
 }

-// cleanUpServeConfigCache cleans up the unused serve deployments config in the cached map.
+// cleanUpServeConfigCache cleans up the unused serve applications config in the cached map.
 func (r *RayServiceReconciler) cleanUpServeConfigCache(ctx context.Context, rayServiceInstance *rayv1.RayService) {
 	logger := ctrl.LoggerFrom(ctx)
-	activeConfigKey := r.generateConfigKey(rayServiceInstance, rayServiceInstance.Status.ActiveServiceStatus.RayClusterName)
-	pendingConfigKey := r.generateConfigKey(rayServiceInstance, rayServiceInstance.Status.PendingServiceStatus.RayClusterName)
-	configPrefix := r.generateConfigKeyPrefix(rayServiceInstance)
+	activeRayClusterName := rayServiceInstance.Status.ActiveServiceStatus.RayClusterName
+	pendingRayClusterName := rayServiceInstance.Status.PendingServiceStatus.RayClusterName

-	// Clean up RayCluster serve deployment configs.
-	for key := range r.ServeConfigs.Items() {
-		if key == activeConfigKey || key == pendingConfigKey {
-			continue
-		}
-		if !strings.HasPrefix(key, configPrefix) {
-			// Skip configs owned by other RayService Instance.
-			continue
-		}
-		logger.Info("cleanUpServeConfigCache", "activeConfigKey", activeConfigKey, "pendingConfigKey", pendingConfigKey, "remove key", key)
-		r.ServeConfigs.Remove(key)
+	cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
+	serveConfigs, exist := r.ServeConfigs.Get(cacheKey)
+	if !exist {
+		return
+	}
+
+	for key := range serveConfigs.Items() {
+		if key == activeRayClusterName || key == pendingRayClusterName {
+			continue
+		}
+		logger.Info("cleanUpServeConfigCache", "activeRayClusterName", activeRayClusterName, "pendingRayClusterName", pendingRayClusterName, "remove key", key)
+		serveConfigs.Remove(key)
 	}
 }
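With the nested map, the cleanup no longer prefix-matches keys with strings.HasPrefix to skip entries owned by other RayService instances; it fetches this RayService's inner map and removes every entry that is neither the active nor the pending RayCluster. The invariant in isolation, as a standalone sketch rather than the commit's code:

// keepOnlyLiveClusters mirrors the new cleanup loop: cfgs is one RayService's
// inner map; active and pending are the names of its live RayClusters.
func keepOnlyLiveClusters(cfgs cmap.ConcurrentMap[string, string], active, pending string) {
	for name := range cfgs.Items() {
		if name == active || name == pending {
			continue
		}
		cfgs.Remove(name)
	}
}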

@@ -803,16 +805,13 @@ func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(ctx context.Cont
 	logger := ctrl.LoggerFrom(ctx)

 	// If the Serve config has not been cached, update the Serve config.
-	cacheKey := r.generateConfigKey(rayServiceInstance, rayClusterInstance.Name)
-	cachedServeConfigV2, exist := r.ServeConfigs.Get(cacheKey)
-
-	if !exist {
+	cachedServeConfigV2 := r.getServeConfigFromCache(rayServiceInstance, rayClusterInstance.Name)
+	if cachedServeConfigV2 == "" {
 		logger.Info(
 			"shouldUpdate",
 			"shouldUpdateServe", true,
-			"reason", "Nothing has been cached for the cluster with the key",
+			"reason", "Nothing has been cached for the cluster",
 			"rayClusterName", rayClusterInstance.Name,
-			"cacheKey", cacheKey,
 		)
 		return true
 	}
@@ -838,7 +837,7 @@ func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(ctx context.Cont

 	if cachedServeConfigV2 != rayServiceInstance.Spec.ServeConfigV2 {
 		shouldUpdate = true
-		reason = fmt.Sprintf("Current V2 Serve config doesn't match cached Serve config for cluster %s with key %s", rayClusterInstance.Name, cacheKey)
+		reason = fmt.Sprintf("Current V2 Serve config doesn't match cached Serve config for cluster %s", rayClusterInstance.Name)
 	}
 	logger.Info("shouldUpdate", "shouldUpdateServe", shouldUpdate, "reason", reason, "cachedServeConfig", cachedServeConfigV2, "current Serve config", rayServiceInstance.Spec.ServeConfigV2)
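Condensed, the cache check above reduces to string comparisons on the per-cluster entry, with the empty string doubling as a miss marker since getServeConfigFromCache returns "" when either map level misses. A rough distillation, which deliberately elides the Serve-status branch that the unit test below exercises:

// shouldSubmitServe is a simplified, hypothetical distillation: submit when
// nothing is cached for this cluster, or when the cached config has drifted
// from the spec.
func shouldSubmitServe(cached, spec string) bool {
	if cached == "" {
		return true // cache miss: first reconciliation for this cluster
	}
	return cached != spec // resubmit only when the config changed
}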

@@ -868,9 +867,8 @@ func (r *RayServiceReconciler) updateServeDeployment(ctx context.Context, raySer
 		return err
 	}

-	cacheKey := r.generateConfigKey(rayServiceInstance, clusterName)
-	r.ServeConfigs.Set(cacheKey, rayServiceInstance.Spec.ServeConfigV2)
-	logger.Info("updateServeDeployment", "message", "Cached Serve config for Ray cluster with the key", "rayClusterName", clusterName, "cacheKey", cacheKey)
+	r.cacheServeConfig(rayServiceInstance, clusterName)
+	logger.Info("updateServeDeployment", "message", "Cached Serve config for Ray cluster with the key", "rayClusterName", clusterName)
 	return nil
 }

@@ -960,12 +958,31 @@ func (r *RayServiceReconciler) getAndCheckServeStatus(ctx context.Context, dashb
 	return isReady, nil
 }

-func (r *RayServiceReconciler) generateConfigKey(rayServiceInstance *rayv1.RayService, clusterName string) string {
-	return r.generateConfigKeyPrefix(rayServiceInstance) + clusterName
+func (r *RayServiceReconciler) getServeConfigFromCache(rayServiceInstance *rayv1.RayService, clusterName string) string {
+	cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
+	serveConfigs, exist := r.ServeConfigs.Get(cacheKey)
+	if !exist {
+		return ""
+	}
+	serveConfig, exist := serveConfigs.Get(clusterName)
+	if !exist {
+		return ""
+	}
+	return serveConfig
 }

-func (r *RayServiceReconciler) generateConfigKeyPrefix(rayServiceInstance *rayv1.RayService) string {
-	return rayServiceInstance.Namespace + "/" + rayServiceInstance.Name + "/"
+func (r *RayServiceReconciler) cacheServeConfig(rayServiceInstance *rayv1.RayService, clusterName string) {
+	serveConfig := rayServiceInstance.Spec.ServeConfigV2
+	if serveConfig == "" {
+		return
+	}
+	cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
+	rayServiceServeConfigs, exist := r.ServeConfigs.Get(cacheKey)
+	if !exist {
+		rayServiceServeConfigs = cmap.New[string]()
+		r.ServeConfigs.Set(cacheKey, rayServiceServeConfigs)
+	}
+	rayServiceServeConfigs.Set(clusterName, serveConfig)
 }

 func (r *RayServiceReconciler) markRestartAndAddPendingClusterName(ctx context.Context, rayServiceInstance *rayv1.RayService) {
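A design note on cacheServeConfig: the get-or-create on the outer map is check-then-act rather than atomic. That appears safe here because controller-runtime serializes reconciles for a given object, so each cacheKey is written from at most one goroutine at a time. The lazy-initialization pattern in isolation, as a standalone sketch with hypothetical names:

// getOrCreateInner mirrors cacheServeConfig's lazy initialization: fetch the
// inner map for cacheKey, creating and registering it on first use.
func getOrCreateInner(outer cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, string]], cacheKey string) cmap.ConcurrentMap[string, string] {
	inner, exist := outer.Get(cacheKey)
	if !exist {
		inner = cmap.New[string]()
		outer.Set(cacheKey, inner)
	}
	return inner
}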
@@ -1114,7 +1131,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
 	if err != nil {
 		logger.Error(err, "Failed to check if head Pod is running and ready!")
 	} else {
-		logger.Info("Skipping the update of Serve deployments because the Ray head Pod is not ready.")
+		logger.Info("Skipping the update of Serve applications because the Ray head Pod is not ready.")
 	}
 	return false, err
 }
@@ -1155,7 +1172,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
 		if err := r.Status().Update(ctx, rayServiceInstance); err != nil {
 			return false, err
 		}
-		logger.Info("Mark cluster as waiting for Serve deployments", "rayCluster", rayClusterInstance)
+		logger.Info("Mark cluster as waiting for Serve applications", "rayCluster", rayClusterInstance)
 	}

 	return isReady, nil
13 changes: 6 additions & 7 deletions ray-operator/controllers/ray/rayservice_controller_unit_test.go
@@ -667,7 +667,7 @@ func TestCheckIfNeedSubmitServeDeployment(t *testing.T) {
 		Client:       fakeClient,
 		Recorder:     &record.FakeRecorder{},
 		Scheme:       scheme.Scheme,
-		ServeConfigs: cmap.New[string](),
+		ServeConfigs: cmap.New[cmap.ConcurrentMap[string, string]](),
 	}

 	namespace := "ray"
@@ -703,22 +703,21 @@ applications:
 	// Test 1: The RayCluster is new, and this is the first reconciliation after the RayCluster becomes ready.
 	// No Serve application has been created yet, so the RayService's serve configuration has not been cached in
 	// `r.ServeConfigs`.
-	cacheKey := r.generateConfigKey(&rayService, cluster.Name)
-	_, exist := r.ServeConfigs.Get(cacheKey)
-	assert.False(t, exist)
+	serveConfig := r.getServeConfigFromCache(&rayService, cluster.Name)
+	assert.Empty(t, serveConfig)
 	shouldCreate := r.checkIfNeedSubmitServeDeployment(ctx, &rayService, &cluster, &rayv1.RayServiceStatus{})
 	assert.True(t, shouldCreate)

 	// Test 2: The RayCluster is not new, but the head Pod without GCS FT-enabled crashes and restarts.
 	// Hence, the RayService's Serve application status is empty, but the KubeRay operator has cached the Serve
 	// application's configuration.
-	r.ServeConfigs.Set(cacheKey, rayService.Spec.ServeConfigV2) // Simulate the Serve application's configuration has been cached.
+	r.cacheServeConfig(&rayService, cluster.Name) // Simulate the Serve application's configuration has been cached.
 	shouldCreate = r.checkIfNeedSubmitServeDeployment(ctx, &rayService, &cluster, &rayv1.RayServiceStatus{})
 	assert.True(t, shouldCreate)

 	// Test 3: The Serve application has been created, and the RayService's status has been updated.
-	_, exist = r.ServeConfigs.Get(cacheKey)
-	assert.True(t, exist)
+	serveConfig = r.getServeConfigFromCache(&rayService, cluster.Name)
+	assert.NotEmpty(t, serveConfig)
 	serveStatus := rayv1.RayServiceStatus{
 		Applications: map[string]rayv1.AppStatus{
 			"myapp": {