Skip to content

Commit

Permalink
pulsar integration (#20)
Browse files Browse the repository at this point in the history
* pulsar integration

* based off the review

* fix failling ci

* setup workflow test for pulsar

* done
  • Loading branch information
daveads authored Sep 24, 2024
1 parent 1e179c1 commit 17d5f9d
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 14 deletions.
29 changes: 25 additions & 4 deletions .github/workflows/test-suite.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
---
name: Test Suite

on:
push:
branches: ["master"]
pull_request:
branches: ["master"]
workflow_dispatch:

jobs:
tests:
name: "Python ${{ matrix.python-version }}"
runs-on: "ubuntu-latest"

strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]

services:
zookeeper:
image: confluentinc/cp-zookeeper
Expand Down Expand Up @@ -58,6 +54,26 @@ jobs:
- uses: "actions/setup-python@v2"
with:
python-version: "${{ matrix.python-version }}"

# Install Docker Compose if it's not available by default
- name: Set up Docker Compose
run: sudo apt-get install docker-compose -y

# Start Pulsar using Docker Compose
- name: Start Pulsar
run: docker-compose up -d pulsar


- name: Wait for Pulsar
run: |
timeout 300 bash -c '
until curl -s http://localhost:8080/admin/v2/brokers/healthcheck; do
echo "Waiting for Pulsar to be ready..."
sleep 5
done
echo "pulsar is ready"
'
- name: "Install dependencies"
run: "scripts/install"
# - name: "Run linting checks"
Expand All @@ -66,5 +82,10 @@ jobs:
run: "scripts/build"
- name: "Run tests"
run: "scripts/test"

- name: "Enforce coverage"
run: "scripts/coverage"

# Stop the Pulsar container after all tests are complete
- name: Stop Pulsar
run: docker-compose down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,20 @@ Python 3.7+

* `pip install permit-broadcaster`
* `pip install permit-broadcaster[redis]`
* `pip install permit-broadcaster[pulsar]`
* `pip install permit-broadcaster[postgres]`
* `pip install permit-broadcaster[kafka]`

## Available backends

* `Broadcast('memory://')`
* `Broadcast("redis://localhost:6379")`
* `Broadcast("pulsar://localhost:6650")`
* `Broadcast("postgres://localhost:5432/broadcaster")`
* `Broadcast("kafka://localhost:9092")`
* `Broadcast("kafka://broker_1:9092,broker_2:9092")`


## Kafka environment variables

The following environment variables are exposed to allow SASL authentication with Kafka (along with their default assignment):
Expand Down
77 changes: 77 additions & 0 deletions broadcaster/_backends/pulsar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import anyio
import logging
import typing
from urllib.parse import urlparse
import pulsar
from broadcaster._base import Event
from .base import BroadcastBackend


class PulsarBackend(BroadcastBackend):
def __init__(self, url: str):
parsed_url = urlparse(url)
self._host = parsed_url.hostname or "localhost"
self._port = parsed_url.port or 6650
self._service_url = f"pulsar://{self._host}:{self._port}"
self._client = None
self._producer = None
self._consumer = None

async def connect(self) -> None:
try:
logging.info("Connecting to brokers")
self._client = await anyio.to_thread.run_sync(
lambda: pulsar.Client(self._service_url)
)
self._producer = await anyio.to_thread.run_sync(
lambda: self._client.create_producer("broadcast")
)
self._consumer = await anyio.to_thread.run_sync(
lambda: self._client.subscribe(
"broadcast",
subscription_name="broadcast_subscription",
consumer_type=pulsar.ConsumerType.Shared,
)
)
logging.info("Successfully connected to brokers")
except Exception as e:
logging.error(e)
raise e

async def disconnect(self) -> None:
if self._producer:
await anyio.to_thread.run_sync(self._producer.close)
if self._consumer:
await anyio.to_thread.run_sync(self._consumer.close)
if self._client:
await anyio.to_thread.run_sync(self._client.close)

async def subscribe(self, channel: str) -> None:
# In this implementation, we're using a single topic 'broadcast'
# So we don't need to do anything here
pass

async def unsubscribe(self, channel: str) -> None:
# Similarly, we don't need to do anything here
pass

async def publish(self, channel: str, message: typing.Any) -> None:
encoded_message = f"{channel}:{message}".encode("utf-8")
await anyio.to_thread.run_sync(lambda: self._producer.send(encoded_message))

async def next_published(self) -> Event:
while True:
try:
msg = await anyio.to_thread.run_sync(self._consumer.receive)
channel, content = msg.data().decode("utf-8").split(":", 1)
await anyio.to_thread.run_sync(lambda: self._consumer.acknowledge(msg))
return Event(channel=channel, message=content)

except anyio.get_cancelled_exc_class():
# cancellation
logging.info("next_published task is being cancelled")
raise

except Exception as e:
logging.error(f"Error in next_published: {e}")
raise
5 changes: 5 additions & 0 deletions broadcaster/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ def __init__(self, url: str):

self._backend = MemoryBackend(url)

elif parsed_url.scheme == "pulsar":
from broadcaster._backends.pulsar import PulsarBackend

self._backend = PulsarBackend(url)

async def __aenter__(self) -> "Broadcast":
await self.connect()
return self
Expand Down
13 changes: 13 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,16 @@ services:
- POSTGRES_USER=postgres
ports:
- 5432:5432
pulsar:
image: apachepulsar/pulsar:3.3.1
command: bin/pulsar standalone
ports:
- 6650:6650
- 8080:8080
volumes:
- pulsardata:/pulsar/data
- pulsarconf:/pulsar/conf

volumes:
pulsardata:
pulsarconf:
1 change: 1 addition & 0 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ In order to run the app with different backends, you have to set the env
| kafka | `export BROADCAST_URL=kafka://localhost:9092` | `docker-compose up kafka` |
| redis | `export BROADCAST_URL=redis://localhost:6379` | `docker-compose up redis` |
| postgres | `export BROADCAST_URL=postgres://localhost:5432/broadcaster` | `docker-compose up postgres` |
| pulsar | `export BROADCAST_URL=pulsar://localhost:6650` | `docker-compose up pulsar` |
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-e .[redis,postgres,kafka]
-e .[redis,postgres,kafka,pulsar]

# Documentation
mkdocs==1.3.1
Expand Down
3 changes: 1 addition & 2 deletions scripts/coverage
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ export PREFIX=""
if [ -d 'venv' ] ; then
export PREFIX="venv/bin/"
fi
export SOURCE_FILES="broadcaster tests"

set -x

${PREFIX}coverage report --show-missing --skip-covered --fail-under=88
${PREFIX}coverage report
6 changes: 1 addition & 5 deletions scripts/install
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
#!/bin/sh -e

# Use the Python executable provided from the `-p` option, or a default.
[ "$1" = "-p" ] && PYTHON=$2 || PYTHON="python3"

REQUIREMENTS="requirements.txt"
VENV="venv"

set -x

if [ -z "$GITHUB_ACTIONS" ]; then
"$PYTHON" -m venv "$VENV"
PIP="$VENV/bin/pip"
else
PIP="pip"
fi

"$PIP" install -r "$REQUIREMENTS"
"$PIP" install --upgrade setuptools twine
"$PIP" install -e .
5 changes: 3 additions & 2 deletions scripts/start
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#!/bin/sh -e
# Accepted values: postgres, kafka, redis
# Accepted values: postgres, kafka, redis, pulsar
# If no variable provided all services will start
if [ -n "$1" ]; then
if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ]; then
if [ "$1" != "kafka" ] && [ "$1" != "redis" ] && [ "$1" != "postgres" ] && [ "$1" != "pulsar" ]; then
echo "Not a valid value. Choose one or none:
kafka
redis
pulsar
postgres ";
exit 1;
fi
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def get_packages(package):
"redis": ["asyncio-redis"],
"postgres": ["asyncpg"],
"kafka": ["aiokafka"],
"pulsar": ["pulsar-client", "anyio"],
"test": ["pytest", "pytest-asyncio"],
},
classifiers=[
Expand Down
10 changes: 10 additions & 0 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ async def test_redis():
assert event.channel == "chatroom"
assert event.message == "hello"

## pulsar test
@pytest.mark.asyncio
async def test_pulsar():
async with Broadcast("pulsar://localhost:6650") as broadcast:
async with broadcast.subscribe("chatroom") as subscriber:
await broadcast.publish("chatroom", "hello")
event = await subscriber.get()
assert event.channel == "chatroom"
assert event.message == "hello"


@pytest.mark.asyncio
async def test_postgres():
Expand Down

0 comments on commit 17d5f9d

Please sign in to comment.