Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RayJob][Feature] add light weight job submitter in kuberay image #2587

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions .github/workflows/e2e-tests-ray-job-submitter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
name: e2e-ray-job-submitter

on:
pull_request:
branches:
- master
- 'release-*'
push:
branches:
- master
- 'release-*'

concurrency:
group: ${{ github.head_ref }}-${{ github.workflow }}
cancel-in-progress: true

jobs:
ray-job-submitter:
runs-on: ubuntu-20.04
strategy:
fail-fast: false
matrix:
ray-version: [ '2.39.0' ]
go-version: [ '1.22.0' ]
steps:
- name: Checkout code
uses: actions/checkout@v3
with:
submodules: recursive

- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.x'

- name: Install Ray
run: pip install ray[default]==${{ matrix.ray-version }}

- name: Run e2e tests
run: |
cd ray-operator
go test -timeout 30m -v ./test/e2erayjobsubmitter
3 changes: 3 additions & 0 deletions ray-operator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ COPY main.go main.go
COPY apis/ apis/
COPY controllers/ controllers/
COPY pkg/features pkg/features
COPY rayjobsubmitter/ rayjobsubmitter/

# Build
USER root
RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o manager main.go
RUN CGO_ENABLED=1 GOOS=linux go build -tags strictfipsruntime -a -o submitter ./rayjobsubmitter/cmd/main.go

FROM gcr.io/distroless/base-debian12:nonroot
WORKDIR /
COPY --from=builder /workspace/manager .
COPY --from=builder /workspace/submitter .
USER 65532:65532

ENTRYPOINT ["/manager"]
4 changes: 4 additions & 0 deletions ray-operator/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ test-sampleyaml: WHAT ?= ./test/sampleyaml
test-sampleyaml: manifests fmt vet
go test -timeout 30m -v $(WHAT)

test-e2erayjobsubmitter: WHAT ?= ./test/e2erayjobsubmitter
test-e2erayjobsubmitter: fmt vet
go test -timeout 30m -v $(WHAT)

sync: helm api-docs
./hack/update-codegen.sh

Expand Down
1 change: 1 addition & 0 deletions ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/coder/websocket v1.8.12 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
github.com/evanphx/json-patch v5.9.0+incompatible // indirect
Expand Down
2 changes: 2 additions & 0 deletions ray-operator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions ray-operator/rayjobsubmitter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Ray Job Submitter

This is a Go Ray Job Submitter for KubeRay to submit a Ray Job
and tail its logs without installing Ray which is very large.

Note that this tool is designed specifically for KubeRay and
will not support some `ray job submit` features that people
don't use with KubeRay, for example, uploading local files to
a Ray cluster will not be supported by this tool.

## Testing

Tests are located at [../test/e2erayjobsubmitter](../test/e2erayjobsubmitter).

As the e2e suggests, you need to have `ray` installed for these tests
because they need to start a real Ray Head. You can run the tests with:

```sh
make test-e2erayjobsubmitter
```
or GitHub Action: [../../.github/workflows/e2e-tests-ray-job-submitter.yaml](../../.github/workflows/e2e-tests-ray-job-submitter.yaml)
61 changes: 61 additions & 0 deletions ray-operator/rayjobsubmitter/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"encoding/json"
"os"
"strings"

flag "github.com/spf13/pflag"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/rayjobsubmitter"
)

func main() {
var (
runtimeEnvJson string
metadataJson string
entrypointResources string
entrypointNumCpus float32
entrypointNumGpus float32
)

flag.StringVar(&runtimeEnvJson, "runtime-env-json", "", "")
flag.StringVar(&metadataJson, "metadata-json", "", "")
flag.StringVar(&entrypointResources, "entrypoint-resources", "", "")
flag.Float32Var(&entrypointNumCpus, "entrypoint-num-cpus", 0.0, "")
flag.Float32Var(&entrypointNumGpus, "entrypoint-num-gpus", 0.0, "")
flag.Parse()

address := os.Getenv("RAY_DASHBOARD_ADDRESS")
if address == "" {
panic("Missing RAY_DASHBOARD_ADDRESS")
}
submissionId := os.Getenv("RAY_JOB_SUBMISSION_ID")
if submissionId == "" {
panic("Missing RAY_JOB_SUBMISSION_ID")
}

req := utils.RayJobRequest{
Entrypoint: strings.Join(flag.Args(), " "),
SubmissionId: submissionId,
NumCpus: entrypointNumCpus,
NumGpus: entrypointNumGpus,
}
if len(runtimeEnvJson) > 0 {
if err := json.Unmarshal([]byte(runtimeEnvJson), &req.RuntimeEnv); err != nil {
panic(err)
}
}
if len(metadataJson) > 0 {
if err := json.Unmarshal([]byte(metadataJson), &req.Metadata); err != nil {
panic(err)
}
}
if len(entrypointResources) > 0 {
if err := json.Unmarshal([]byte(entrypointResources), &req.Resources); err != nil {
panic(err)
}
}
rayjobsubmitter.Submit(address, req, os.Stdout)
}
98 changes: 98 additions & 0 deletions ray-operator/rayjobsubmitter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package rayjobsubmitter

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"

"github.com/coder/websocket"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

func submitJobReq(address string, request utils.RayJobRequest) (jobId string, err error) {
rayJobJson, err := json.Marshal(request)
if err != nil {
return "", err
}

req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, address, bytes.NewBuffer(rayJobJson))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")

resp, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
defer func() { _ = resp.Body.Close() }()

body, _ := io.ReadAll(resp.Body)

if strings.Contains(string(body), "Please use a different submission_id") {
return request.SubmissionId, nil
}

if resp.StatusCode < 200 || resp.StatusCode > 299 {
return "", fmt.Errorf("SubmitJob fail: %s %s", resp.Status, string(body))
}

return request.SubmissionId, nil
}

func jobSubmissionURL(address string) string {
if !strings.HasPrefix(address, "http://") {
address = "http://" + address
}
address, err := url.JoinPath(address, "/api/jobs/") // the tailing "/" is required.
if err != nil {
panic(err)
}
return address
}

func logTailingURL(address, submissionId string) string {
address = strings.Replace(address, "http", "ws", 1)
address, err := url.JoinPath(address, submissionId, "/logs/tail")
if err != nil {
panic(err)
}
return address
}

func Submit(address string, req utils.RayJobRequest, out io.Writer) {
_, _ = fmt.Fprintf(out, "INFO -- Job submission server address: %s\n", address)

address = jobSubmissionURL(address)
submissionId, err := submitJobReq(address, req)
if err != nil {
panic(err)
}

_, _ = fmt.Fprintf(out, "SUCC -- Job '%s' submitted successfully\n", submissionId)
_, _ = fmt.Fprintf(out, "INFO -- Tailing logs until the job exits (disable with --no-wait):\n")

wsAddr := logTailingURL(address, submissionId)
c, _, err := websocket.Dial(context.Background(), wsAddr, nil)
if err != nil {
panic(err)
}
defer func() { _ = c.CloseNow() }()
for {
_, msg, err := c.Read(context.Background())
if err != nil {
if websocket.CloseStatus(err) == websocket.StatusNormalClosure {
_, _ = fmt.Fprintf(out, "SUCC -- Job '%s' succeeded\n", submissionId)
return
}
panic(err)
}
_, _ = out.Write(msg)
}
}
121 changes: 121 additions & 0 deletions ray-operator/test/e2erayjobsubmitter/e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package e2erayjobsubmitter

import (
"bytes"
"fmt"
"os"
"os/exec"
"regexp"
"strings"
"testing"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/rayjobsubmitter"
)

var script = `import ray
import os

ray.init()

@ray.remote
class Counter:
def __init__(self):
# Used to verify runtimeEnv
self.name = os.getenv("counter_name")
assert self.name == "test_counter"
self.counter = 0

def inc(self):
self.counter += 1

def get_counter(self):
return "{} got {}".format(self.name, self.counter)

counter = Counter.remote()

for _ in range(5):
ray.get(counter.inc.remote())
print(ray.get(counter.get_counter.remote()))
`

func TestRayJobSubmitter(t *testing.T) {
// Create a temp job script
scriptpy, err := os.CreateTemp("", "counter.py")
if err != nil {
t.Fatalf("Failed to create job script: %v", err)
}
defer func() { _ = os.Remove(scriptpy.Name()) }()
if _, err = scriptpy.WriteString(script); err != nil {
t.Fatalf("Failed to write to job script: %v", err)
}
if err = scriptpy.Close(); err != nil {
t.Fatalf("Failed to close job script: %v", err)
}

// start ray
cmd := exec.Command("ray", "start", "--head", "--disable-usage-stats", "--include-dashboard=true")
out, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("Failed to start ray head: %v", err)
}
t.Log(string(out))
if cmd.ProcessState.ExitCode() != 0 {
t.Fatalf("Failed to start ray head with exit code: %v", cmd.ProcessState.ExitCode())
}
defer func() {
cmd := exec.Command("ray", "stop")
if _, err := cmd.CombinedOutput(); err != nil {
t.Fatalf("Failed to stop ray head: %v", err)
}
}()

var address string
re := regexp.MustCompile(`RAY_ADDRESS='([^']+)'`)
matches := re.FindStringSubmatch(string(out))
if len(matches) > 1 {
address = matches[1]
} else {
t.Fatalf("Failed to find RAY_ADDRESS from the ray start output")
}

testcases := []struct {
name string
out string
req utils.RayJobRequest
}{
{
name: "my-job-1",
req: utils.RayJobRequest{
Entrypoint: "python " + scriptpy.Name(),
RuntimeEnv: map[string]interface{}{"env_vars": map[string]string{"counter_name": "test_counter"}},
SubmissionId: "my-job-1",
},
out: "test_counter got 5",
},
{
name: "my-job-1-duplicated",
req: utils.RayJobRequest{
Entrypoint: "python " + scriptpy.Name(),
RuntimeEnv: map[string]interface{}{"env_vars": map[string]string{"counter_name": "test_counter"}},
SubmissionId: "my-job-1",
},
out: "test_counter got 5",
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
out := bytes.NewBuffer(nil)

rayjobsubmitter.Submit(address, tc.req, out)
for _, expected := range []string{
tc.out,
fmt.Sprintf("Job '%s' succeeded", tc.req.SubmissionId),
} {
if !strings.Contains(out.String(), tc.out) {
t.Errorf("Output did not contain expected string. output=%s\nexpected=%s\n", out.String(), expected)
}
}
})
}
}
Loading