Skip to content

Commit

Permalink
WIP: Zenoh PUB/SUB created, but problem with listener. Verified publi…
Browse files Browse the repository at this point in the history
…sher works
  • Loading branch information
fabawi committed Oct 31, 2024
1 parent 4435cb8 commit ec96173
Show file tree
Hide file tree
Showing 3 changed files with 713 additions and 0 deletions.
280 changes: 280 additions & 0 deletions wrapyfi/listeners/zenoh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
import logging
import json
import queue
import time
import os
from typing import Optional

import numpy as np
import cv2
import zenoh

from wrapyfi.connect.listeners import Listener, Listeners, ListenerWatchDog
from wrapyfi.middlewares.zenoh import ZenohMiddlewarePubSub
from wrapyfi.encoders import JsonDecodeHook


# Capture environment variables for Zenoh configuration
ZENOH_IP = os.getenv("WRAPYFI_ZENOH_IP", "127.0.0.1")
ZENOH_PORT = int(os.getenv("WRAPYFI_ZENOH_PORT", 7447))
ZENOH_MODE = os.getenv("WRAPYFI_ZENOH_MODE", "peer")
ZENOH_CONNECT = json.loads(os.getenv("WRAPYFI_ZENOH_CONNECT", "[]"))
ZENOH_LISTEN = json.loads(os.getenv("WRAPYFI_ZENOH_LISTEN", "[]"))
ZENOH_CONFIG_FILEPATH = os.getenv("WRAPYFI_ZENOH_CONFIG_FILEPATH", None)
ZENOH_MONITOR_LISTENER_SPAWN = os.getenv("WRAPYFI_ZENOH_MONITOR_LISTENER_SPAWN", "thread")

WATCHDOG_POLL_INTERVAL = float(os.getenv("WRAPYFI_ZENOH_RETRY_INTERVAL", 0.2))
WATCHDOG_POLL_REPEATS = int(os.getenv("WRAPYFI_ZENOH_MAX_REPEATS", -1))

# Ensure the monitor listener spawn type is compatible
if ZENOH_MONITOR_LISTENER_SPAWN == "process":
ZENOH_MONITOR_LISTENER_SPAWN = "thread"
logging.warning(
"[Zenoh] Wrapyfi does not support multiprocessing for Zenoh. "
"Switching automatically to 'thread' mode."
)

class ZenohListener(Listener):
"""
Base Zenoh listener class that configures and initializes Zenoh middleware.
Merges listener-specific settings and environment configurations, and awaits connection.
"""

def __init__(self, name: str, in_topic: str, should_wait: bool = True,
ip: str = ZENOH_IP, port: int = ZENOH_PORT, mode: str = ZENOH_MODE, zenoh_kwargs: Optional[dict] = None, **kwargs):
"""
Initializes the Zenoh listener with environment or parameter-based configurations
and waits for connection if specified.
:param name: str: Name of the listener
:param in_topic: str: Topic name
:param should_wait: bool: Whether to block until a message is received
:param ip: str: IP address for the Zenoh connection. Default is '127.0.0.1'
:param port: int: Port for the Zenoh connection. Default is 7447
:param mode: str: Mode for Zenoh session (`peer` or `client`)
:param zenoh_kwargs: dict: Additional Zenoh configuration options, overridden by env variables
:param kwargs: dict: Additional options for the listener
"""

# Zenoh does not accept trailing or leading slashes in topic names
in_topic = in_topic.strip("/")
super().__init__(name, in_topic, should_wait=should_wait, **kwargs)

# Prepare Zenoh configuration from environment variables and kwargs
zenoh_config = {
"mode": mode,
"connect/endpoints": ZENOH_CONNECT if isinstance(ZENOH_CONNECT, list) else ZENOH_CONNECT.split(",") if isinstance(ZENOH_CONNECT, str) else [f"tcp/{ip}:{port}"],
"listen/endpoints": ZENOH_LISTEN if isinstance(ZENOH_LISTEN, list) else ZENOH_LISTEN.split(",") if isinstance(ZENOH_LISTEN, str) else [f"tcp/{ip}:{port}"],
**(zenoh_kwargs or {})
}

# Activate Zenoh middleware with the prepared configuration
ZenohMiddlewarePubSub.activate(config=self._prepare_config(zenoh_config), **kwargs)

# Set up connection establishment
self.established = False

def _prepare_config(self, zenoh_kwargs):
"""
Converts keyword arguments to a zenoh.Config object and merges with environment-based settings.
:param zenoh_kwargs: dict: Configuration parameters
:return: zenoh.Config: Configured Zenoh session
"""
config = zenoh.Config().from_file(ZENOH_CONFIG_FILEPATH) if ZENOH_CONFIG_FILEPATH else zenoh.Config()
for key, value in zenoh_kwargs.items():
config.insert_json5(key, json.dumps(value))
return config

def await_connection(self, in_topic: Optional[str] = None, repeats: int = WATCHDOG_POLL_REPEATS):
"""
Waits for the Zenoh connection to be established.
:param in_topic: str: Topic name for connection
:param repeats: int: Number of retry attempts
:return: bool: True if connection is established, False otherwise
"""
in_topic = in_topic or self.in_topic
while repeats != 0:
repeats -= 1 if repeats > 0 else 0
if ZenohMiddlewarePubSub._instance.is_connected():
logging.info(f"[Zenoh] Connected to topic {in_topic}")
return True
logging.debug(f"Waiting for connection on topic {in_topic}")
time.sleep(WATCHDOG_POLL_INTERVAL)
return False

def establish(self, repeats: Optional[int] = None, **kwargs):
"""
Establish the connection to the publisher.
:param repeats: int: Number of repeats to await connection. None for infinite. Default is None
:return: bool: True if connection established, False otherwise
"""
established = self.await_connection(repeats=repeats)
established = self.check_establishment(established)
if established:
ZenohMiddlewarePubSub._instance.register_callback(self.in_topic, self.on_message)
return established

def close(self):
"""
Closes the Zenoh listener.
This can be overridden by child classes to add cleanup operations.
"""
pass


@Listeners.register("NativeObject", "zenoh")
class ZenohNativeObjectListener(ZenohListener):
"""
Zenoh NativeObject listener for handling JSON-encoded native objects.
Decodes incoming messages to native Python objects using JsonDecodeHook.
:param name: str: Name of the listener
:param in_topic: str: Name of the input topic
:param should_wait: bool: Whether to wait for messages
:param deserializer_kwargs: dict: Keyword arguments for the JSON deserializer
"""

def __init__(self, name: str, in_topic: str, should_wait: bool = True, deserializer_kwargs: Optional[dict] = None, **kwargs):
super().__init__(name, in_topic, should_wait=should_wait, **kwargs)
self._plugin_decoder_hook = JsonDecodeHook(**kwargs).object_hook
self._message_queue = queue.Queue()
self._deserializer_kwargs = deserializer_kwargs or {}

def on_message(self, sample):
"""
Handles incoming messages by decoding JSON into native objects using JsonDecodeHook.
:param sample: zenoh.Sample: The Zenoh sample received
"""
try:
obj = json.loads(sample.payload.to_bytes(), object_hook=self._plugin_decoder_hook, **self._deserializer_kwargs)
self._message_queue.put(obj)
logging.debug(f"Queued message for topic {self.in_topic}: {obj}")
except json.JSONDecodeError as e:
logging.error(f"Failed to decode JSON from topic {self.in_topic}: {e}")

def listen(self):
"""
Listen for a message, ensuring the connection is established.
:return: Any: The received message as a native Python object
"""
if not self.established:
established = self.establish(repeats=WATCHDOG_POLL_REPEATS)
if not established:
return None

try:
return self._message_queue.get(block=self.should_wait)
except queue.Empty:
return None


@Listeners.register("Image", "zenoh")
class ZenohImageListener(ZenohNativeObjectListener):
"""
Zenoh Image listener for handling image messages.
Converts incoming data to OpenCV images, supporting JPEG and raw formats.
:param name: str: Name of the listener
:param in_topic: str: Name of the input topic
:param should_wait: bool: Whether to wait for messages
:param width: int: Expected image width, -1 to use received width
:param height: int: Expected image height, -1 to use received height
:param rgb: bool: True if the image is RGB, False if grayscale
:param jpg: bool: True if the image is JPEG-compressed
"""

def __init__(self, name: str, in_topic: str, should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, jpg: bool = False, **kwargs):
super().__init__(name, in_topic, should_wait=should_wait, **kwargs)
self.width = width
self.height = height
self.rgb = rgb
self.jpg = jpg
self._message_queue = queue.Queue()

def on_message(self, sample):
"""
Handles incoming image messages, converting data to OpenCV format.
:param sample: zenoh.Sample: Zenoh sample payload
"""
try:
np_data = np.frombuffer(sample.payload.to_bytes(), dtype=np.uint8)
if self.jpg:
img = cv2.imdecode(np_data, cv2.IMREAD_COLOR if self.rgb else cv2.IMREAD_GRAYSCALE)
else:
img = np_data.reshape(self.height, self.width, 3 if self.rgb else 1)
self._message_queue.put(img)
except Exception as e:
logging.error(f"Failed to process image message: {e}")

def listen(self):
"""
Listen for a message, ensuring the connection is established.
:return: np.ndarray: The received image as an OpenCV-formatted array
"""
if not self.established:
established = self.establish(repeats=WATCHDOG_POLL_REPEATS)
if not established:
return None

try:
return self._message_queue.get(block=self.should_wait)
except queue.Empty:
return None


@Listeners.register("AudioChunk", "zenoh")
class ZenohAudioChunkListener(ZenohNativeObjectListener):
"""
Zenoh AudioChunk listener for handling audio messages.
Converts incoming data to numpy arrays for audio processing.
:param name: str: Name of the listener
:param in_topic: str: Name of the input topic
:param should_wait: bool: Whether to wait for messages
:param channels: int: Number of audio channels
:param rate: int: Sampling rate of the audio
:param chunk: int: Number of samples in the audio chunk
"""

def __init__(self, name: str, in_topic: str, should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs):
super().__init__(name, in_topic, should_wait=should_wait, **kwargs)
self.channels = channels
self.rate = rate
self.chunk = chunk
self._message_queue = queue.Queue()

def on_message(self, sample):
"""
Processes incoming audio messages into structured numpy arrays.
:param sample: zenoh.Sample: Zenoh sample payload
"""
try:
aud_array = np.frombuffer(sample.payload.to_bytes(), dtype=np.float32).reshape(-1, self.channels)
self._message_queue.put((aud_array, self.rate))
except Exception as e:
logging.error(f"Failed to process audio message: {e}")

def listen(self):
"""
Listen for a message, ensuring the connection is established.
:return: Tuple[np.ndarray, int]: The received audio chunk and sampling rate
"""
if not self.established:
established = self.establish(repeats=WATCHDOG_POLL_REPEATS)
if not established:
return None, self.rate

try:
return self._message_queue.get(block=self.should_wait)
except queue.Empty:
return None, self.rate

92 changes: 92 additions & 0 deletions wrapyfi/middlewares/zenoh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import logging
import threading
import atexit
import json

import zenoh

from wrapyfi.utils import SingletonOptimized
from wrapyfi.connect.wrapper import MiddlewareCommunicator


class ZenohMiddlewarePubSub(metaclass=SingletonOptimized):
"""
Zenoh middleware wrapper with singleton pattern.
The `activate` method initializes the middleware, and `deinit` handles cleanup.
Configurations are initialized by merging any provided keyword arguments,
environment variables, and direct `zenoh.Config` parameters.
"""

_instance = None # Singleton instance

@staticmethod
def activate(config: zenoh.Config = None, **kwargs):
"""
Activates the Zenoh middleware. Initializes the Zenoh session by merging a provided configuration,
environment variables, and additional keyword arguments.
:param config: zenoh.Config: Optional Zenoh configuration; merged with environment and `kwargs`
:param kwargs: dict: Additional settings for customization
:return: ZenohMiddlewarePubSub instance
"""
zenoh.init_log_from_env_or("error")
if ZenohMiddlewarePubSub._instance is None:
ZenohMiddlewarePubSub._instance = ZenohMiddlewarePubSub(config=config, **kwargs)
return ZenohMiddlewarePubSub._instance

def __init__(self, config: zenoh.Config = None, **kwargs):
"""
Initializes the Zenoh session and sets up a clean exit with deinitialization.
:param config: zenoh.Config: Configuration for Zenoh session
"""
logging.info("Initializing Zenoh middleware")

# Initialize Zenoh session with configuration or default values
self.config = config or self._merge_config_with_env(kwargs)
self.session = zenoh.open(self.config)
self.subscribers = {}

# Ensure cleanup at exit
atexit.register(MiddlewareCommunicator.close_all_instances)
atexit.register(self.deinit)

def _merge_config_with_env(self, config_kwargs):
"""
Merges given configuration parameters with environment-based defaults.
:param config_kwargs: dict: Direct configuration values to merge
:return: zenoh.Config: Complete Zenoh configuration instance
"""
config = zenoh.Config()
for key, value in config_kwargs.items():
config.insert_json5(key, json.dumps(value))
return config

def register_callback(self, topic: str, callback):
"""
Registers an event handler for a specific topic.
:param topic: str: The topic to subscribe to
:param callback: callable: Function to call upon receiving a message
"""
if topic not in self.subscribers:
self.subscribers[topic] = self.session.declare_subscriber(topic, callback)
logging.info(f"[ZenohMiddlewarePubSub] Registered callback for topic {topic}")

def is_connected(self) -> bool:
"""
Checks if the Zenoh session is active.
:return: bool: True if connected, False otherwise
"""
return self.session is not None and not self.session.is_closed()

def deinit(self):
"""
Closes the Zenoh session upon exit.
"""
logging.info("[ZenohMiddlewarePubSub] Closing Zenoh session")
self.session.close()

Loading

0 comments on commit ec96173

Please sign in to comment.