Skip to content

Commit

Permalink
[RayJob][Feature] add lightweight job submitter in kuberay image
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Nov 27, 2024
1 parent a4d7dd0 commit 43016fd
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 0 deletions.
3 changes: 3 additions & 0 deletions ray-operator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ COPY main.go main.go
COPY apis/ apis/
COPY controllers/ controllers/
COPY pkg/features pkg/features
COPY rayjob-submitter/ rayjob-submitter/

# Build
USER root
RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o manager main.go
RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o submitter rayjob-submitter/main.go

FROM gcr.io/distroless/base-debian12:nonroot
WORKDIR /
COPY --from=builder /workspace/manager .
COPY --from=builder /workspace/submitter .
USER 65532:65532

ENTRYPOINT ["/manager"]
1 change: 1 addition & 0 deletions ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/coder/websocket v1.8.12 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/evanphx/json-patch v5.9.0+incompatible // indirect
Expand Down
2 changes: 2 additions & 0 deletions ray-operator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

144 changes: 144 additions & 0 deletions ray-operator/rayjob-submitter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"

"github.com/coder/websocket"
flag "github.com/spf13/pflag"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

// submitJobReq POSTs request as JSON to address and returns the submission ID
// taken from request.SubmissionId.
//
// A 2xx response is treated as success. A 400 response whose body contains
// "Please use a different submission_id" is also treated as success, so that
// re-submitting an already-submitted job is not an error. Any other outcome is
// returned as an error that includes the HTTP status and, when it could be
// read, the response body.
func submitJobReq(address string, request utils.RayJobRequest) (jobId string, err error) {
	rayJobJson, err := json.Marshal(request)
	if err != nil {
		return "", fmt.Errorf("marshal job request: %w", err)
	}

	req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, address, bytes.NewReader(rayJobJson))
	if err != nil {
		return "", fmt.Errorf("build job submission request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("send job submission request: %w", err)
	}
	defer resp.Body.Close()

	// Keep the read error instead of discarding it: it is only relevant when
	// the status code alone does not already tell us the submission succeeded.
	body, readErr := io.ReadAll(resp.Body)

	if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
		return request.SubmissionId, nil
	}

	// Ignore the duplicated submission error.
	if resp.StatusCode == http.StatusBadRequest &&
		strings.Contains(string(body), "Please use a different submission_id") {
		return request.SubmissionId, nil
	}

	if readErr != nil {
		// The body is incomplete, so report why rather than printing a
		// truncated (possibly empty) payload as if it were the full answer.
		return "", fmt.Errorf("SubmitJob fail: %s (reading response body: %w)", resp.Status, readErr)
	}
	return "", fmt.Errorf("SubmitJob fail: %s %s", resp.Status, string(body))
}

// jobSubmissionURL turns a dashboard address into the URL of the job
// submission endpoint ("<address>/api/jobs/").
//
// An address that already carries an "http://" or "https://" scheme is used
// as is; any other address is assumed to be a bare host[:port] and gets
// "http://" prepended. It panics if the resulting address cannot be parsed
// as a URL.
func jobSubmissionURL(address string) string {
	hasScheme := strings.HasPrefix(address, "http://") || strings.HasPrefix(address, "https://")
	if !hasScheme {
		address = "http://" + address
	}
	address, err := url.JoinPath(address, "/api/jobs/") // the trailing "/" is required.
	if err != nil {
		panic(err)
	}
	return address
}

// logTailingURL builds the websocket URL used to tail the logs of the job
// identified by submissionId. The first occurrence of "http" in address is
// swapped for "ws" (so "http" becomes "ws" and "https" becomes "wss"), and
// "<submissionId>/logs/tail" is appended to the path. It panics if the
// address cannot be parsed as a URL.
func logTailingURL(address, submissionId string) string {
	wsBase := address
	if head, tail, found := strings.Cut(address, "http"); found {
		wsBase = head + "ws" + tail
	}

	tailURL, joinErr := url.JoinPath(wsBase, submissionId, "/logs/tail")
	if joinErr != nil {
		panic(joinErr)
	}
	return tailURL
}

// main submits a job to the dashboard and then streams the job's logs to
// stdout.
//
// Inputs:
//   - RAY_DASHBOARD_ADDRESS and RAY_JOB_SUBMISSION_ID environment variables
//     (both required; the program panics if either is empty).
//   - --runtime-env-json, --metadata-json, --entrypoint-resources: optional
//     JSON documents decoded into the corresponding request fields.
//   - --entrypoint-num-cpus, --entrypoint-num-gpus: optional numeric values.
//   - The remaining positional arguments, joined with single spaces, form the
//     job entrypoint.
//
// Any failure (invalid JSON, submission error, websocket error other than a
// normal closure) results in a panic.
func main() {
	runtimeEnvOpt := flag.String("runtime-env-json", "", "")
	metadataOpt := flag.String("metadata-json", "", "")
	resourcesOpt := flag.String("entrypoint-resources", "", "")
	numCpusOpt := flag.Float32("entrypoint-num-cpus", 0.0, "")
	numGpusOpt := flag.Float32("entrypoint-num-gpus", 0.0, "")

	flag.Parse()

	dashboard := os.Getenv("RAY_DASHBOARD_ADDRESS")
	if dashboard == "" {
		panic("Missing RAY_DASHBOARD_ADDRESS")
	}
	wantedID := os.Getenv("RAY_JOB_SUBMISSION_ID")
	if wantedID == "" {
		panic("Missing RAY_JOB_SUBMISSION_ID")
	}

	var req utils.RayJobRequest
	req.Entrypoint = strings.Join(flag.Args(), " ")
	req.SubmissionId = wantedID
	req.NumCpus = *numCpusOpt
	req.NumGpus = *numGpusOpt

	// Decode each optional JSON flag into its request field, in a fixed order
	// so the first malformed flag is the one reported.
	jsonOpts := []struct {
		raw string
		dst any
	}{
		{*runtimeEnvOpt, &req.RuntimeEnv},
		{*metadataOpt, &req.Metadata},
		{*resourcesOpt, &req.Resources},
	}
	for _, opt := range jsonOpts {
		if opt.raw == "" {
			continue
		}
		if decodeErr := json.Unmarshal([]byte(opt.raw), opt.dst); decodeErr != nil {
			panic(decodeErr)
		}
	}

	jobsURL := jobSubmissionURL(dashboard)
	jobID, err := submitJobReq(jobsURL, req)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	conn, _, err := websocket.Dial(ctx, logTailingURL(jobsURL, jobID), nil)
	if err != nil {
		panic(err)
	}
	defer func() { _ = conn.CloseNow() }()

	// Relay every received message to stdout until the peer closes the
	// connection normally.
	for {
		_, chunk, readErr := conn.Read(ctx)
		switch {
		case readErr == nil:
			os.Stdout.Write(chunk)
		case websocket.CloseStatus(readErr) == websocket.StatusNormalClosure:
			return
		default:
			panic(readErr)
		}
	}
}

0 comments on commit 43016fd

Please sign in to comment.