Skip to content

Commit

Permalink
DELETE_RAYJOB_CR_AFTER_JOB_FINISHES doesn't work with Configuration API
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Sy Kim <[email protected]>
  • Loading branch information
andrewsykim committed Jul 26, 2024
1 parent 085dbb5 commit 5f06fa8
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 29 deletions.
11 changes: 0 additions & 11 deletions ray-operator/apis/config/v1alpha1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package v1alpha1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

//+kubebuilder:object:root=true
Expand Down Expand Up @@ -67,11 +64,3 @@ type Configuration struct {
// DeleteRayJobAfterJobFinishes deletes the RayJob CR itself if shutdownAfterJobFinishes is set to true.
DeleteRayJobAfterJobFinishes bool `json:"deleteRayJobAfterJobFinishes,omitempty"`
}

func (config Configuration) GetDashboardClient(mgr manager.Manager) func() utils.RayDashboardClientInterface {
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
}

func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func() utils.RayHttpProxyClientInterface {
return utils.GetRayHttpProxyClientFunc(mgr, config.UseKubernetesProxy)
}
21 changes: 12 additions & 9 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package ray
import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/go-logr/logr"
Expand All @@ -26,6 +24,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

Expand All @@ -41,17 +40,19 @@ type RayJobReconciler struct {
Scheme *runtime.Scheme
Recorder record.EventRecorder

dashboardClientFunc func() utils.RayDashboardClientInterface
dashboardClientFunc func() utils.RayDashboardClientInterface
deleteRayJobAfterJobFinishes bool
}

// NewRayJobReconciler returns a new reconcile.Reconciler
func NewRayJobReconciler(_ context.Context, mgr manager.Manager, provider utils.ClientProvider) *RayJobReconciler {
func NewRayJobReconciler(_ context.Context, mgr manager.Manager, provider utils.ClientProvider, config configapi.Configuration) *RayJobReconciler {
dashboardClientFunc := provider.GetDashboardClient(mgr)
return &RayJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("rayjob-controller"),
dashboardClientFunc: dashboardClientFunc,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("rayjob-controller"),
dashboardClientFunc: dashboardClientFunc,
deleteRayJobAfterJobFinishes: config.DeleteRayJobAfterJobFinishes,
}
}

Expand Down Expand Up @@ -335,7 +336,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
logger.Info(fmt.Sprintf("shutdownTime not reached, requeue this RayJob for %d seconds", delta))
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}
if s := os.Getenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES); strings.ToLower(s) == "true" {

if r.deleteRayJobAfterJobFinishes {
err = r.Client.Delete(ctx, rayJobInstance)
logger.Info("RayJob is deleted")
} else {
Expand All @@ -344,6 +346,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
_, err = r.deleteClusterResources(ctx, rayJobInstance)
logger.Info("RayCluster is deleted", "RayCluster", rayJobInstance.Status.RayClusterName)
}

if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
Expand Down
8 changes: 5 additions & 3 deletions ray-operator/controllers/ray/rayjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package ray

import (
"context"
"os"
"time"

"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -423,8 +422,11 @@ var _ = Context("RayJob in K8sJobMode", func() {
})

It("If DELETE_RAYJOB_CR_AFTER_JOB_FINISHES environement variable is set, RayJob should be deleted.", func() {
os.Setenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES, "true")
defer os.Unsetenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES)
rayJobReconciler.deleteRayJobAfterJobFinishes = true
defer func() {
rayJobReconciler.deleteRayJobAfterJobFinishes = false
}()

Eventually(
func() bool {
return apierrors.IsNotFound(getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob)())
Expand Down
14 changes: 11 additions & 3 deletions ray-operator/controllers/ray/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/manager"

configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"

Expand Down Expand Up @@ -51,6 +52,10 @@ var (

fakeRayDashboardClient *utils.FakeRayDashboardClient
fakeRayHttpProxyClient *utils.FakeRayHttpProxyClient

rayClusterReconciler *RayClusterReconciler
rayJobReconciler *RayJobReconciler
rayServiceReconciler *RayServiceReconciler
)

type TestClientProvider struct{}
Expand Down Expand Up @@ -120,14 +125,17 @@ var _ = BeforeSuite(func(ctx SpecContext) {
},
},
}
err = NewReconciler(ctx, mgr, options).SetupWithManager(mgr, 1)
rayClusterReconciler = NewReconciler(ctx, mgr, options)
err = rayClusterReconciler.SetupWithManager(mgr, 1)
Expect(err).NotTo(HaveOccurred(), "failed to setup RayCluster controller")

testClientProvider := TestClientProvider{}
err = NewRayServiceReconciler(ctx, mgr, testClientProvider).SetupWithManager(mgr, 1)
rayServiceReconciler = NewRayServiceReconciler(ctx, mgr, testClientProvider)
err = rayServiceReconciler.SetupWithManager(mgr, 1)
Expect(err).NotTo(HaveOccurred(), "failed to setup RayService controller")

err = NewRayJobReconciler(ctx, mgr, testClientProvider).SetupWithManager(mgr, 1)
rayJobReconciler = NewRayJobReconciler(ctx, mgr, testClientProvider, configapi.Configuration{})
err = rayJobReconciler.SetupWithManager(mgr, 1)
Expect(err).NotTo(HaveOccurred(), "failed to setup RayJob controller")

go func() {
Expand Down
14 changes: 14 additions & 0 deletions ray-operator/controllers/ray/utils/dashboard_httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"

"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/util/json"

configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

Expand All @@ -30,6 +32,18 @@ var (
JobPath = "/api/jobs/"
)

type RayClientProvider struct {
configapi.Configuration
}

func (r *RayClientProvider) GetDashboardClient(mgr manager.Manager) func() RayDashboardClientInterface {
return GetRayDashboardClientFunc(mgr, r.UseKubernetesProxy)
}

func (r *RayClientProvider) GetHttpProxyClient(mgr manager.Manager) func() RayHttpProxyClientInterface {
return GetRayHttpProxyClientFunc(mgr, r.UseKubernetesProxy)
}

type RayDashboardClientInterface interface {
InitClient(ctx context.Context, url string, rayCluster *rayv1.RayCluster) error
UpdateDeployments(ctx context.Context, configJson []byte) error
Expand Down
7 changes: 4 additions & 3 deletions ray-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func main() {
config.LogStdoutEncoder = logStdoutEncoder
config.EnableBatchScheduler = ray.EnableBatchScheduler
config.UseKubernetesProxy = useKubernetesProxy
config.DeleteRayJobAfterJobFinishes = os.Getenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES) == "true"
config.DeleteRayJobAfterJobFinishes = strings.ToLower(os.Getenv(utils.DELETE_RAYJOB_CR_AFTER_JOB_FINISHES)) == "true"
}

stdoutEncoder, err := newLogEncoder(logStdoutEncoder)
Expand Down Expand Up @@ -220,11 +220,12 @@ func main() {
WorkerSidecarContainers: config.WorkerSidecarContainers,
}
ctx := ctrl.SetupSignalHandler()
rayClientProvider := &utils.RayClientProvider{Configuration: config}
exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions).SetupWithManager(mgr, config.ReconcileConcurrency),
"unable to create controller", "controller", "RayCluster")
exitOnError(ray.NewRayServiceReconciler(ctx, mgr, config).SetupWithManager(mgr, config.ReconcileConcurrency),
exitOnError(ray.NewRayServiceReconciler(ctx, mgr, rayClientProvider).SetupWithManager(mgr, config.ReconcileConcurrency),
"unable to create controller", "controller", "RayService")
exitOnError(ray.NewRayJobReconciler(ctx, mgr, config).SetupWithManager(mgr, config.ReconcileConcurrency),
exitOnError(ray.NewRayJobReconciler(ctx, mgr, rayClientProvider, config).SetupWithManager(mgr, config.ReconcileConcurrency),
"unable to create controller", "controller", "RayJob")

if os.Getenv("ENABLE_WEBHOOKS") == "true" {
Expand Down

0 comments on commit 5f06fa8

Please sign in to comment.