Skip to content

Commit

Permalink
Add view to retrieve log output of job from external server
Browse files Browse the repository at this point in the history
  • Loading branch information
totycro committed Nov 26, 2024
1 parent e649e5c commit f37ef3a
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
- name: test and push
run: |
IMG="eurodatacube/pygeoapi-eoxhub:${{ steps.get_image_tag.outputs.TAG }}"
docker run --env PYGEOAPI_CONFIG=/pkp/tests/pygeoapi-test-config.yaml --entrypoint python3 "${IMG}" -m pytest || exit 1
docker run --env PYGEOAPI_CONFIG=/pkp/tests/pygeoapi-test-config.yaml --env PYGEOAPI_OPENAPI=/pygeoapi/tests/pygeoapi-test-openapi.yml --entrypoint python3 "${IMG}" -m pytest || exit 1
docker run --entrypoint flake8 "${IMG}" pygeoapi_kubernetes_papermill tests || exit 1
docker run --entrypoint mypy "${IMG}" pygeoapi_kubernetes_papermill tests || exit 1
docker push "${IMG}"
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.5.9
* Add view to retrieve log output of job from external server

## 1.5.8
* Add argo backend
* Allow setting custom resource requirements for s3fs
Expand Down
7 changes: 2 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
test: build
docker run --env PYGEOAPI_CONFIG=/pkp/tests/pygeoapi-test-config.yaml --entrypoint python3 pkp:1 -m pytest
docker run --env PYGEOAPI_CONFIG=/pkp/tests/pygeoapi-test-config.yaml --env PYGEOAPI_OPENAPI=/pygeoapi/tests/pygeoapi-test-openapi.yml --entrypoint python3 pkp:1 -m pytest
docker run --entrypoint flake8 pkp:1 pygeoapi_kubernetes_papermill tests
docker run --entrypoint mypy pkp:1 pygeoapi_kubernetes_papermill tests

build:
docker build . -t pkp:1

test-watch: build
docker run --env PYGEOAPI_CONFIG=/pkp/tests/pygeoapi-test-config.yaml --volume `pwd`:/pkp --entrypoint ptw pkp:1

bash: build
docker run --env PYGEOAPI_CONFIG=/pkp/tests/pygeoapi-test-config.yaml --volume `pwd`:/pkp -it --entrypoint bash pkp:1
docker run --env PYGEOAPI_CONFIG=/pkp/tests/pygeoapi-test-config.yaml --env PYGEOAPI_OPENAPI=/pygeoapi/tests/pygeoapi-test-openapi.yml --volume `pwd`:/pkp -it --entrypoint bash pkp:1

upgrade-packages:
docker run --volume `pwd`:/pkp --rm --user 0 --entrypoint bash -it pkp:1 -c "python3 -m pip install pip-upgrader && pip-upgrade --skip-package-installation"
Expand Down
1 change: 1 addition & 0 deletions pygeoapi_kubernetes_papermill/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from .kubernetes import KubernetesManager # noqa
from .notebook import PapermillNotebookKubernetesProcessor # noqa
from .image import ContainerImageKubernetesProcessor # noqa
from . import log_view # noqa
2 changes: 2 additions & 0 deletions pygeoapi_kubernetes_papermill/argo.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ def __init__(self, manager_def: dict) -> None:
self.custom_objects_api = k8s_client.CustomObjectsApi()
# self.core_api = k8s_client.CoreV1Api()

self.log_query_endpoint: str = manager_def["log_query_endpoint"]

def get_jobs(self, status=None, limit=None, offset=None) -> dict:
"""
Get process jobs, optionally filtered by status
Expand Down
2 changes: 2 additions & 0 deletions pygeoapi_kubernetes_papermill/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ def __init__(self, manager_def: dict) -> None:
self.batch_v1 = k8s_client.BatchV1Api()
self.core_api = k8s_client.CoreV1Api()

self.log_query_endpoint: str = manager_def["log_query_endpoint"]

def get_jobs(self, status=None, limit=None, offset=None) -> dict:
"""
Get process jobs, optionally filtered by status
Expand Down
74 changes: 74 additions & 0 deletions pygeoapi_kubernetes_papermill/log_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# =================================================================
#
# Authors: Bernhard Mallinger <[email protected]>
#
# Copyright (C) 2024 EOX IT Services GmbH <https://eox.at>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

from http import HTTPStatus
import logging
import itertools

import requests

# NOTE: this assumes flask_app, which is the default.
from pygeoapi.flask_app import APP, api_


LOGGER = logging.getLogger(__name__)


@APP.get("/processes/<process_id>/jobs/<job_id>/logs")
def get_job_logs(process_id, job_id):
LOGGER.debug(f"Retrieving job logs for {process_id} {job_id}")

log_query_endpoint = getattr(api_.manager, "log_query_endpoint", None)
if not log_query_endpoint:
headers, status, content = api_.get_exception(
HTTPStatus.INTERNAL_SERVER_ERROR,
{},
"json",
"NoApplicableCode",
"logs not configured for this pygeoapi instance",
)
return content, status, headers
else:
namespace = api_.manager.namespace
response = requests.get(
log_query_endpoint,
params={
"query": f"{{job={namespace}/{job_id}}}",
},
)
response.raise_for_status()
streams = response.json()["data"]["result"]

# here let's just mix stdout/stderr
return "\n".join(
entry[1]
for entry in sorted(
itertools.chain.from_iterable(result["values"] for result in streams)
)
)
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def mock_k8s_base():
"pygeoapi_kubernetes_papermill.kubernetes.current_namespace"
), mock.patch(
"pygeoapi_kubernetes_papermill.notebook.current_namespace",
), mock.patch(
"pygeoapi_kubernetes_papermill.argo.current_namespace",
):
yield

Expand Down
1 change: 1 addition & 0 deletions tests/pygeoapi-test-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ server:
manager:
name: pygeoapi_kubernetes_papermill.KubernetesManager
skip_k8s_setup: true
log_query_endpoint: "https://www.example.com"

logging:
level: DEBUG
Expand Down
7 changes: 2 additions & 5 deletions tests/test_argo_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,13 @@
from kubernetes import client as k8s_client

from pygeoapi.util import JobStatus, RequestedProcessExecutionMode, Subscriber
from pygeoapi_kubernetes_papermill import (
ArgoManager,
)
from pygeoapi_kubernetes_papermill.argo import ArgoProcessor
from pygeoapi_kubernetes_papermill.argo import ArgoProcessor, ArgoManager


@pytest.fixture()
def manager(mock_k8s_base) -> ArgoManager:
man = ArgoManager(
{"name": "kman", "skip_k8s_setup": True, "workflow_template": "mytemplate"}
{"name": "kman", "skip_k8s_setup": True, "log_query_endpoint": ""}
)
man.get_processor = lambda *args, **kwargs: ArgoProcessor(
{"name": "", "workflow_template": "mywf"}
Expand Down
4 changes: 3 additions & 1 deletion tests/test_kubernetes_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ def mock_wait_for_result_file():

@pytest.fixture()
def manager(mock_k8s_base, papermill_processor) -> KubernetesManager:
man = KubernetesManager({"name": "kman", "skip_k8s_setup": True})
man = KubernetesManager(
{"name": "kman", "skip_k8s_setup": True, "log_query_endpoint": ""}
)
man.get_processor = lambda *args, **kwargs: papermill_processor
return man

Expand Down
169 changes: 169 additions & 0 deletions tests/test_log_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# =================================================================
#
# Authors: Bernhard Mallinger <[email protected]>
#
# Copyright (C) 2024 EOX IT Services GmbH <https://eox.at>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================


from http import HTTPStatus
from unittest import mock
import pytest
import json

import requests


@pytest.fixture()
def mock_load_openapi():
# we really don't need this, but it's thightly coupled
with mock.patch("pygeoapi.openapi"):
yield


@pytest.fixture()
def mock_loki_request():
response = requests.Response()
response.status_code = HTTPStatus.OK
response._content = json.dumps(LOKI_MOCK_RESPONSE).encode()

with mock.patch(
"pygeoapi_kubernetes_papermill.log_view.requests.get",
return_value=response,
) as patcher:
yield patcher


@pytest.fixture
def client():
from pygeoapi.flask_app import APP

APP.config["TESTING"] = True
import os

os.environ["PYGEOAPI_OPENAPI"] = "foo"
with APP.test_request_context():
with APP.test_client() as client:
yield client


def test_log_view_returns_log_lines(client, mock_loki_request):
job_id = "abc-123"

response = client.get(f"/processes/my-process/jobs/{job_id}/logs")

assert mock_loki_request.mock_calls[0][2]["params"]["query"] == "{job=test/abc-123}"

assert response.status_code == 200
assert len(response.text.splitlines()) == 4
assert response.text.startswith('{"event": "HTTP Request: GET https://exampl')


LOKI_MOCK_RESPONSE = {
"status": "success",
"data": {
"resultType": "streams",
"result": [
{
"stream": {
"pod": "myservice-6ff9459566-87q7c",
"stream": "stderr",
"app": "myservice",
"container": "myservice",
"filename": "/var/log/pods/myservice_myservice",
"job": "myservice/myservice",
"namespace": "myservice",
"node_name": "192.168.0.63",
},
"values": [
[
"1732608728234328263",
'{"headers": {"X-Request-ID": "1369ab3ceba2d5d4cbf963d',
],
[
"1732608728233384370",
'{"event": "HTTP Request: POST http://myservice/image/'
'\\"HTTP/1.1 200 OK\\""}',
],
[
"1732608725637512670",
'{"event": "HTTP Request: POST http://myservice/image/'
'\\"HTTP/1.1 200 OK\\""}',
],
[
"1732608724453929363",
'{"event": "HTTP Request: GET https://example.com/collec',
],
],
}
],
"stats": {
"summary": {
"bytesProcessedPerSecond": 53904,
"linesProcessedPerSecond": 234,
"totalBytesProcessed": 1378,
"totalLinesProcessed": 6,
"execTime": 0.025563608,
"queueTime": 0.000036888,
"subqueries": 1,
"totalEntriesReturned": 6,
},
"querier": {
"store": {
"totalChunksRef": 0,
"totalChunksDownloaded": 0,
"chunksDownloadTime": 0,
"chunk": {
"headChunkBytes": 0,
"headChunkLines": 0,
"decompressedBytes": 0,
"decompressedLines": 0,
"compressedBytes": 0,
"totalDuplicates": 0,
},
}
},
"ingester": {
"totalReached": 1,
"totalChunksMatched": 1,
"totalBatches": 1,
"totalLinesSent": 6,
"store": {
"totalChunksRef": 0,
"totalChunksDownloaded": 0,
"chunksDownloadTime": 0,
"chunk": {
"headChunkBytes": 0,
"headChunkLines": 0,
"decompressedBytes": 1378,
"decompressedLines": 6,
"compressedBytes": 598,
"totalDuplicates": 0,
},
},
},
},
},
}

0 comments on commit f37ef3a

Please sign in to comment.