Skip to content

Commit

Permalink
pulsar integration
Browse files Browse the repository at this point in the history
  • Loading branch information
daveads committed Sep 19, 2024
1 parent 1e179c1 commit d108959
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 3 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,20 @@ Python 3.7+

* `pip install permit-broadcaster`
* `pip install permit-broadcaster[redis]`
* `pip install permit-broadcaster[pulsar]`
* `pip install permit-broadcaster[postgres]`
* `pip install permit-broadcaster[kafka]`

## Available backends

* `Broadcast('memory://')`
* `Broadcast("redis://localhost:6379")`
* `Broadcast("pulsar://localhost:6650")`
* `Broadcast("postgres://localhost:5432/broadcaster")`
* `Broadcast("kafka://localhost:9092")`
* `Broadcast("kafka://broker_1:9092,broker_2:9092")`


## Kafka environment variables

The following environment variables are exposed to allow SASL authentication with Kafka (along with their default assignment):
Expand Down
57 changes: 57 additions & 0 deletions broadcaster/_backends/pulsar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import asyncio
import typing
from urllib.parse import urlparse
import pulsar
from .._base import Event
from .base import BroadcastBackend


class PulsarBackend(BroadcastBackend):
def __init__(self, url: str):
parsed_url = urlparse(url)
self._host = parsed_url.hostname or "localhost"
self._port = parsed_url.port or 6650
self._service_url = f"pulsar://{self._host}:{self._port}"
self._client = None
self._producer = None
self._consumer = None

async def connect(self) -> None:
self._client = pulsar.Client(self._service_url)
self._producer = self._client.create_producer("broadcast")
self._consumer = self._client.subscribe(
"broadcast",
subscription_name="broadcast_subscription",
consumer_type=pulsar.ConsumerType.Shared,
)

async def disconnect(self) -> None:
if self._producer:
self._producer.close()
if self._consumer:
self._consumer.close()
if self._client:
self._client.close()

async def subscribe(self, channel: str) -> None:
# In this implementation, we're using a single topic 'broadcast'
# So we don't need to do anything here
pass

async def unsubscribe(self, channel: str) -> None:
# Similarly, we don't need to do anything here
pass

async def publish(self, channel: str, message: typing.Any) -> None:
encoded_message = f"{channel}:{message}".encode("utf-8")
await self._run_in_executor(self._producer.send, encoded_message)

async def next_published(self) -> Event:
while True:
msg = await self._run_in_executor(self._consumer.receive)
channel, content = msg.data().decode("utf-8").split(":", 1)
await self._run_in_executor(self._consumer.acknowledge, msg)
return Event(channel=channel, message=content)

async def _run_in_executor(self, func, *args):
return await asyncio.get_event_loop().run_in_executor(None, func, *args)
5 changes: 5 additions & 0 deletions broadcaster/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ def __init__(self, url: str):

self._backend = MemoryBackend(url)

elif parsed_url.scheme == "pulsar":
from broadcaster._backends.pulsar import PulsarBackend

self._backend = PulsarBackend(url)

async def __aenter__(self) -> "Broadcast":
await self.connect()
return self
Expand Down
13 changes: 13 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,16 @@ services:
- POSTGRES_USER=postgres
ports:
- 5432:5432
pulsar:
image: apachepulsar/pulsar:3.3.1
command: bin/pulsar standalone
ports:
- 6650:6650
- 8080:8080
volumes:
- pulsardata:/pulsar/data
- pulsarconf:/pulsar/conf

volumes:
pulsardata:
pulsarconf:
1 change: 1 addition & 0 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ In order to run the app with different backends, you have to set the env
| kafka | `export BROADCAST_URL=kafka://localhost:9092` | `docker-compose up kafka` |
| redis | `export BROADCAST_URL=redis://localhost:6379` | `docker-compose up redis` |
| postgres | `export BROADCAST_URL=postgres://localhost:5432/broadcaster` | `docker-compose up postgres` |
| pulsar | `export BROADCAST_URL=pulsar://localhost:6650` | `docker-compose up pulsar` |
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-e .[redis,postgres,kafka]
-e .[redis,postgres,kafka,pulsar]

# Documentation
mkdocs==1.3.1
Expand Down
5 changes: 3 additions & 2 deletions scripts/start
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#!/bin/sh -e
# Accepted values: postgres, kafka, redis
# Accepted values: postgres, kafka, redis, pulsar
# If no variable provided all services will start
if [ -n "$1" ]; then
if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ]; then
if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ] && [ "$1" != "pulsar" ]; then
echo "Not a valid value. Choose one or none:
kafka
redis
pulsar
postgres ";
exit 1;
fi
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def get_packages(package):
"redis": ["asyncio-redis"],
"postgres": ["asyncpg"],
"kafka": ["aiokafka"],
"pulsar": ["pulsar-client"],
"test": ["pytest", "pytest-asyncio"],
},
classifiers=[
Expand Down
10 changes: 10 additions & 0 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ async def test_redis():
assert event.channel == "chatroom"
assert event.message == "hello"

## pulsar test
@pytest.mark.asyncio
async def test_pulsar():
async with Broadcast("pulsar://localhost:6650") as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"


@pytest.mark.asyncio
async def test_postgres():
Expand Down

0 comments on commit d108959

Please sign in to comment.