Skip to content

Commit

Permalink
Get argo workflow inputs from template
Browse files Browse the repository at this point in the history
  • Loading branch information
totycro committed Sep 25, 2024
1 parent 1bde243 commit 6b3edf5
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 4 deletions.
53 changes: 49 additions & 4 deletions pygeoapi_kubernetes_papermill/argo.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@

INITIATOR_LABEL_VALUE = "pygeoapi-eoxhub"

WORKFLOW_ENTRYPOINT_NAME = "execute"


class ArgoManager(BaseManager):
def __init__(self, manager_def: dict) -> None:
Expand Down Expand Up @@ -270,7 +272,7 @@ def _execute_handler_async(
for key, value in data_dict.items()
]
},
"entrypoint": "execute",
"entrypoint": WORKFLOW_ENTRYPOINT_NAME,
"workflowTemplateRef": {"name": p.workflow_template},
},
}
Expand All @@ -286,16 +288,15 @@ class ArgoProcessor(BaseProcessor):
def __init__(self, processor_def: dict) -> None:
self.workflow_template: str = processor_def["workflow_template"]

# TODO: parse workflow file in namespace to fill in inputs, outputs, etc.

inputs = _inputs_from_workflow_template(self.workflow_template)
metadata = {
"version": "0.1.0",
"id": self.workflow_template,
"title": self.workflow_template,
"description": "",
"keywords": [""],
"links": [],
"inputs": {},
"inputs": inputs,
"outputs": {},
"example": {},
"jobControlOptions": [
Expand All @@ -306,6 +307,50 @@ def __init__(self, processor_def: dict) -> None:
super().__init__(processor_def, metadata)


def _inputs_from_workflow_template(workflow_template) -> dict:
try:
k8s_wf_template: dict = (
k8s_client.CustomObjectsApi().get_namespaced_custom_object(
group=WORKFLOWS_API_GROUP,
version=WORKFLOWS_API_VERSION,
plural="workflowtemplates",
name=workflow_template,
namespace=current_namespace(),
)
)
except kubernetes.client.rest.ApiException as e:
if e.status == HTTPStatus.NOT_FOUND:
raise Exception(
f"Failed to find workflow template {workflow_template}"
f" in {current_namespace()}"
) from e
else:
raise

try:
(entrypoint_template,) = [
template
for template in k8s_wf_template["spec"]["templates"]
if template.get("name") == WORKFLOW_ENTRYPOINT_NAME
]
except ValueError as e:
raise Exception(
f"Failed to find wf template entrypoint template {WORKFLOW_ENTRYPOINT_NAME}"
) from e

return {
parameter["name"]: {
"title": parameter["name"],
"description": "",
"schema": {"type": "string"},
"minOccurs": 0 if parameter.get("value") else 1,
"maxOccurs": 1,
"keywords": [],
}
for parameter in entrypoint_template.get("inputs", {}).get("parameters", [])
}


def job_from_k8s_wf(workflow: dict) -> JobDict:
annotations = workflow["metadata"]["annotations"] or {}
metadata = {
Expand Down
47 changes: 47 additions & 0 deletions tests/test_argo_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def manager(mock_k8s_base) -> ArgoManager:
def test_execute_process_starts_async_job(
manager: ArgoManager,
mock_create_workflow,
mock_get_workflow_template,
):
job_id = "abc"
result = manager.execute_process(
Expand Down Expand Up @@ -123,6 +124,15 @@ def test_delete_job_deletes_job(
mock_delete_workflow.assert_called()


def test_inputs_retrieved_from_workflow_template(
mock_k8s_base,
mock_get_workflow_template,
):
processor = ArgoProcessor({"name": "proc", "workflow_template": "whatever"})
assert processor.metadata["inputs"]["param"]["minOccurs"] == 1
assert processor.metadata["inputs"]["param-optional"]["minOccurs"] == 0


@pytest.fixture()
def mock_create_workflow():
with mock.patch(
Expand Down Expand Up @@ -167,6 +177,7 @@ def mock_create_workflow():

@pytest.fixture()
def mock_get_workflow():
# NOTE: mocks same function as get_workflow_template
with mock.patch(
"pygeoapi_kubernetes_papermill."
"kubernetes.k8s_client.CustomObjectsApi.get_namespaced_custom_object",
Expand All @@ -192,3 +203,39 @@ def mock_delete_workflow():
"kubernetes.k8s_client.CustomObjectsApi.delete_namespaced_custom_object",
) as mocker:
yield mocker


@pytest.fixture()
def mock_get_workflow_template():
# NOTE: mocks same function as get_workflow
with mock.patch(
"pygeoapi_kubernetes_papermill."
"kubernetes.k8s_client.CustomObjectsApi.get_namespaced_custom_object",
return_value=MOCK_WORKFLOW_TEMPLATE,
) as mocker:
yield mocker


MOCK_WORKFLOW_TEMPLATE = {
"metadata": {
"name": "test",
"namespace": "test",
},
"spec": {
"imagePullSecrets": [{"name": "flux-cerulean"}],
"serviceAccountName": "argo-workflow",
"templates": [
{
"container": {},
"inputs": {
"artifacts": [],
"parameters": [
{"name": "param"},
{"name": "param-optional", "value": "a"},
],
},
"name": "execute",
}
],
},
}

0 comments on commit 6b3edf5

Please sign in to comment.