Async runner improvements #2056
base: master
Conversation
metaflow/runner/utils.py
Outdated
async def async_read_from_file_when_ready(
    file_path: str, command_obj: "CommandManager", timeout: float = 5
):
    await asyncio.wait_for(command_obj.process.wait(), timeout)
Here, we wait for the process to finish, with a timeout..
I am not sure we want that. The file content may be written before the process finishes.
And we probably want the file content back as soon as possible..
The synchronous version doesn't wait for the process to finish before checking the file content. It keeps reading the file in a loop, and as soon as there is content, it returns it—even if the process is still running.
Didn't realize the `poll` call isn't blocking. I rewrote it so that it's the same as the sync version, except for a call to `asyncio.sleep`.
The impl doesn't really sit right with me though. I can imagine a scenario in which a call to `read` doesn't return the entire file content, because the subprocess issues another write immediately after. I think we could reimplement this with pipes and poll to see whether the write side (the subprocess) has closed the pipe and thus signaled that it finished writing. I could take a look at this after.
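The pipe-based idea could look roughly like this sketch (hypothetical, not part of this PR): the subprocess writes its payload to a pipe, and the reader treats the write side closing (EOF) as the signal that the content is complete, so a partial read can never be mistaken for the whole.

```python
import asyncio
import sys


async def read_until_writer_closes(cmd):
    # Read the subprocess's stdout until EOF; the pipe closing on the
    # write side marks the end of the content.
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE
    )
    data = await proc.stdout.read()  # returns only once the pipe is closed
    await proc.wait()
    return data


# Even though this child writes in two chunks with a pause in between,
# the reader sees the full payload because it waits for EOF.
child = [
    sys.executable,
    "-c",
    "import sys, time; sys.stdout.write('part1'); sys.stdout.flush(); "
    "time.sleep(0.2); sys.stdout.write('part2')",
]
```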
So, we know for a fact that the file being written will only be written once..
This makes things a bit easier..
And we want to get the content ASAP, since that allows us to fetch the `Run` object (while the process is still running)..
@filipcacky has a point here though -- it is true we write once and in most cases it should work fine but it is up to the OS to determine how the writes proceed and there is no guarantee that the content will be written all at once. Another mechanism may be warranted. File locks are always a bit annoying though (and not always reliable). We could also check for the presence of a particular string at the end or something (the OS should ensure writes are at least done sequentially). It may be worth fixing now because these types of bugs are particularly nasty when they do occur (it's rare but not impossible).
It's probably not worth doing in this PR, but when I have a bit more time I can take a look at it.
Reimplementing it is, I think, worth it: for peace of mind, for better ergonomics on the write side (not being forced to make only a single write), and for a faster read on the read side (reading until the pipe closes, instead of waking up periodically and checking for non-empty content).
I guess we can resolve this comment for now? But if possible..
@filipcacky can you take a stab at it within this PR itself?
raise TimeoutError(
    "Timeout while waiting for file content from '%s'" % file_path
)
await asyncio.sleep(0.1)
So the only difference from the sync version is this `sleep` statement?
@@ -9,6 +9,8 @@
import threading
from typing import Callable, Dict, Iterator, List, Optional, Tuple

from .utils import check_process_exited


def kill_process_and_descendants(pid, termination_timeout):
I guess we can remove this if it is not being used, since you replaced it with a plural version..
Actually, it is used in the `kill()` function..
Maybe you can modify `kill` to use your plural version and then get rid of this one, perhaps..
If you do that, we can copy over the TODO comments to the plural version..
Also, you can use this `kill` in `_handle_sigint` and `_async_handle_sigint`.
I guess we are close... @filipcacky Can you post an example / code snippet through which we can observe the behaviour before and after merging this PR? i.e., what is the exact subtle, nuanced behaviour you ran into that this PR solved for you.. I will make sure to verify from my side as well...
Follow-up to #2053
Added an async implementation of Runner's signal handling, as discussed here #2053 (comment).
Added an async implementation of Runner's `__get_executing_run`, which is used in `async_run` and `async_resume`; the current implementation relies on `time.sleep`, which blocks other coroutines from running (modified code from my previous PR #2033).
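The difference the last point describes can be demonstrated with a small standalone snippet (illustrative only, not Runner code): a poller that calls `time.sleep` stalls every other coroutine on the event loop, while `await asyncio.sleep` lets them keep running.

```python
import asyncio
import time


async def demo(use_blocking_sleep: bool):
    results = []

    async def ticker():
        # Should be able to run concurrently with the poller.
        for _ in range(3):
            results.append("tick")
            await asyncio.sleep(0.05)

    async def poller():
        if use_blocking_sleep:
            time.sleep(0.3)  # blocks the whole event loop
        else:
            await asyncio.sleep(0.3)  # yields control to other coroutines
        results.append("done")

    await asyncio.gather(ticker(), poller())
    return results
```

With the blocking sleep, only the first tick gets in before the poller monopolizes the loop; with `asyncio.sleep`, all three ticks happen while the poller waits.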