[RayService][Refactor] Change the ServeConfigs to nested map #2591

Merged
73 changes: 45 additions & 28 deletions ray-operator/controllers/ray/rayservice_controller.go
@@ -57,7 +57,9 @@ type RayServiceReconciler struct {
     Recorder record.EventRecorder
     // Currently, the Ray dashboard doesn't cache the Serve deployment config.
     // To avoid reapplying the same config repeatedly, cache the config in this map.
-    ServeConfigs cmap.ConcurrentMap[string, string]
+    // Stores map of cacheKey to map of RayCluster name to Serve deployment config,
+    // where cacheKey is the combination of RayService namespace and name.
+    ServeConfigs cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, string]]
     RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]

     dashboardClientFunc func() utils.RayDashboardClientInterface

[Review thread on the ServeConfigs field]
Member: Can you add a comment to explain the definitions of key and value?
MortalHappiness (Member Author), Dec 4, 2024: Added in b2cb82a
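For readers unfamiliar with the concurrent-map package, here is a minimal, self-contained sketch of the new two-level cache layout. It is not KubeRay code; it assumes only the github.com/orcaman/concurrent-map/v2 package that the operator imports as cmap, and the RayService/RayCluster names are made up for illustration.

```go
package main

import (
	"fmt"

	cmap "github.com/orcaman/concurrent-map/v2"
)

func main() {
	// Outer key: "<RayService namespace>/<RayService name>" (the cacheKey).
	// Inner map: RayCluster name -> serialized Serve config (ServeConfigV2).
	serveConfigs := cmap.New[cmap.ConcurrentMap[string, string]]()

	inner := cmap.New[string]()
	inner.Set("raycluster-abc123", "applications: ...") // hypothetical cluster name and config
	serveConfigs.Set("default/my-rayservice", inner)    // hypothetical "namespace/name" cacheKey

	if clusters, ok := serveConfigs.Get("default/my-rayservice"); ok {
		cfg, _ := clusters.Get("raycluster-abc123")
		fmt.Println(cfg) // prints: applications: ...
	}
}
```

Because each RayService owns one inner map, all of its per-cluster entries can be found, updated, or dropped without touching (or prefix-matching against) entries that belong to other RayService instances.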
@@ -72,7 +74,7 @@ func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider ut
     Client: mgr.GetClient(),
     Scheme: mgr.GetScheme(),
     Recorder: mgr.GetEventRecorderFor("rayservice-controller"),
-    ServeConfigs: cmap.New[string](),
+    ServeConfigs: cmap.New[cmap.ConcurrentMap[string, string]](),
     RayClusterDeletionTimestamps: cmap.New[time.Time](),

     dashboardClientFunc: dashboardClientFunc,
@@ -529,21 +531,21 @@ func (r *RayServiceReconciler) getRayClusterByNamespacedName(ctx context.Context
 // cleanUpServeConfigCache cleans up the unused serve deployments config in the cached map.
 func (r *RayServiceReconciler) cleanUpServeConfigCache(ctx context.Context, rayServiceInstance *rayv1.RayService) {
     logger := ctrl.LoggerFrom(ctx)
-    activeConfigKey := r.generateConfigKey(rayServiceInstance, rayServiceInstance.Status.ActiveServiceStatus.RayClusterName)
-    pendingConfigKey := r.generateConfigKey(rayServiceInstance, rayServiceInstance.Status.PendingServiceStatus.RayClusterName)
-    configPrefix := r.generateConfigKeyPrefix(rayServiceInstance)
+    activeRayClusterName := rayServiceInstance.Status.ActiveServiceStatus.RayClusterName
+    pendingRayClusterName := rayServiceInstance.Status.PendingServiceStatus.RayClusterName

     // Clean up RayCluster serve deployment configs.
-    for key := range r.ServeConfigs.Items() {
-        if key == activeConfigKey || key == pendingConfigKey {
-            continue
-        }
-        if !strings.HasPrefix(key, configPrefix) {
-            // Skip configs owned by other RayService Instance.
-            continue
-        }
-        logger.Info("cleanUpServeConfigCache", "activeConfigKey", activeConfigKey, "pendingConfigKey", pendingConfigKey, "remove key", key)
-        r.ServeConfigs.Remove(key)
+    cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
+    serveConfigs, exist := r.ServeConfigs.Get(cacheKey)
+    if !exist {
+        return
+    }
+
+    for key := range serveConfigs.Items() {
+        if key == activeRayClusterName || key == pendingRayClusterName {
+            continue
+        }
+        logger.Info("cleanUpServeConfigCache", "activeRayClusterName", activeRayClusterName, "pendingRayClusterName", pendingRayClusterName, "remove key", key)
+        serveConfigs.Remove(key)
     }
 }

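The payoff of the refactor is easiest to see here: the old code scanned every key across all RayServices and prefix-matched; the new code only touches the one submap owned by this RayService. A rough sketch of the new flow, under the same cmap import as the sketch above (not the actual KubeRay function; names are illustrative):

```go
package sketch

import cmap "github.com/orcaman/concurrent-map/v2"

// cleanUp mirrors the shape of the new cleanUpServeConfigCache: fetch the
// single RayService's submap by its "namespace/name" cacheKey, then drop
// entries for any RayCluster that is neither active nor pending.
func cleanUp(serveConfigs cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, string]], cacheKey, active, pending string) {
	clusters, exist := serveConfigs.Get(cacheKey)
	if !exist {
		return // nothing cached for this RayService
	}
	for name := range clusters.Items() {
		if name == active || name == pending {
			continue // cluster still in use; keep its cached config
		}
		clusters.Remove(name)
	}
}
```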
Expand Down Expand Up @@ -803,16 +805,13 @@ func (r *RayServiceReconciler) checkIfNeedSubmitServeDeployment(ctx context.Cont
logger := ctrl.LoggerFrom(ctx)

// If the Serve config has not been cached, update the Serve config.
cacheKey := r.generateConfigKey(rayServiceInstance, rayClusterInstance.Name)
cachedServeConfigV2, exist := r.ServeConfigs.Get(cacheKey)

if !exist {
cachedServeConfigV2 := r.getServeConfigFromCache(rayServiceInstance, rayClusterInstance.Name)
if cachedServeConfigV2 == "" {
logger.Info(
"shouldUpdate",
"shouldUpdateServe", true,
"reason", "Nothing has been cached for the cluster with the key",
"reason", "Nothing has been cached for the cluster",
"rayClusterName", rayClusterInstance.Name,
"cacheKey", cacheKey,
)
return true
}
@@ -838,7 +837,7 @@

     if cachedServeConfigV2 != rayServiceInstance.Spec.ServeConfigV2 {
         shouldUpdate = true
-        reason = fmt.Sprintf("Current V2 Serve config doesn't match cached Serve config for cluster %s with key %s", rayClusterInstance.Name, cacheKey)
+        reason = fmt.Sprintf("Current V2 Serve config doesn't match cached Serve config for cluster %s", rayClusterInstance.Name)
     }
     logger.Info("shouldUpdate", "shouldUpdateServe", shouldUpdate, "reason", reason, "cachedServeConfig", cachedServeConfigV2, "current Serve config", rayServiceInstance.Spec.ServeConfigV2)

@@ -868,9 +867,8 @@ func (r *RayServiceReconciler) updateServeDeployment(ctx context.Context, raySer
         return err
     }

-    cacheKey := r.generateConfigKey(rayServiceInstance, clusterName)
-    r.ServeConfigs.Set(cacheKey, rayServiceInstance.Spec.ServeConfigV2)
-    logger.Info("updateServeDeployment", "message", "Cached Serve config for Ray cluster with the key", "rayClusterName", clusterName, "cacheKey", cacheKey)
+    r.cacheServeConfig(rayServiceInstance, clusterName)
+    logger.Info("updateServeDeployment", "message", "Cached Serve config for Ray cluster with the key", "rayClusterName", clusterName)
     return nil
 }

@@ -960,12 +958,31 @@ func (r *RayServiceReconciler) getAndCheckServeStatus(ctx context.Context, dashb
     return isReady, nil
 }

-func (r *RayServiceReconciler) generateConfigKey(rayServiceInstance *rayv1.RayService, clusterName string) string {
-    return r.generateConfigKeyPrefix(rayServiceInstance) + clusterName
+func (r *RayServiceReconciler) getServeConfigFromCache(rayServiceInstance *rayv1.RayService, clusterName string) string {
+    cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
+    serveConfigs, exist := r.ServeConfigs.Get(cacheKey)
+    if !exist {
+        return ""
+    }
+    serveConfig, exist := serveConfigs.Get(clusterName)
+    if !exist {
+        return ""
+    }
+    return serveConfig
 }

-func (r *RayServiceReconciler) generateConfigKeyPrefix(rayServiceInstance *rayv1.RayService) string {
-    return rayServiceInstance.Namespace + "/" + rayServiceInstance.Name + "/"
+func (r *RayServiceReconciler) cacheServeConfig(rayServiceInstance *rayv1.RayService, clusterName string) {
+    serveConfig := rayServiceInstance.Spec.ServeConfigV2
+    if serveConfig == "" {
+        return
+    }
+    cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
+    rayServiceServeConfigs, exist := r.ServeConfigs.Get(cacheKey)
+    if !exist {
+        rayServiceServeConfigs = cmap.New[string]()
+        r.ServeConfigs.Set(cacheKey, rayServiceServeConfigs)
+    }
+    rayServiceServeConfigs.Set(clusterName, serveConfig)
 }

 func (r *RayServiceReconciler) markRestartAndAddPendingClusterName(ctx context.Context, rayServiceInstance *rayv1.RayService) {

[Review thread on the first `return ""` in getServeConfigFromCache]
Contributor: Is it possible that the config is indeed an empty string?
MortalHappiness (Member Author): Fixed in 040d475. Only cache the serveConfig when it is not empty.
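The empty-string question above is worth spelling out: after the fix the author references (040d475), "" is reserved as the "not cached" sentinel, because cacheServeConfig refuses to store empty configs. A hypothetical stand-alone version of the pair (the real methods hang off RayServiceReconciler; the cache type here is invented for the sketch):

```go
package sketch

import cmap "github.com/orcaman/concurrent-map/v2"

type cache struct {
	serveConfigs cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, string]]
}

func newCache() *cache {
	return &cache{serveConfigs: cmap.New[cmap.ConcurrentMap[string, string]]()}
}

func (c *cache) set(cacheKey, clusterName, serveConfig string) {
	if serveConfig == "" {
		return // never cache an empty config, so "" can safely mean "missing"
	}
	clusters, ok := c.serveConfigs.Get(cacheKey)
	if !ok {
		clusters = cmap.New[string]()
		c.serveConfigs.Set(cacheKey, clusters)
	}
	clusters.Set(clusterName, serveConfig)
}

func (c *cache) get(cacheKey, clusterName string) string {
	clusters, ok := c.serveConfigs.Get(cacheKey)
	if !ok {
		return ""
	}
	cfg, _ := clusters.Get(clusterName) // the zero value "" doubles as "not found"
	return cfg
}
```

One design note: the Get-then-Set pair in the setter is not atomic across the outer map. That is harmless while reconciles for a given RayService are serialized (controller-runtime's default of one worker per object), and cmap's SetIfAbsent could close the window if that assumption ever changed.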
13 changes: 6 additions & 7 deletions ray-operator/controllers/ray/rayservice_controller_unit_test.go
@@ -667,7 +667,7 @@ func TestCheckIfNeedSubmitServeDeployment(t *testing.T) {
     Client: fakeClient,
     Recorder: &record.FakeRecorder{},
     Scheme: scheme.Scheme,
-    ServeConfigs: cmap.New[string](),
+    ServeConfigs: cmap.New[cmap.ConcurrentMap[string, string]](),
 }

 namespace := "ray"
@@ -703,22 +703,21 @@ applications:
     // Test 1: The RayCluster is new, and this is the first reconciliation after the RayCluster becomes ready.
     // No Serve application has been created yet, so the RayService's serve configuration has not been cached in
     // `r.ServeConfigs`.
-    cacheKey := r.generateConfigKey(&rayService, cluster.Name)
-    _, exist := r.ServeConfigs.Get(cacheKey)
-    assert.False(t, exist)
+    serveConfig := r.getServeConfigFromCache(&rayService, cluster.Name)
+    assert.Empty(t, serveConfig)
     shouldCreate := r.checkIfNeedSubmitServeDeployment(ctx, &rayService, &cluster, &rayv1.RayServiceStatus{})
     assert.True(t, shouldCreate)

     // Test 2: The RayCluster is not new, but the head Pod without GCS FT-enabled crashes and restarts.
     // Hence, the RayService's Serve application status is empty, but the KubeRay operator has cached the Serve
     // application's configuration.
-    r.ServeConfigs.Set(cacheKey, rayService.Spec.ServeConfigV2) // Simulate the Serve application's configuration has been cached.
+    r.cacheServeConfig(&rayService, cluster.Name) // Simulate the Serve application's configuration has been cached.
     shouldCreate = r.checkIfNeedSubmitServeDeployment(ctx, &rayService, &cluster, &rayv1.RayServiceStatus{})
     assert.True(t, shouldCreate)

     // Test 3: The Serve application has been created, and the RayService's status has been updated.
-    _, exist = r.ServeConfigs.Get(cacheKey)
-    assert.True(t, exist)
+    serveConfig = r.getServeConfigFromCache(&rayService, cluster.Name)
+    assert.NotEmpty(t, serveConfig)
     serveStatus := rayv1.RayServiceStatus{
         Applications: map[string]rayv1.AppStatus{
             "myapp": {