Skip to content

Commit

Permalink
Allow creating argo processors with each a differnet workflow template
Browse files Browse the repository at this point in the history
  • Loading branch information
totycro committed Sep 18, 2024
1 parent 9ae208a commit 36bb528
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pygeoapi_kubernetes_papermill/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .argo import ArgoManager # noqa
from .argo import ArgoManager, ArgoProcessor # noqa
from .kubernetes import KubernetesManager # noqa
from .notebook import PapermillNotebookKubernetesProcessor # noqa
from .image import ContainerImageKubernetesProcessor # noqa
30 changes: 26 additions & 4 deletions pygeoapi_kubernetes_papermill/argo.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@
import kubernetes.client.rest


from pygeoapi.process.manager.base import BaseManager, DATETIME_FORMAT
from pygeoapi.process.manager.base import BaseManager, DATETIME_FORMAT, BaseProcessor
from pygeoapi.util import (
JobStatus,
Subscriber,
RequestedResponse,
ProcessExecutionMode,
)

from pygeoapi.process.base import (
Expand Down Expand Up @@ -94,7 +95,6 @@ def __init__(self, manager_def: dict) -> None:

self.namespace = current_namespace()

self.workflow_template: str = manager_def["workflow_template"]
self.custom_objects_api = k8s_client.CustomObjectsApi()
# self.core_api = k8s_client.CoreV1Api()

Expand Down Expand Up @@ -182,7 +182,7 @@ def delete_job(self, job_id) -> bool:

def _execute_handler_async(
self,
p: ArgoManager,
p: ArgoProcessor,
job_id,
data_dict,
requested_outputs: Optional[dict] = None,
Expand Down Expand Up @@ -226,7 +226,7 @@ def _execute_handler_async(
]
},
"entrypoint": "test",
"workflowTemplateRef": {"name": self.workflow_template},
"workflowTemplateRef": {"name": p.workflow_template},
},
}
self.custom_objects_api.create_namespaced_custom_object(
Expand All @@ -237,6 +237,28 @@ def _execute_handler_async(
return ("application/json", {}, JobStatus.accepted)


class ArgoProcessor(BaseProcessor):
def __init__(self, processor_def: dict) -> None:
metadata = {
"version": "0.1.0",
"id": "",
"title": "",
"description": "",
"keywords": [""],
"links": [],
"inputs": {},
"outputs": {},
"example": {},
"jobControlOptions": [
ProcessExecutionMode.async_execute.value,
ProcessExecutionMode.sync_execute.value,
],
}
super().__init__(processor_def, metadata)

self.workflow_template: str = processor_def["workflow_template"]


def job_from_k8s_wf(workflow: dict) -> JobDict:
annotations = workflow["metadata"]["annotations"] or {}
metadata = {
Expand Down
11 changes: 6 additions & 5 deletions tests/test_argo_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@
from pygeoapi_kubernetes_papermill import (
ArgoManager,
)
from pygeoapi.process.base import BaseProcessor
from pygeoapi_kubernetes_papermill.argo import ArgoProcessor


@pytest.fixture()
def manager(mock_k8s_base) -> ArgoManager:
man = ArgoManager(
{"name": "kman", "skip_k8s_setup": True, "workflow_template": "mytemplate"}
)
man.get_processor = lambda *args, **kwargs: BaseProcessor(
{"name": ""}, {"jobControlOptions": "async-execute"}
man.get_processor = lambda *args, **kwargs: ArgoProcessor(
{"name": "", "workflow_template": "mywf"}
)
return man

Expand Down Expand Up @@ -98,9 +98,10 @@ def test_get_job_returns_workflow(
):
job_id = "abc"
job = manager.get_job(job_id=job_id)
assert job
assert job["identifier"] == "annotations-identifier"
assert json.loads(job["parameters"]) == {"inpfile": "test2.txt"}
assert job["job_start_datetime"] == "2024-09-18T12:01:02.000000Z"
assert json.loads(job["parameters"]) == {"inpfile": "test2.txt"} # type: ignore
assert job.get("job_start_datetime") == "2024-09-18T12:01:02.000000Z"
assert job["status"] == "successful"


Expand Down

0 comments on commit 36bb528

Please sign in to comment.