From 5f06fa8d3bb7a1827ebaeb575a2b08e1821c12bd Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Fri, 19 Jul 2024 19:22:11 +0000 Subject: [PATCH] DELETE_RAYJOB_CR_AFTER_JOB_FINISHES doesn't work with Configuration API Signed-off-by: Andrew Sy Kim --- .../config/v1alpha1/configuration_types.go | 11 ---------- .../controllers/ray/rayjob_controller.go | 21 +++++++++++-------- .../controllers/ray/rayjob_controller_test.go | 8 ++++--- ray-operator/controllers/ray/suite_test.go | 14 ++++++++++--- .../ray/utils/dashboard_httpclient.go | 14 +++++++++++++ ray-operator/main.go | 7 ++++--- 6 files changed, 46 insertions(+), 29 deletions(-) diff --git a/ray-operator/apis/config/v1alpha1/configuration_types.go b/ray-operator/apis/config/v1alpha1/configuration_types.go index 7f1681b417..1e786a5a16 100644 --- a/ray-operator/apis/config/v1alpha1/configuration_types.go +++ b/ray-operator/apis/config/v1alpha1/configuration_types.go @@ -3,9 +3,6 @@ package v1alpha1 import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/manager" - - "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" ) //+kubebuilder:object:root=true @@ -67,11 +64,3 @@ type Configuration struct { // DeleteRayJobAfterJobFinishes deletes the RayJob CR itself if shutdownAfterJobFinishes is set to true. DeleteRayJobAfterJobFinishes bool `json:"deleteRayJobAfterJobFinishes,omitempty"` } - -func (config Configuration) GetDashboardClient(mgr manager.Manager) func() utils.RayDashboardClientInterface { - return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy) -} - -func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func() utils.RayHttpProxyClientInterface { - return utils.GetRayHttpProxyClientFunc(mgr, config.UseKubernetesProxy) -} diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index a84194e14f..e93c2581d7 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -3,8 +3,6 @@ package ray import ( "context" "fmt" - "os" - "strings" "time" "github.com/go-logr/logr" @@ -26,6 +24,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" + configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" ) @@ -41,17 +40,19 @@ type RayJobReconciler struct { Scheme *runtime.Scheme Recorder record.EventRecorder - dashboardClientFunc func() utils.RayDashboardClientInterface + dashboardClientFunc func() utils.RayDashboardClientInterface + deleteRayJobAfterJobFinishes bool } // NewRayJobReconciler returns a new reconcile.Reconciler -func NewRayJobReconciler(_ context.Context, mgr manager.Manager, provider utils.ClientProvider) *RayJobReconciler { +func NewRayJobReconciler(_ context.Context, mgr manager.Manager, provider utils.ClientProvider, config configapi.Configuration) *RayJobReconciler { dashboardClientFunc := provider.GetDashboardClient(mgr) return &RayJobReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor("rayjob-controller"), - dashboardClientFunc: dashboardClientFunc, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("rayjob-controller"), + dashboardClientFunc: dashboardClientFunc, + deleteRayJobAfterJobFinishes: config.DeleteRayJobAfterJobFinishes, } } @@ -335,7 +336,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) logger.Info(fmt.Sprintf("shutdownTime not reached, requeue this RayJob for %d seconds", delta)) return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil } - if s := os.Getenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES); strings.ToLower(s) == "true" { + + if r.deleteRayJobAfterJobFinishes { err = r.Client.Delete(ctx, rayJobInstance) logger.Info("RayJob is deleted") } else { @@ -344,6 +346,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) _, err = r.deleteClusterResources(ctx, rayJobInstance) logger.Info("RayCluster is deleted", "RayCluster", rayJobInstance.Status.RayClusterName) } + if err != nil { return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err } diff --git a/ray-operator/controllers/ray/rayjob_controller_test.go b/ray-operator/controllers/ray/rayjob_controller_test.go index 5409ddd815..4891fc7432 100644 --- a/ray-operator/controllers/ray/rayjob_controller_test.go +++ b/ray-operator/controllers/ray/rayjob_controller_test.go @@ -17,7 +17,6 @@ package ray import ( "context" - "os" "time" "k8s.io/apimachinery/pkg/api/resource" @@ -423,8 +422,11 @@ var _ = Context("RayJob in K8sJobMode", func() { }) It("If DELETE_RAYJOB_CR_AFTER_JOB_FINISHES environement variable is set, RayJob should be deleted.", func() { - os.Setenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES, "true") - defer os.Unsetenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES) + rayJobReconciler.deleteRayJobAfterJobFinishes = true + defer func() { + rayJobReconciler.deleteRayJobAfterJobFinishes = false + }() + Eventually( func() bool { return apierrors.IsNotFound(getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob)()) diff --git a/ray-operator/controllers/ray/suite_test.go b/ray-operator/controllers/ray/suite_test.go index d614e64d4f..d8e11087f5 100644 --- a/ray-operator/controllers/ray/suite_test.go +++ b/ray-operator/controllers/ray/suite_test.go @@ -22,6 +22,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" + configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" @@ -51,6 +52,10 @@ var ( fakeRayDashboardClient *utils.FakeRayDashboardClient fakeRayHttpProxyClient *utils.FakeRayHttpProxyClient + + rayClusterReconciler *RayClusterReconciler + rayJobReconciler *RayJobReconciler + rayServiceReconciler *RayServiceReconciler ) type TestClientProvider struct{} @@ -120,14 +125,17 @@ var _ = BeforeSuite(func(ctx SpecContext) { }, }, } - err = NewReconciler(ctx, mgr, options).SetupWithManager(mgr, 1) + rayClusterReconciler = NewReconciler(ctx, mgr, options) + err = rayClusterReconciler.SetupWithManager(mgr, 1) Expect(err).NotTo(HaveOccurred(), "failed to setup RayCluster controller") testClientProvider := TestClientProvider{} - err = NewRayServiceReconciler(ctx, mgr, testClientProvider).SetupWithManager(mgr, 1) + rayServiceReconciler = NewRayServiceReconciler(ctx, mgr, testClientProvider) + err = rayServiceReconciler.SetupWithManager(mgr, 1) Expect(err).NotTo(HaveOccurred(), "failed to setup RayService controller") - err = NewRayJobReconciler(ctx, mgr, testClientProvider).SetupWithManager(mgr, 1) + rayJobReconciler = NewRayJobReconciler(ctx, mgr, testClientProvider, configapi.Configuration{}) + err = rayJobReconciler.SetupWithManager(mgr, 1) Expect(err).NotTo(HaveOccurred(), "failed to setup RayJob controller") go func() { diff --git a/ray-operator/controllers/ray/utils/dashboard_httpclient.go b/ray-operator/controllers/ray/utils/dashboard_httpclient.go index 758a7fc5ab..6cf7479303 100644 --- a/ray-operator/controllers/ray/utils/dashboard_httpclient.go +++ b/ray-operator/controllers/ray/utils/dashboard_httpclient.go @@ -14,11 +14,13 @@ import ( corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/json" + configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" ) @@ -30,6 +32,18 @@ var ( JobPath = "/api/jobs/" ) +type RayClientProvider struct { + configapi.Configuration +} + +func (r *RayClientProvider) GetDashboardClient(mgr manager.Manager) func() RayDashboardClientInterface { + return GetRayDashboardClientFunc(mgr, r.UseKubernetesProxy) +} + +func (r *RayClientProvider) GetHttpProxyClient(mgr manager.Manager) func() RayHttpProxyClientInterface { + return GetRayHttpProxyClientFunc(mgr, r.UseKubernetesProxy) +} + type RayDashboardClientInterface interface { InitClient(ctx context.Context, url string, rayCluster *rayv1.RayCluster) error UpdateDeployments(ctx context.Context, configJson []byte) error diff --git a/ray-operator/main.go b/ray-operator/main.go index 456a6afe25..d06a14a9c9 100644 --- a/ray-operator/main.go +++ b/ray-operator/main.go @@ -126,7 +126,7 @@ func main() { config.LogStdoutEncoder = logStdoutEncoder config.EnableBatchScheduler = ray.EnableBatchScheduler config.UseKubernetesProxy = useKubernetesProxy - config.DeleteRayJobAfterJobFinishes = os.Getenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES) == "true" + config.DeleteRayJobAfterJobFinishes = strings.ToLower(os.Getenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES)) == "true" } stdoutEncoder, err := newLogEncoder(logStdoutEncoder) @@ -220,11 +220,12 @@ func main() { WorkerSidecarContainers: config.WorkerSidecarContainers, } ctx := ctrl.SetupSignalHandler() + rayClientProvider := &utils.RayClientProvider{Configuration: config} exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions).SetupWithManager(mgr, config.ReconcileConcurrency), "unable to create controller", "controller", "RayCluster") - exitOnError(ray.NewRayServiceReconciler(ctx, mgr, config).SetupWithManager(mgr, config.ReconcileConcurrency), + exitOnError(ray.NewRayServiceReconciler(ctx, mgr, rayClientProvider).SetupWithManager(mgr, config.ReconcileConcurrency), "unable to create controller", "controller", "RayService") - exitOnError(ray.NewRayJobReconciler(ctx, mgr, config).SetupWithManager(mgr, config.ReconcileConcurrency), + exitOnError(ray.NewRayJobReconciler(ctx, mgr, rayClientProvider, config).SetupWithManager(mgr, config.ReconcileConcurrency), "unable to create controller", "controller", "RayJob") if os.Getenv("ENABLE_WEBHOOKS") == "true" {