
Commit

48 update pv kafka metrics ingest to use new json schema (#51)
* 48 - Added KafkaBenchMarkProbeJSON type

* 48 - Updated `decode_and_yield_events_from_raw_msgs_no_length` to use new JSON schema

* 48 - Updated test for new JSON schema for benchmarkprobe

* 48 - Fixed linting errors

* 48 - Updated no length mock for kafka ingest so that correct json is provided
FreddieMatherSmartDCSIT authored May 29, 2024
1 parent 3f41fdf commit 5635232
Showing 4 changed files with 57 additions and 25 deletions.
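This commit moves the benchmark-probe fields under a nested payload object: the tag and the event id now live in payload, and the event-id key is eventId rather than EventId. A minimal before/after sketch of the two message shapes, with key names taken from the diffs below; the concrete values and the old_probe/new_probe names are illustrative only:

    # Old (flat) probe JSON, as the previous conftest.py mock assembled by hand:
    old_probe = {
        "tag": "reception_event_received",
        "EventId": "some-event-id",
        "timestamp": "2024-05-29T12:00:00.000000Z",
    }

    # New (nested) probe JSON expected after this commit:
    new_probe = {
        "timestamp": "2024-05-29T12:00:00.000000Z",
        "payload": {
            "tag": "reception_event_received",
            "eventId": "some-event-id",
        },
    }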
15 changes: 10 additions & 5 deletions test_harness/protocol_verifier/metrics_and_events/kafka_metrics.py
@@ -11,7 +11,10 @@
 import aiokafka

 from test_harness.simulator.simulator import ResultsHandler
-from test_harness.protocol_verifier.utils.types import ResultsDict
+from test_harness.protocol_verifier.utils.types import (
+    ResultsDict,
+    KafkaBenchMarkProbeJSON,
+)
 from test_harness.metrics.metrics import MetricsRetriever


@@ -206,12 +209,14 @@ def decode_and_yield_events_from_raw_msgs_no_length(
     for partition in raw_msgs.keys():
         for msg in raw_msgs[partition]:
             data = bytearray(msg.value)
-            json_data = json.loads(data.decode("utf-8"))
-            if json_data["tag"] in KEY_EVENTS:
+            json_data: KafkaBenchMarkProbeJSON = json.loads(
+                data.decode("utf-8")
+            )
+            if json_data["payload"]["tag"] in KEY_EVENTS:
                 yield ResultsDict(
-                    field=json_data["tag"],
+                    field=json_data["payload"]["tag"],
                     timestamp=json_data['timestamp'],
-                    event_id=json_data["EventId"]
+                    event_id=json_data["payload"]["eventId"]
                 )


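To make the decoding change concrete, here is a minimal, self-contained sketch of the new parsing path. It mirrors the generator above, but yield_key_events, the plain list of byte values, and the contents of KEY_EVENTS are stand-ins for the harness's aiokafka records, ResultsDict, and real key-event set:

    import json

    # Stand-in for the harness's KEY_EVENTS constant; both tags appear in the diffs.
    KEY_EVENTS = {"reception_event_received", "svdc_event_processed"}


    def yield_key_events(raw_values):
        """Yield (field, timestamp, event_id) for probes whose payload tag is a key event."""
        for value in raw_values:
            json_data = json.loads(bytearray(value).decode("utf-8"))
            if json_data["payload"]["tag"] in KEY_EVENTS:
                yield (
                    json_data["payload"]["tag"],
                    json_data["timestamp"],
                    json_data["payload"]["eventId"],
                )


    example = json.dumps(
        {
            "timestamp": "2024-05-29T12:00:00.000000Z",
            "payload": {"tag": "reception_event_received", "eventId": "abc-123"},
        }
    ).encode("utf-8")

    print(list(yield_key_events([example])))
    # [('reception_event_received', '2024-05-29T12:00:00.000000Z', 'abc-123')]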
31 changes: 16 additions & 15 deletions test_harness/protocol_verifier/tests/conftest.py
@@ -16,6 +16,10 @@
     PVPerformanceResults,
     PVResultsDataFrame,
 )
+from test_harness.protocol_verifier.utils.types import (
+    KafkaBenchMarkProbeJSON,
+    KafkaBenchMarkProbePayload,
+)
 from requests import PreparedRequest

 # grok file path
@@ -397,23 +401,20 @@ async def mock_get_many(*args, **kwargs):
"svdc_event_processed",
]
for i, field in enumerate(log_field):

bytes_string = '"tag" : "' + field + '", '

event_id_string = f""""EventId" : "{event['eventId']}", """

timestamp_string = (
'"timestamp" : "'
+ time_stamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ '"'
KafkaBenchMarkProbeJSON(
timestamp=time_stamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
payload=KafkaBenchMarkProbePayload(
tag=field, eventId=event["eventId"]
),
)

byte_array = str(
"{ "
+ bytes_string
+ event_id_string
+ timestamp_string
+ " }"
byte_array = json.dumps(
KafkaBenchMarkProbeJSON(
timestamp=time_stamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
payload=KafkaBenchMarkProbePayload(
tag=field, eventId=event["eventId"]
),
)
).encode("utf-8")

consumer_record = aiokafka.ConsumerRecord(
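A TypedDict call builds an ordinary dict at runtime, which is why the updated mock above can pass the constructed object straight to json.dumps. A sketch of the bytes it now produces, using local stand-in classes that mirror the definitions added to types.py below; the timestamp and event id are illustrative:

    import json
    from typing import TypedDict


    class KafkaBenchMarkProbePayload(TypedDict):  # stand-in mirroring types.py
        eventId: str
        tag: str


    class KafkaBenchMarkProbeJSON(TypedDict):  # stand-in mirroring types.py
        timestamp: str
        payload: KafkaBenchMarkProbePayload


    byte_array = json.dumps(
        KafkaBenchMarkProbeJSON(
            timestamp="2024-05-29T12:00:00.000000Z",
            payload=KafkaBenchMarkProbePayload(
                tag="reception_event_received", eventId="some-event-id"
            ),
        )
    ).encode("utf-8")

    print(byte_array)
    # b'{"timestamp": "2024-05-29T12:00:00.000000Z", "payload": {"tag": "reception_event_received", "eventId": "some-event-id"}}'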
16 changes: 11 additions & 5 deletions
@@ -6,6 +6,10 @@
 from test_harness.protocol_verifier.metrics_and_events.kafka_metrics import (
     decode_and_yield_events_from_raw_msgs_no_length,
 )
+from test_harness.protocol_verifier.utils.types import (
+    KafkaBenchMarkProbeJSON,
+    KafkaBenchMarkProbePayload,
+)


 def test_decode_and_yield_events_from_raw_msgs_no_length() -> None:
@@ -36,11 +40,13 @@ def test_decode_and_yield_events_from_raw_msgs_no_length() -> None:
                 key=None,
                 value=bytearray(
                     json.dumps(
-                        {
-                            "tag": "reception_event_received",
-                            "timestamp": "2022-01-01T00:00:00.000Z",
-                            "EventId": "test_event_id",
-                        }
+                        KafkaBenchMarkProbeJSON(
+                            timestamp="2022-01-01T00:00:00.000Z",
+                            payload=KafkaBenchMarkProbePayload(
+                                tag="reception_event_received",
+                                eventId="test_event_id",
+                            )
+                        ),
                     ).encode("utf-8")
                 ),
                 timestamp=0,
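For orientation, the mocked record above carries a key-event tag, so the generator should yield exactly one result for it. Assuming reception_event_received is in KEY_EVENTS and that ResultsDict is a mapping with the field, timestamp and event_id keys used in the generator, the expected content is:

    expected = {
        "field": "reception_event_received",
        "timestamp": "2022-01-01T00:00:00.000Z",
        "event_id": "test_event_id",
    }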
20 changes: 20 additions & 0 deletions test_harness/protocol_verifier/utils/types.py
@@ -275,3 +275,23 @@ class MetricsRetriverKwargsPairAndHandlerKwargsPair(NamedTuple):
     handler_kwargs_pair: ResultsHandlerKwargsPair
     """The handler for the results and kwargs
     """
+
+
+class KafkaBenchMarkProbePayload(TypedDict):
+    """Typed dict that type hints for an expected Kafka Benchmark Probe Payload
+    """
+    eventId: str
+    """The event id of the probe
+    """
+    tag: str
+
+
+class KafkaBenchMarkProbeJSON(TypedDict):
+    """Typed dict that type hints for an expected Kafka Benchmark Probe JSON
+    """
+    timestamp: str
+    """The timestamp of the probe
+    """
+    payload: KafkaBenchMarkProbePayload
+    """The payload for the metric
+    """
