Skip to content

Commit

Permalink
based off the review
Browse files Browse the repository at this point in the history
  • Loading branch information
daveads committed Sep 22, 2024
1 parent d108959 commit 2adf5b6
Showing 1 changed file with 35 additions and 18 deletions.
53 changes: 35 additions & 18 deletions broadcaster/_backends/pulsar.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import logging
import typing
from urllib.parse import urlparse
import pulsar
from .._base import Event
from broadcaster._base import Event
from .base import BroadcastBackend


Expand All @@ -17,21 +18,30 @@ def __init__(self, url: str):
self._consumer = None

async def connect(self) -> None:
self._client = pulsar.Client(self._service_url)
self._producer = self._client.create_producer("broadcast")
self._consumer = self._client.subscribe(
"broadcast",
subscription_name="broadcast_subscription",
consumer_type=pulsar.ConsumerType.Shared,
)
try:
logging.info("Connecting to brokers")
self._client = await asyncio.to_thread(pulsar.Client, self._service_url)
self._producer = await asyncio.to_thread(
self._client.create_producer, "broadcast"
)
self._consumer = await asyncio.to_thread(
self._client.subscribe,
"broadcast",
subscription_name="broadcast_subscription",
consumer_type=pulsar.ConsumerType.Shared,
)
logging.info("Successfully connected to brokers")
except Exception as e:
logging.error(e)
raise e

async def disconnect(self) -> None:
if self._producer:
self._producer.close()
await asyncio.to_thread(self._producer.close)
if self._consumer:
self._consumer.close()
await asyncio.to_thread(self._consumer.close)
if self._client:
self._client.close()
await asyncio.to_thread(self._client.close)

async def subscribe(self, channel: str) -> None:
# In this implementation, we're using a single topic 'broadcast'
Expand All @@ -44,14 +54,21 @@ async def unsubscribe(self, channel: str) -> None:

async def publish(self, channel: str, message: typing.Any) -> None:
encoded_message = f"{channel}:{message}".encode("utf-8")
await self._run_in_executor(self._producer.send, encoded_message)
await asyncio.to_thread(self._producer.send, encoded_message)

async def next_published(self) -> Event:
while True:
msg = await self._run_in_executor(self._consumer.receive)
channel, content = msg.data().decode("utf-8").split(":", 1)
await self._run_in_executor(self._consumer.acknowledge, msg)
return Event(channel=channel, message=content)
try:
msg = await asyncio.to_thread(self._consumer.receive)
channel, content = msg.data().decode("utf-8").split(":", 1)
await asyncio.to_thread(self._consumer.acknowledge, msg)
return Event(channel=channel, message=content)

async def _run_in_executor(self, func, *args):
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
except asyncio.CancelledError:
# cancellation
logging.info("next_published task is being cancelled")
raise

except Exception as e:
logging.error(f"Error in next_published: {e}")
raise

0 comments on commit 2adf5b6

Please sign in to comment.