From 5635232eec64b1c37904f443c6ef251fe4892515 Mon Sep 17 00:00:00 2001
From: FreddieMatherSmartDCSIT <124710637+FreddieMatherSmartDCSIT@users.noreply.github.com>
Date: Wed, 29 May 2024 13:56:27 +0100
Subject: [PATCH] 48 update pv kafka metrics ingest to use new json schema
 (#51)

* 48 - Added KafkaBenchMarkProbeJSON type

* 48 - Updated `decode_and_yield_events_from_raw_msgs_no_length` to use new JSON schema

* 48 - Updated test for new JSON schema for benchmarkprobe

* 48 - Fixed linting errors

* 48 - Updated no length mock for kafka ingest so that correct json is provided
---
 .../metrics_and_events/kafka_metrics.py       | 15 ++++++---
 .../protocol_verifier/tests/conftest.py       | 31 ++++++++++---------
 .../metrics_and_events/test_kafka_metrics.py  | 16 +++++++---
 test_harness/protocol_verifier/utils/types.py | 20 ++++++++++++
 4 files changed, 57 insertions(+), 25 deletions(-)

diff --git a/test_harness/protocol_verifier/metrics_and_events/kafka_metrics.py b/test_harness/protocol_verifier/metrics_and_events/kafka_metrics.py
index 973c8f1..6646c80 100644
--- a/test_harness/protocol_verifier/metrics_and_events/kafka_metrics.py
+++ b/test_harness/protocol_verifier/metrics_and_events/kafka_metrics.py
@@ -11,7 +11,10 @@
 import aiokafka
 
 from test_harness.simulator.simulator import ResultsHandler
-from test_harness.protocol_verifier.utils.types import ResultsDict
+from test_harness.protocol_verifier.utils.types import (
+    ResultsDict,
+    KafkaBenchMarkProbeJSON,
+)
 from test_harness.metrics.metrics import MetricsRetriever
 
 
@@ -206,12 +209,14 @@ def decode_and_yield_events_from_raw_msgs_no_length(
     for partition in raw_msgs.keys():
         for msg in raw_msgs[partition]:
             data = bytearray(msg.value)
-            json_data = json.loads(data.decode("utf-8"))
-            if json_data["tag"] in KEY_EVENTS:
+            json_data: KafkaBenchMarkProbeJSON = json.loads(
+                data.decode("utf-8")
+            )
+            if json_data["payload"]["tag"] in KEY_EVENTS:
                 yield ResultsDict(
-                    field=json_data["tag"],
+                    field=json_data["payload"]["tag"],
                     timestamp=json_data['timestamp'],
-                    event_id=json_data["EventId"]
+                    event_id=json_data["payload"]["eventId"]
                 )
 
 
diff --git a/test_harness/protocol_verifier/tests/conftest.py b/test_harness/protocol_verifier/tests/conftest.py
index b07014b..8998426 100644
--- a/test_harness/protocol_verifier/tests/conftest.py
+++ b/test_harness/protocol_verifier/tests/conftest.py
@@ -16,6 +16,10 @@
     PVPerformanceResults,
     PVResultsDataFrame,
 )
+from test_harness.protocol_verifier.utils.types import (
+    KafkaBenchMarkProbeJSON,
+    KafkaBenchMarkProbePayload,
+)
 from requests import PreparedRequest
 
 # grok file path
@@ -397,23 +401,20 @@ async def mock_get_many(*args, **kwargs):
             "svdc_event_processed",
         ]
         for i, field in enumerate(log_field):
-
-            bytes_string = '"tag" : "' + field + '", '
-
-            event_id_string = f""""EventId" : "{event['eventId']}", """
-
-            timestamp_string = (
-                '"timestamp" : "'
-                + time_stamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
-                + '"'
+            KafkaBenchMarkProbeJSON(
+                timestamp=time_stamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
+                payload=KafkaBenchMarkProbePayload(
+                    tag=field, eventId=event["eventId"]
+                ),
             )
 
-            byte_array = str(
-                "{ "
-                + bytes_string
-                + event_id_string
-                + timestamp_string
-                + " }"
+            byte_array = json.dumps(
+                KafkaBenchMarkProbeJSON(
+                    timestamp=time_stamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
+                    payload=KafkaBenchMarkProbePayload(
+                        tag=field, eventId=event["eventId"]
+                    ),
+                )
             ).encode("utf-8")
 
             consumer_record = aiokafka.ConsumerRecord(
diff --git a/test_harness/protocol_verifier/tests/metrics_and_events/test_kafka_metrics.py b/test_harness/protocol_verifier/tests/metrics_and_events/test_kafka_metrics.py
index 3d0b484..90349a3 100644
--- a/test_harness/protocol_verifier/tests/metrics_and_events/test_kafka_metrics.py
+++ b/test_harness/protocol_verifier/tests/metrics_and_events/test_kafka_metrics.py
@@ -6,6 +6,10 @@
 from test_harness.protocol_verifier.metrics_and_events.kafka_metrics import (
     decode_and_yield_events_from_raw_msgs_no_length,
 )
+from test_harness.protocol_verifier.utils.types import (
+    KafkaBenchMarkProbeJSON,
+    KafkaBenchMarkProbePayload,
+)
 
 
 def test_decode_and_yield_events_from_raw_msgs_no_length() -> None:
@@ -36,11 +40,13 @@ def test_decode_and_yield_events_from_raw_msgs_no_length() -> None:
             key=None,
             value=bytearray(
                 json.dumps(
-                    {
-                        "tag": "reception_event_received",
-                        "timestamp": "2022-01-01T00:00:00.000Z",
-                        "EventId": "test_event_id",
-                    }
+                    KafkaBenchMarkProbeJSON(
+                        timestamp="2022-01-01T00:00:00.000Z",
+                        payload=KafkaBenchMarkProbePayload(
+                            tag="reception_event_received",
+                            eventId="test_event_id",
+                        )
+                    ),
                 ).encode("utf-8")
             ),
             timestamp=0,
diff --git a/test_harness/protocol_verifier/utils/types.py b/test_harness/protocol_verifier/utils/types.py
index c14f465..b768e02 100644
--- a/test_harness/protocol_verifier/utils/types.py
+++ b/test_harness/protocol_verifier/utils/types.py
@@ -275,3 +275,23 @@ class MetricsRetriverKwargsPairAndHandlerKwargsPair(NamedTuple):
     handler_kwargs_pair: ResultsHandlerKwargsPair
     """The handler for the results and kwargs
     """
+
+
+class KafkaBenchMarkProbePayload(TypedDict):
+    """Typed dict that type hints for an expected Kafka Benchmark Probe Payload
+    """
+    eventId: str
+    """The event id of the probe
+    """
+    tag: str
+
+
+class KafkaBenchMarkProbeJSON(TypedDict):
+    """Typed dict that type hints for an expected Kafka Benchmark Probe JSON
+    """
+    timestamp: str
+    """The timestamp of the probe
+    """
+    payload: KafkaBenchMarkProbePayload
+    """The payload for the metric
+    """
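
For reference, a minimal sketch (not part of the patch) of the new benchmark-probe message shape and of the round trip the updated code performs. The field names and sample values are taken from the diff above; the probe_json, raw and decoded variable names are illustrative only.

    import json

    from test_harness.protocol_verifier.utils.types import (
        KafkaBenchMarkProbeJSON,
        KafkaBenchMarkProbePayload,
    )

    # New nested schema, per the TypedDicts added to utils/types.py
    # (the old flat form was {"tag": ..., "timestamp": ..., "EventId": ...}).
    probe_json = KafkaBenchMarkProbeJSON(
        timestamp="2022-01-01T00:00:00.000Z",  # sample value from the test
        payload=KafkaBenchMarkProbePayload(
            tag="reception_event_received",    # tag used in the updated test
            eventId="test_event_id",           # sample value from the test
        ),
    )

    # The mocks in conftest.py and the test serialise the dict to bytes.
    raw = json.dumps(probe_json).encode("utf-8")

    # decode_and_yield_events_from_raw_msgs_no_length now reads the tag and
    # event id from under "payload" instead of from the top level.
    decoded: KafkaBenchMarkProbeJSON = json.loads(raw.decode("utf-8"))
    assert decoded["payload"]["tag"] == "reception_event_received"
    assert decoded["payload"]["eventId"] == "test_event_id"
    assert decoded["timestamp"] == "2022-01-01T00:00:00.000Z"

Because TypedDicts are plain dicts at runtime, the constructor calls above build exactly the JSON objects the mocks emit, while still giving the ingest code typed access to "payload", "tag", "eventId" and "timestamp".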