Skip to content

Commit

Permalink
[RayJob][Feature] test light weight job submitter in kuberay image
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Nov 30, 2024
1 parent 509556f commit e347595
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 153 deletions.
47 changes: 47 additions & 0 deletions .github/workflows/e2e-tests-ray-job-submitter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Runs the RayJob submitter e2e Go tests against a locally installed Ray.
name: e2e-ray-job-submitter

on:
  pull_request:
    branches:
      - master
      - 'release-*'
  push:
    branches:
      - master
      - 'release-*'

concurrency:
  # github.head_ref is only set for pull_request events. Fall back to the
  # unique run id on push events so that pushes to different branches do not
  # share one group and cancel each other.
  group: ${{ github.head_ref || github.run_id }}-${{ github.workflow }}
  cancel-in-progress: true

jobs:
  ray-job-submitter:
    runs-on: ubuntu-20.04
    strategy:
      fail-fast: false
      matrix:
        ray-version: [ '2.39.0' ]
        go-version: [ '1.22.0' ]
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
        with:
          submodules: recursive

      - name: Set up Go
        uses: actions/setup-go@v3
        with:
          go-version: ${{ matrix.go-version }}

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.x'

      - name: Install Ray
        # Quoted so the shell never treats "[default]" as a glob pattern.
        run: pip install "ray[default]==${{ matrix.ray-version }}"

      - name: Run e2e tests
        run: |
          cd ray-operator
          go test -timeout 30m -v ./test/e2erayjobsubmitter
4 changes: 2 additions & 2 deletions ray-operator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ COPY main.go main.go
COPY apis/ apis/
COPY controllers/ controllers/
COPY pkg/features pkg/features
COPY rayjob-submitter/ rayjob-submitter/
COPY rayjobsubmitter/ rayjobsubmitter/

# Build
USER root
RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o manager main.go
RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o submitter rayjob-submitter/main.go
RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o submitter rayjobsubmitter/cmd/main.go

FROM gcr.io/distroless/base-debian12:nonroot
WORKDIR /
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ test-sampleyaml: WHAT ?= ./test/sampleyaml
test-sampleyaml: manifests fmt vet
go test -timeout 30m -v $(WHAT)

# Runs `go test` (verbose, 30 minute timeout) on $(WHAT), which defaults to
# ./test/e2erayjobsubmitter, after the fmt and vet targets have completed.
test-e2erayjobsubmitter: WHAT ?= ./test/e2erayjobsubmitter
test-e2erayjobsubmitter: fmt vet
go test -timeout 30m -v $(WHAT)

sync: helm api-docs
./hack/update-codegen.sh

Expand Down
151 changes: 0 additions & 151 deletions ray-operator/rayjob-submitter/main.go

This file was deleted.

98 changes: 98 additions & 0 deletions ray-operator/rayjobsubmitter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package rayjobsubmitter

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"

"github.com/coder/websocket"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

// submitJobReq POSTs request, encoded as JSON, to address and returns the
// submission ID taken from request.
//
// The call is considered successful when either
//   - the response body contains "Please use a different submission_id"
//     (presumably the server's reply to an already-used submission ID, which
//     lets a retried submission proceed), or
//   - the response status is 2xx.
//
// Any other response yields an error carrying the status line and the response
// body. If the body could not be read, the read error is wrapped instead.
func submitJobReq(address string, request utils.RayJobRequest) (jobId string, err error) {
	rayJobJson, err := json.Marshal(request)
	if err != nil {
		return "", err
	}

	// NOTE(review): neither the context nor http.DefaultClient carries a
	// timeout, so an unresponsive server blocks this call indefinitely.
	req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, address, bytes.NewBuffer(rayJobJson))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer func() { _ = resp.Body.Close() }()

	// The body is always read in full; a read failure is only reported once it
	// is clear that the request did not succeed.
	body, readErr := io.ReadAll(resp.Body)

	if strings.Contains(string(body), "Please use a different submission_id") {
		return request.SubmissionId, nil
	}

	if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
		return request.SubmissionId, nil
	}

	if readErr != nil {
		// Without this branch the failure would be reported with an empty or
		// truncated body and no hint as to why.
		return "", fmt.Errorf("SubmitJob fail: %s: reading response body: %w", resp.Status, readErr)
	}
	return "", fmt.Errorf("SubmitJob fail: %s %s", resp.Status, string(body))
}

// jobSubmissionURL turns a server address into the job submission endpoint
// URL by appending "/api/jobs/".
//
// An address without an "http://" or "https://" scheme is assumed to be plain
// HTTP and gets "http://" prepended; an address that already has one of those
// schemes keeps it. The function panics if the resulting address cannot be
// parsed as a URL.
func jobSubmissionURL(address string) string {
	if !strings.HasPrefix(address, "http://") && !strings.HasPrefix(address, "https://") {
		address = "http://" + address
	}
	address, err := url.JoinPath(address, "/api/jobs/") // the trailing "/" is required.
	if err != nil {
		panic(err)
	}
	return address
}

// logTailingURL derives the WebSocket log-tailing URL for submissionId from
// the job submission endpoint URL in address.
//
// Only a leading scheme is rewritten ("http://" becomes "ws://" and "https://"
// becomes "wss://"), so an "http" that merely occurs inside the host or path
// is left untouched. submissionId and "/logs/tail" are then appended as path
// elements. The function panics if address cannot be parsed as a URL.
func logTailingURL(address, submissionId string) string {
	switch {
	case strings.HasPrefix(address, "https://"):
		address = "wss://" + strings.TrimPrefix(address, "https://")
	case strings.HasPrefix(address, "http://"):
		address = "ws://" + strings.TrimPrefix(address, "http://")
	}
	address, err := url.JoinPath(address, submissionId, "/logs/tail")
	if err != nil {
		panic(err)
	}
	return address
}

// Submit sends req to the job submission server at address and then streams
// the job's logs to out until the log stream ends.
//
// Progress lines prefixed with "INFO --" and "SUCC --" are written to out
// alongside the raw log messages; write errors on out are ignored. Submit
// panics if the submission fails, if the log WebSocket cannot be opened, or if
// the stream ends with anything other than a normal-closure status.
func Submit(address string, req utils.RayJobRequest, out io.Writer) {
	_, _ = fmt.Fprintf(out, "INFO -- Job submission server address: %s\n", address)

	submitURL := jobSubmissionURL(address)
	submissionId, submitErr := submitJobReq(submitURL, req)
	if submitErr != nil {
		panic(submitErr)
	}

	_, _ = fmt.Fprintf(out, "SUCC -- Job '%s' submitted successfully\n", submissionId)
	_, _ = fmt.Fprintf(out, "INFO -- Tailing logs until the job exits (disable with --no-wait):\n")

	// The log endpoint is derived from the submission URL, not the raw address.
	conn, _, dialErr := websocket.Dial(context.Background(), logTailingURL(submitURL, submissionId), nil)
	if dialErr != nil {
		panic(dialErr)
	}
	defer func() { _ = conn.CloseNow() }()

	for {
		_, chunk, readErr := conn.Read(context.Background())
		if readErr == nil {
			// Forward each log message verbatim.
			_, _ = out.Write(chunk)
			continue
		}
		// A normal closure is the only read error treated as a clean end of stream.
		if websocket.CloseStatus(readErr) != websocket.StatusNormalClosure {
			panic(readErr)
		}
		_, _ = fmt.Fprintf(out, "SUCC -- Job '%s' succeeded\n", submissionId)
		return
	}
}
Loading

0 comments on commit e347595

Please sign in to comment.