Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add the option to use shell with the metaflow python Runner #2067

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

HarryAnkers
Copy link

Hi,
Raising a pr to allow the subprocesses to be executed directly through the shell.

Locally this had a massive performance improvement for an extreamly lightweight flow for me.
18.618823051452637
to
3.2076590061187744

I'm not 100% on where the increase comes from. I suspect this is as the shell is caching python instead of directly executing it each time.

Thank you

@HarryAnkers HarryAnkers marked this pull request as draft September 30, 2024 18:02
@HarryAnkers HarryAnkers marked this pull request as ready for review September 30, 2024 18:03
@savingoyal
Copy link
Collaborator

@HarryAnkers, do you have a reproducible example that demonstrates the speed-up? We don't set shell=True for many reasons (security, portability, etc.), and I am curious to dig into what is actually causing the slowdown/speedup. That will help us understand whether this PR is in the right direction or not.

@HarryAnkers
Copy link
Author

@savingoyal thanks for the reply.

Yes here's a two file setup that shows this issue

test.py

import json
import os
import subprocess
import tempfile
import time
from typing import Any

from metaflow import Runner


def test_a(
    workflow_module_path: str,
    workflow_params: dict[str, Any],
    workflow_input_payload: str,
) -> None:
    start_time = time.time()
    print(f"Starting runner execution_time={time.time() - start_time}")
    with Runner(workflow_module_path, pylint=False) as runner:
        running = runner.run(**workflow_params, input_payload=workflow_input_payload)
        print(f"Runner run execution_time={time.time() - start_time}")

        if running.status == "successful":
            print(f"Runner completed execution_time={time.time() - start_time}")


def test_b(
    workflow_module_path: str,
    workflow_params: dict[str, Any],
    workflow_input_payload: str,
) -> None:
    start_time = time.time()
    print(f"Starting runner workaround execution_time={time.time() - start_time}")
    with tempfile.TemporaryDirectory(dir="/tmp") as temp_dir:
        tfp_runner_attribute = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)

        print(f"Started runner execution_time={time.time() - start_time}")
        subprocess.run(
            f"python {workflow_module_path} --no-pylint run --runner-attribute-file '{tfp_runner_attribute.name}' --input_payload '{workflow_input_payload}'",
            shell=True,
        )
        print(f"Runner workaround completed execution_time={time.time() - start_time}")


if __name__ == "__main__":
    cwd = os.path.dirname(os.path.abspath(__file__))
    workflow_module_path = os.path.join(cwd, "workflow.py")

    workflow_input_payload = json.dumps(
        {"request_id": "5c76e196-5ef2-443d-a036-25755a9a5fdd"}
    )

    workflow_params = {"some_key_1": "key_1", "some_key_2": "key_2"}
    print(
        f"Following both test implementations with workflow_module_path={workflow_module_path}"
    )
    test_a(
        workflow_module_path,
        workflow_params,
        workflow_input_payload,
    )

    test_b(
        workflow_module_path,
        workflow_params,
        workflow_input_payload,
    )

workflow.py

from metaflow import FlowSpec, JSONType, Parameter, step

from appraisal_api.domain.workflows.metaflow.io_schemas import (
    SampleFlowResponse,
    SampleFlowResult,
)


class SimpleFlow(FlowSpec):
    input_payload = Parameter(
        "input_payload",
        help="The payload containing the images to be processed.",
        required=True,
        type=JSONType,
    )

    some_key_1 = Parameter(
        "some_key_1",
        help="Some Extra Parameter provided to the workflow",
        type=str,
        default="key_1",
    )

    some_key_2 = Parameter(
        "some_key_2",
        help="Some Extra Parameter provided to the workflow",
        type=str,
        default="key_2",
    )

    @step
    def start(self) -> None:
        print("This is the start step.")
        self.next(self.process_data)

    @step
    def process_data(self) -> None:
        print("This is the data processing step.")
        self.next(self.end)

    @step
    def end(self) -> None:
        result = SampleFlowResult(img_path="path/to/image.jpg", result="success")
        self.output_payload = SampleFlowResponse(
            operation="sample_flow",
            response=[result],
        )
        print("This is the end step.")


if __name__ == "__main__":
    SimpleFlow()

To run this setup run test.py

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants