Skip to content

Commit

Permalink
Implement pagination in the kubernetes manager
Browse files Browse the repository at this point in the history
  • Loading branch information
totycro committed Aug 5, 2024
1 parent 2e3e36b commit 0ebbf77
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 10 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.4.0
* Implement pagination

## 1.3.0
* Save all process configuration as parameters, not just input values

Expand Down
30 changes: 22 additions & 8 deletions pygeoapi_kubernetes_papermill/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def execute(self):
JobDict = TypedDict(
"JobDict",
{
"identifier": str,
"status": str,
"result-notebook": str,
"message": str,
Expand Down Expand Up @@ -132,25 +133,38 @@ def __init__(self, manager_def: dict) -> None:
self.batch_v1 = k8s_client.BatchV1Api()
self.core_api = k8s_client.CoreV1Api()

def get_jobs(self, status=None) -> list[JobDict]:
def get_jobs(self, status=None, limit=None, offset=None) -> list[JobDict]:
"""
Get process jobs, optionally filtered by status
:param status: job status (accepted, running, successful,
failed, results) (default is all)
:param limit: number of jobs to return
:param offset: pagination offset
:returns: `list` of jobs (identifier, status, process identifier)
:returns: dict of list of jobs (identifier, status, process identifier)
and numberMatched
"""

k8s_jobs: k8s_client.V1Joblist = self.batch_v1.list_namespaced_job(
namespace=self.namespace,
)
k8s_jobs = [
k8s_job
for k8s_job in self.batch_v1.list_namespaced_job(
namespace=self.namespace,
).items
if is_k8s_job_name(k8s_job.metadata.name)
]

# NOTE: need to paginate before expensive single job serialization
if offset:
k8s_jobs = k8s_jobs[offset:]

if limit:
k8s_jobs = k8s_jobs[:limit]

# TODO: implement status filter

return [
job_from_k8s(k8s_job, self._job_message(k8s_job))
for k8s_job in k8s_jobs.items
if is_k8s_job_name(k8s_job.metadata.name)
job_from_k8s(k8s_job, self._job_message(k8s_job)) for k8s_job in k8s_jobs
]

def get_job(self, job_id) -> Optional[JobDict]:
Expand Down
10 changes: 10 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ def k8s_job_failed(k8s_job: k8s_client.V1Job) -> k8s_client.V1Job:
return failed_job


@pytest.fixture()
def many_k8s_jobs(k8s_job: k8s_client.V1Job) -> list[k8s_client.V1Job]:
def create_job(i: int) -> k8s_client.V1Job:
new_job = deepcopy(k8s_job)
new_job.metadata.annotations["pygeoapi.io/identifier"] = f"job-{i}"
return new_job

return [create_job(i) for i in range(13)]


@pytest.fixture()
def mock_read_job(k8s_job):
with mock.patch(
Expand Down
16 changes: 14 additions & 2 deletions tests/test_kubernetes_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@


@contextmanager
def mock_list_jobs_with(k8s_job):
def mock_list_jobs_with(*args):
with mock.patch(
"pygeoapi_kubernetes_papermill."
"kubernetes.k8s_client.BatchV1Api.list_namespaced_job",
return_value=k8s_client.V1JobList(items=[k8s_job]),
return_value=k8s_client.V1JobList(items=args),
):
yield

Expand Down Expand Up @@ -366,3 +366,15 @@ def test_failure_notification_is_sent_for_failing_job(k8s_job_failed, mock_patch

mock_post.assert_called_once()
mock_patch_job.assert_called_once()


def test_kubernetes_manager_handles_pagination(
manager: KubernetesManager,
mock_list_pods_no_container_status,
many_k8s_jobs,
):
with mock_list_jobs_with(*many_k8s_jobs):
jobs = manager.get_jobs(offset=3, limit=2)

assert len(jobs) == 2
assert [job["identifier"] for job in jobs] == ["job-3", "job-4"]

0 comments on commit 0ebbf77

Please sign in to comment.