Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add the option to use shell with the metaflow python Runner #2067

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ class Runner(object):
directory is used.
file_read_timeout : int, default 3600
The timeout after which we stop trying to read the runner attribute file.
shell : bool, default False
If True, the runner will execute commands through the system shell.
Forwarding commands to the shell can provide a performance improvement.
**kwargs : Any
Additional arguments that you would pass to `python myflow.py` before
the `run` command.
Expand All @@ -231,6 +234,7 @@ def __init__(
env: Optional[Dict] = None,
cwd: Optional[str] = None,
file_read_timeout: int = 3600,
shell: bool = False,
**kwargs
):
# these imports are required here and not at the top
Expand Down Expand Up @@ -261,6 +265,8 @@ def __init__(
self.top_level_kwargs = kwargs
self.api = MetaflowAPI.from_cli(self.flow_file, start)

self.shell = shell

def __enter__(self) -> "Runner":
return self

Expand Down Expand Up @@ -320,6 +326,7 @@ def run(self, **kwargs) -> ExecutingRun:
env=self.env_vars,
cwd=self.cwd,
show_output=self.show_output,
shell=self.shell,
)
command_obj = self.spm.get(pid)

Expand Down Expand Up @@ -354,6 +361,7 @@ def resume(self, **kwargs):
env=self.env_vars,
cwd=self.cwd,
show_output=self.show_output,
shell=self.shell,
)
command_obj = self.spm.get(pid)

Expand Down
42 changes: 31 additions & 11 deletions metaflow/runner/subprocess_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import os
import shlex
import time
import shutil
import signal
Expand Down Expand Up @@ -67,6 +68,7 @@ def run_command(
env: Optional[Dict[str, str]] = None,
cwd: Optional[str] = None,
show_output: bool = False,
shell: bool = False,
) -> int:
"""
Run a command synchronously and return its process ID.
Expand All @@ -87,14 +89,17 @@ def run_command(
CommandManager object:
- command_obj.log_files["stdout"]
- command_obj.log_files["stderr"]
shell : bool, default False
Whether to run the command in a shell or not.
Forwarding the command to the shell can provide a performance improvement.
Returns
-------
int
The process ID of the subprocess.
"""

command_obj = CommandManager(command, env, cwd)
pid = command_obj.run(show_output=show_output)
pid = command_obj.run(show_output=show_output, shell=shell)
self.commands[pid] = command_obj
command_obj.sync_wait()
return pid
Expand Down Expand Up @@ -245,7 +250,7 @@ def sync_wait(self):
self.stdout_thread.join()
self.stderr_thread.join()

def run(self, show_output: bool = False):
def run(self, show_output: bool = False, shell=False):
"""
Run the subprocess synchronously. This can only be called once.

Expand All @@ -258,6 +263,9 @@ def run(self, show_output: bool = False):
They can be accessed later by reading the files present in:
- self.log_files["stdout"]
- self.log_files["stderr"]
shell : bool, default False
Whether to run the command in a shell or not.
Forwarding the command to the shell can provide a performance improvement.
"""

if not self.run_called:
Expand All @@ -274,15 +282,27 @@ def stream_to_stdout_and_file(pipe, log_file):
pipe.close()

try:
self.process = subprocess.Popen(
self.command,
cwd=self.cwd,
env=self.env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
)
if shell:
self.process = subprocess.Popen(
shlex.join(self.command),
cwd=self.cwd,
env=self.env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
shell=True,
)
else:
self.process = subprocess.Popen(
self.command,
cwd=self.cwd,
env=self.env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=1,
universal_newlines=True,
)

self.log_files["stdout"] = stdout_logfile
self.log_files["stderr"] = stderr_logfile
Expand Down