[RayJob][Refactor] use ray job logs to capture logs and be tolerant of duplicated job submissions #2579

Draft · wants to merge 2 commits into master

Conversation

rueian (Contributor) commented Nov 27, 2024

Why are these changes needed?

This is a PoC of using ray job submit --no-wait + ray job logs --follow to be tolerant of duplicated job submissions.

In summary, we can use the following shell script to submit jobs and work around the duplicated-submission issue:

ray job submit --address http://test-url --submission-id test-job-id --no-wait -- echo hello world 2>&1 | grep -zv 'Please use a different submission_id';
ray job logs --address http://test-url --follow test-job-id
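
A note on the filtering (assuming GNU grep, as shipped in the ray images): with -z, grep treats its input as NUL-delimited records, so the whole traceback, which contains no NUL bytes, arrives as a single record; -v then inverts the match and drops that record entirely when it contains the duplicate-submission message, while any other output passes through. The trailing ; (rather than &&) also matters: when grep suppresses everything it exits nonzero, and we still want ray job logs to run.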

Related issue number

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

rueian changed the title [RayJob][Refactor] use ray job logs to capture logs and be tolerant to duplicated job submission → [RayJob][Refactor] use ray job logs to capture logs and be tolerant of duplicated job submissions Nov 27, 2024

andrewsykim (Collaborator) commented Nov 27, 2024

I used a similar workaround here: https://github.com/ray-project/kuberay/blob/master/benchmark/perf-tests/10000-rayjob/pytorch-mnist-rayjob.yaml#L23-L24

The problem is that with either solution, either ray job submit or ray job logs will raise an exception, and I'm concerned users will be misled into thinking their job failed. But it does solve the issue with duplicate submission IDs and submission retries.

rueian (Contributor, Author) commented Nov 27, 2024

Hi @andrewsykim, thanks for the workaround! I hadn't noticed that before.

Here is an example scenario of a duplicated submission with this PoC:

▶ kubectl logs -f rayjob-sample-9gvpq
2024-11-26 18:40:07,699	INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-m9p8c-head-svc.default.svc.cluster.local:8265
Traceback (most recent call last):
  File "/home/ray/anaconda3/bin/ray", line 8, in <module>
    sys.exit(main())
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/scripts/scripts.py", line 2498, in main
    return cli()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/cli_utils.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/autoscaler/_private/cli_logger.py", line 856, in wrapper
    return f(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/cli.py", line 272, in submit
    job_id = client.submit_job(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/sdk.py", line 254, in submit_job
    self._raise_error(r)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/dashboard_sdk.py", line 283, in _raise_error
    raise RuntimeError(
RuntimeError: Request failed with status code 500: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/job_head.py", line 287, in submit_job
    resp = await job_agent_client.submit_job_internal(submit_request)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/job_head.py", line 80, in submit_job_internal
    await self._raise_error(resp)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/job_head.py", line 68, in _raise_error
    raise RuntimeError(f"Request failed with status code {status}: {error_text}.")
RuntimeError: Request failed with status code 400: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/job_agent.py", line 45, in submit_job
    submission_id = await self.get_job_manager().submit_job(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/job_manager.py", line 945, in submit_job
    raise ValueError(
ValueError: Job with submission_id rayjob-sample-6tqdm already exists. Please use a different submission_id.
.
.
2024-11-26 18:40:08,975	INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-m9p8c-head-svc.default.svc.cluster.local:8265
2024-11-26 18:40:03,772	INFO worker.py:1405 -- Using address 10.244.0.31:6379 set in the environment variable RAY_ADDRESS
2024-11-26 18:40:03,772	INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 10.244.0.31:6379...
2024-11-26 18:40:03,778	INFO worker.py:1715 -- Connected to Ray cluster. View the dashboard at http://10.244.0.31:8265
test_counter got 1
test_counter got 2
test_counter got 3
test_counter got 4
test_counter got 5
2024-11-26 18:40:17,022	SUCC cli.py:60 -- -----------------------------------
2024-11-26 18:40:17,023	SUCC cli.py:61 -- Job 'rayjob-sample-6tqdm' succeeded
2024-11-26 18:40:17,023	SUCC cli.py:62 -- -----------------------------------

Note that in this PoC, we run ray job submit --no-wait first and then ray job logs --follow. That is exactly what ray job submit does under the hood when run without the --no-wait flag.

To summarize:

  • If ray job submit really fails, then ray job logs will not succeed either, and k8s will retry the job.
  • If ray job submit fails due to a duplicated submission, users will see a message like ValueError: Job with submission_id rayjob-sample-6tqdm already exists. Please use a different submission_id. I think that is less confusing.

Besides this PoC, I am also working on a lightweight submitter that uses the same approach but is implemented in Go. I think it will probably be packaged into the same image as the kuberay operator, so that we don't need to maintain another Docker release.

andrewsykim (Collaborator) commented Nov 27, 2024

> To summarize:
>
>   • If ray job submit really fails, then ray job logs will not succeed either, and k8s will retry the job.
>   • If ray job submit fails due to a duplicated submission, users will see a message like ValueError: Job with submission_id rayjob-sample-6tqdm already exists. Please use a different submission_id. I think that is less confusing.

This lines up with the results I had. The problem is that we're still dumping a stack trace from an exception; I think users will find that really confusing. I wonder if there's an option where we run the first command but just pipe the output to /dev/null and use the exit code to determine what command to run next?
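
A minimal sketch of that exit-code approach, assuming ray job submit exits nonzero both on real failures and on a duplicated submission_id (the address and submission ID reuse the placeholder values from the PoC script):

# Discard the submitter's output and branch on its exit code only.
if ray job submit --address http://test-url --submission-id test-job-id --no-wait -- echo hello world >/dev/null 2>&1; then
  echo "job test-job-id submitted"
else
  echo "submit failed or test-job-id already exists; following the existing job"
fi
# Follow the logs either way; if the job was never created, this command
# fails too and k8s retries the submitter pod.
ray job logs --address http://test-url --follow test-job-id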

rueian (Contributor, Author) commented Nov 27, 2024

> > To summarize:
> >
> >   • If ray job submit really fails, then ray job logs will not succeed either, and k8s will retry the job.
> >   • If ray job submit fails due to a duplicated submission, users will see a message like ValueError: Job with submission_id rayjob-sample-6tqdm already exists. Please use a different submission_id. I think that is less confusing.
>
> This lines up with the results I had. The problem is that we're still dumping a stack trace from an exception; I think users will find that really confusing. I wonder if there's an option where we run the first command but just pipe the output to /dev/null and use the exit code to determine what command to run next?

I think we can pipe the output to grep -zv 'Please use a different submission_id'. That hides the stack trace for a duplicated submission:

▶ kubectl logs -f rayjob-sample-rxw2m
2024-11-26 22:04:56,033	INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-5npzh-head-svc.default.svc.cluster.local:8265
2024-11-26 22:04:59,053	INFO worker.py:1405 -- Using address 10.244.0.41:6379 set in the environment variable RAY_ADDRESS
2024-11-26 22:04:59,053	INFO worker.py:1540 -- Connecting to existing Ray cluster at address: 10.244.0.41:6379...
2024-11-26 22:04:59,058	INFO worker.py:1715 -- Connected to Ray cluster. View the dashboard at http://10.244.0.41:8265
test_counter got 1
test_counter got 2
test_counter got 3
test_counter got 4
test_counter got 5
2024-11-26 22:05:12,112	SUCC cli.py:60 -- -----------------------------------
2024-11-26 22:05:12,112	SUCC cli.py:61 -- Job 'rayjob-sample-964gd' succeeded
2024-11-26 22:05:12,113	SUCC cli.py:62 -- -----------------------------------

while the stack trace is still printed for other errors:

▶ kubectl logs -f rayjob-sample-5vpwg
Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/urllib3/connection.py", line 174, in _new_conn
    conn = connection.create_connection(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/urllib3/util/connection.py", line 72, in create_connection
    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
  File "/home/ray/anaconda3/lib/python3.8/socket.py", line 918, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 715, in urlopen
    httplib_response = self._make_request(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 416, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/urllib3/connection.py", line 244, in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
  File "/home/ray/anaconda3/lib/python3.8/http/client.py", line 1256, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/home/ray/anaconda3/lib/python3.8/http/client.py", line 1302, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/home/ray/anaconda3/lib/python3.8/http/client.py", line 1251, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/home/ray/anaconda3/lib/python3.8/http/client.py", line 1011, in _send_output
    self.send(msg)
  File "/home/ray/anaconda3/lib/python3.8/http/client.py", line 951, in send
    self.connect()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/urllib3/connection.py", line 205, in connect
    conn = self._new_conn()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/urllib3/connection.py", line 186, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0xffff8f02d430>: Failed to establish a new connection: [Errno -2] Name or service not known

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/requests/adapters.py", line 486, in send
    resp = conn.urlopen(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/urllib3/connectionpool.py", line 799, in urlopen
    retries = retries.increment(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='', port=80): Max retries exceeded with url: /api/version (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0xffff8f02d430>: Failed to establish a new connection: [Errno -2] Name or service not known'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/dashboard_sdk.py", line 262, in _check_connection_and_version_with_url
    r = self._do_request("GET", url)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/dashboard_sdk.py", line 303, in _do_request
    return requests.request(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/requests/api.py", line 59, in request
    return session.request(method=method, url=url, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/requests/sessions.py", line 589, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/requests/sessions.py", line 703, in send
    r = adapter.send(request, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/requests/adapters.py", line 519, in send
    raise ConnectionError(e, request=request)
requests.exceptions.ConnectionError: HTTPConnectionPool(host='', port=80): Max retries exceeded with url: /api/version (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0xffff8f02d430>: Failed to establish a new connection: [Errno -2] Name or service not known'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ray/anaconda3/bin/ray", line 8, in <module>
    sys.exit(main())
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/scripts/scripts.py", line 2498, in main
    return cli()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/cli_utils.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/autoscaler/_private/cli_logger.py", line 856, in wrapper
    return f(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/cli.py", line 263, in submit
    client = _get_sdk_client(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/cli.py", line 29, in _get_sdk_client
    client = JobSubmissionClient(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/sdk.py", line 109, in __init__
    self._check_connection_and_version(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/dashboard_sdk.py", line 248, in _check_connection_and_version
    self._check_connection_and_version_with_url(min_version, version_error_message)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/dashboard_sdk.py", line 278, in _check_connection_and_version_with_url
    raise ConnectionError(
ConnectionError: Failed to connect to Ray at address: http://rayjob-sample-raycluster-clq7c-head-svc.default.svc.cluster.local:8265
2024-11-26 22:16:53,246	INFO cli.py:36 -- Job submission server address: http://rayjob-sample-raycluster-clq7c-head-svc.default.svc.cluster.local:8265
Traceback (most recent call last):
  File "/home/ray/anaconda3/bin/ray", line 8, in <module>
    sys.exit(main())
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/scripts/scripts.py", line 2498, in main
    return cli()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1688, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/cli_utils.py", line 54, in wrapper
    return func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/autoscaler/_private/cli_logger.py", line 856, in wrapper
    return f(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/cli.py", line 481, in logs
    get_or_create_event_loop().run_until_complete(_tail_logs(client, job_id))
  File "/home/ray/anaconda3/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/cli.py", line 93, in _tail_logs
    async for lines in client.tail_job_logs(job_id):
  File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/dashboard/modules/job/sdk.py", line 484, in tail_job_logs
    ws = await session.ws_connect(
  File "/home/ray/anaconda3/lib/python3.8/site-packages/aiohttp/client.py", line 821, in _ws_connect
    raise WSServerHandshakeError(
aiohttp.client_exceptions.WSServerHandshakeError: 404, message='Invalid response status', url=URL('http://rayjob-sample-raycluster-clq7c-head-svc.default.svc.cluster.local:8265/api/jobs/rayjob-sample-pnd2l/logs/tail')
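
As a side note, here is a minimal demo of why the grep -zv filter can swallow the whole trace at once (assuming GNU grep; the sample text below stands in for the real traceback):

printf 'Traceback (most recent call last):\n  ...\nValueError: ... Please use a different submission_id.\n' | grep -zv 'Please use a different submission_id'
# Prints nothing: with -z the input is read as NUL-delimited records, so the
# whole trace is one record, and -v drops records containing the match.
# Output without the message passes through (with a NUL terminator appended).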
