Skip to content

Commit

Permalink
Merge pull request #7 from permitio/rk/pg-conn-pool
Browse files Browse the repository at this point in the history
Postgres Backend: Acquire connections from pool
  • Loading branch information
roekatz authored Aug 7, 2023
2 parents 1a96311 + 5d61725 commit c79b70d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:

strategy:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11-dev"]
python-version: ["3.8", "3.9", "3.10", "3.11"]

services:
zookeeper:
Expand Down
2 changes: 1 addition & 1 deletion broadcaster/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ._base import Broadcast, Event

__version__ = "0.2.3"
__version__ = "0.2.4"
__all__ = ["Broadcast", "Event"]
20 changes: 18 additions & 2 deletions broadcaster/_backends/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,38 @@
from typing import Any

import asyncpg
import os

from .._base import Event
from .base import BroadcastBackend

try:
POOL_MAX_SIZE = int(os.getenv("BROADCASTER_PG_MAX_POOL_SIZE"))
except TypeError:
POOL_MAX_SIZE = 10

class PostgresBackend(BroadcastBackend):
_pools = {}
_pools_lock = asyncio.Lock()

def __init__(self, url: str):
self._url = url

async def _get_pool(self):
async with self.__class__._pools_lock:
if self._url not in self.__class__._pools:
self.__class__._pools[self._url] = await asyncpg.create_pool(self._url, max_size=POOL_MAX_SIZE)
return self.__class__._pools[self._url]

async def connect(self) -> None:
self._conn = await asyncpg.connect(self._url)
self._conn = await (await self._get_pool()).acquire()
self._listen_queue: asyncio.Queue = asyncio.Queue()
self._conn.add_termination_listener(self._termination_listener)

async def disconnect(self) -> None:
await self._conn.close()
self._conn.remove_termination_listener(self._termination_listener)
await (await self._get_pool()).release(self._conn)
self._conn = None

async def subscribe(self, channel: str) -> None:
await self._conn.add_listener(channel, self._listener)
Expand Down

0 comments on commit c79b70d

Please sign in to comment.