diff --git a/broadcaster/_backends/pulsar.py b/broadcaster/_backends/pulsar.py index beb0926..0865652 100644 --- a/broadcaster/_backends/pulsar.py +++ b/broadcaster/_backends/pulsar.py @@ -1,8 +1,9 @@ import asyncio +import logging import typing from urllib.parse import urlparse import pulsar -from .._base import Event +from broadcaster._base import Event from .base import BroadcastBackend @@ -17,21 +18,30 @@ def __init__(self, url: str): self._consumer = None async def connect(self) -> None: - self._client = pulsar.Client(self._service_url) - self._producer = self._client.create_producer("broadcast") - self._consumer = self._client.subscribe( - "broadcast", - subscription_name="broadcast_subscription", - consumer_type=pulsar.ConsumerType.Shared, - ) + try: + logging.info("Connecting to brokers") + self._client = await asyncio.to_thread(pulsar.Client, self._service_url) + self._producer = await asyncio.to_thread( + self._client.create_producer, "broadcast" + ) + self._consumer = await asyncio.to_thread( + self._client.subscribe, + "broadcast", + subscription_name="broadcast_subscription", + consumer_type=pulsar.ConsumerType.Shared, + ) + logging.info("Successfully connected to brokers") + except Exception as e: + logging.error(e) + raise e async def disconnect(self) -> None: if self._producer: - self._producer.close() + await asyncio.to_thread(self._producer.close) if self._consumer: - self._consumer.close() + await asyncio.to_thread(self._consumer.close) if self._client: - self._client.close() + await asyncio.to_thread(self._client.close) async def subscribe(self, channel: str) -> None: # In this implementation, we're using a single topic 'broadcast' @@ -44,14 +54,21 @@ async def unsubscribe(self, channel: str) -> None: async def publish(self, channel: str, message: typing.Any) -> None: encoded_message = f"{channel}:{message}".encode("utf-8") - await self._run_in_executor(self._producer.send, encoded_message) + await asyncio.to_thread(self._producer.send, encoded_message) async def next_published(self) -> Event: while True: - msg = await self._run_in_executor(self._consumer.receive) - channel, content = msg.data().decode("utf-8").split(":", 1) - await self._run_in_executor(self._consumer.acknowledge, msg) - return Event(channel=channel, message=content) + try: + msg = await asyncio.to_thread(self._consumer.receive) + channel, content = msg.data().decode("utf-8").split(":", 1) + await asyncio.to_thread(self._consumer.acknowledge, msg) + return Event(channel=channel, message=content) - async def _run_in_executor(self, func, *args): - return await asyncio.get_event_loop().run_in_executor(None, func, *args) + except asyncio.CancelledError: + # cancellation + logging.info("next_published task is being cancelled") + raise + + except Exception as e: + logging.error(f"Error in next_published: {e}") + raise