diff --git a/pygeoapi_kubernetes_papermill/__init__.py b/pygeoapi_kubernetes_papermill/__init__.py index 0e07954..2b00ce8 100644 --- a/pygeoapi_kubernetes_papermill/__init__.py +++ b/pygeoapi_kubernetes_papermill/__init__.py @@ -1,4 +1,4 @@ -from .argo import ArgoManager # noqa +from .argo import ArgoManager, ArgoProcessor # noqa from .kubernetes import KubernetesManager # noqa from .notebook import PapermillNotebookKubernetesProcessor # noqa from .image import ContainerImageKubernetesProcessor # noqa diff --git a/pygeoapi_kubernetes_papermill/argo.py b/pygeoapi_kubernetes_papermill/argo.py index 7a181a1..484c426 100644 --- a/pygeoapi_kubernetes_papermill/argo.py +++ b/pygeoapi_kubernetes_papermill/argo.py @@ -41,11 +41,12 @@ import kubernetes.client.rest -from pygeoapi.process.manager.base import BaseManager, DATETIME_FORMAT +from pygeoapi.process.manager.base import BaseManager, DATETIME_FORMAT, BaseProcessor from pygeoapi.util import ( JobStatus, Subscriber, RequestedResponse, + ProcessExecutionMode, ) from pygeoapi.process.base import ( @@ -94,7 +95,6 @@ def __init__(self, manager_def: dict) -> None: self.namespace = current_namespace() - self.workflow_template: str = manager_def["workflow_template"] self.custom_objects_api = k8s_client.CustomObjectsApi() # self.core_api = k8s_client.CoreV1Api() @@ -182,7 +182,7 @@ def delete_job(self, job_id) -> bool: def _execute_handler_async( self, - p: ArgoManager, + p: ArgoProcessor, job_id, data_dict, requested_outputs: Optional[dict] = None, @@ -226,7 +226,7 @@ def _execute_handler_async( ] }, "entrypoint": "test", - "workflowTemplateRef": {"name": self.workflow_template}, + "workflowTemplateRef": {"name": p.workflow_template}, }, } self.custom_objects_api.create_namespaced_custom_object( @@ -237,6 +237,28 @@ def _execute_handler_async( return ("application/json", {}, JobStatus.accepted) +class ArgoProcessor(BaseProcessor): + def __init__(self, processor_def: dict) -> None: + metadata = { + "version": "0.1.0", + "id": "", + "title": "", + "description": "", + "keywords": [""], + "links": [], + "inputs": {}, + "outputs": {}, + "example": {}, + "jobControlOptions": [ + ProcessExecutionMode.async_execute.value, + ProcessExecutionMode.sync_execute.value, + ], + } + super().__init__(processor_def, metadata) + + self.workflow_template: str = processor_def["workflow_template"] + + def job_from_k8s_wf(workflow: dict) -> JobDict: annotations = workflow["metadata"]["annotations"] or {} metadata = { diff --git a/tests/test_argo_manager.py b/tests/test_argo_manager.py index b40cd54..c9390ac 100644 --- a/tests/test_argo_manager.py +++ b/tests/test_argo_manager.py @@ -37,7 +37,7 @@ from pygeoapi_kubernetes_papermill import ( ArgoManager, ) -from pygeoapi.process.base import BaseProcessor +from pygeoapi_kubernetes_papermill.argo import ArgoProcessor @pytest.fixture() @@ -45,8 +45,8 @@ def manager(mock_k8s_base) -> ArgoManager: man = ArgoManager( {"name": "kman", "skip_k8s_setup": True, "workflow_template": "mytemplate"} ) - man.get_processor = lambda *args, **kwargs: BaseProcessor( - {"name": ""}, {"jobControlOptions": "async-execute"} + man.get_processor = lambda *args, **kwargs: ArgoProcessor( + {"name": "", "workflow_template": "mywf"} ) return man @@ -98,9 +98,10 @@ def test_get_job_returns_workflow( ): job_id = "abc" job = manager.get_job(job_id=job_id) + assert job assert job["identifier"] == "annotations-identifier" - assert json.loads(job["parameters"]) == {"inpfile": "test2.txt"} - assert job["job_start_datetime"] == "2024-09-18T12:01:02.000000Z" + assert json.loads(job["parameters"]) == {"inpfile": "test2.txt"} # type: ignore + assert job.get("job_start_datetime") == "2024-09-18T12:01:02.000000Z" assert job["status"] == "successful"