From ec9617335a2618531624033c515b0634aa4ff8e3 Mon Sep 17 00:00:00 2001 From: abawi Date: Thu, 31 Oct 2024 20:04:15 +0100 Subject: [PATCH] WIP: Zenoh PUB/SUB created, but problem with listener. Verified publisher works --- wrapyfi/listeners/zenoh.py | 280 ++++++++++++++++++++++++++++ wrapyfi/middlewares/zenoh.py | 92 ++++++++++ wrapyfi/publishers/zenoh.py | 341 +++++++++++++++++++++++++++++++++++ 3 files changed, 713 insertions(+) create mode 100644 wrapyfi/listeners/zenoh.py create mode 100644 wrapyfi/middlewares/zenoh.py create mode 100644 wrapyfi/publishers/zenoh.py diff --git a/wrapyfi/listeners/zenoh.py b/wrapyfi/listeners/zenoh.py new file mode 100644 index 0000000..38e3811 --- /dev/null +++ b/wrapyfi/listeners/zenoh.py @@ -0,0 +1,280 @@ +import logging +import json +import queue +import time +import os +from typing import Optional + +import numpy as np +import cv2 +import zenoh + +from wrapyfi.connect.listeners import Listener, Listeners, ListenerWatchDog +from wrapyfi.middlewares.zenoh import ZenohMiddlewarePubSub +from wrapyfi.encoders import JsonDecodeHook + + +# Capture environment variables for Zenoh configuration +ZENOH_IP = os.getenv("WRAPYFI_ZENOH_IP", "127.0.0.1") +ZENOH_PORT = int(os.getenv("WRAPYFI_ZENOH_PORT", 7447)) +ZENOH_MODE = os.getenv("WRAPYFI_ZENOH_MODE", "peer") +ZENOH_CONNECT = json.loads(os.getenv("WRAPYFI_ZENOH_CONNECT", "[]")) +ZENOH_LISTEN = json.loads(os.getenv("WRAPYFI_ZENOH_LISTEN", "[]")) +ZENOH_CONFIG_FILEPATH = os.getenv("WRAPYFI_ZENOH_CONFIG_FILEPATH", None) +ZENOH_MONITOR_LISTENER_SPAWN = os.getenv("WRAPYFI_ZENOH_MONITOR_LISTENER_SPAWN", "thread") + +WATCHDOG_POLL_INTERVAL = float(os.getenv("WRAPYFI_ZENOH_RETRY_INTERVAL", 0.2)) +WATCHDOG_POLL_REPEATS = int(os.getenv("WRAPYFI_ZENOH_MAX_REPEATS", -1)) + +# Ensure the monitor listener spawn type is compatible +if ZENOH_MONITOR_LISTENER_SPAWN == "process": + ZENOH_MONITOR_LISTENER_SPAWN = "thread" + logging.warning( + "[Zenoh] Wrapyfi does not support multiprocessing for Zenoh. " + "Switching automatically to 'thread' mode." + ) + +class ZenohListener(Listener): + """ + Base Zenoh listener class that configures and initializes Zenoh middleware. + Merges listener-specific settings and environment configurations, and awaits connection. + """ + + def __init__(self, name: str, in_topic: str, should_wait: bool = True, + ip: str = ZENOH_IP, port: int = ZENOH_PORT, mode: str = ZENOH_MODE, zenoh_kwargs: Optional[dict] = None, **kwargs): + """ + Initializes the Zenoh listener with environment or parameter-based configurations + and waits for connection if specified. + + :param name: str: Name of the listener + :param in_topic: str: Topic name + :param should_wait: bool: Whether to block until a message is received + :param ip: str: IP address for the Zenoh connection. Default is '127.0.0.1' + :param port: int: Port for the Zenoh connection. Default is 7447 + :param mode: str: Mode for Zenoh session (`peer` or `client`) + :param zenoh_kwargs: dict: Additional Zenoh configuration options, overridden by env variables + :param kwargs: dict: Additional options for the listener + """ + + # Zenoh does not accept trailing or leading slashes in topic names + in_topic = in_topic.strip("/") + super().__init__(name, in_topic, should_wait=should_wait, **kwargs) + + # Prepare Zenoh configuration from environment variables and kwargs + zenoh_config = { + "mode": mode, + "connect/endpoints": ZENOH_CONNECT if isinstance(ZENOH_CONNECT, list) else ZENOH_CONNECT.split(",") if isinstance(ZENOH_CONNECT, str) else [f"tcp/{ip}:{port}"], + "listen/endpoints": ZENOH_LISTEN if isinstance(ZENOH_LISTEN, list) else ZENOH_LISTEN.split(",") if isinstance(ZENOH_LISTEN, str) else [f"tcp/{ip}:{port}"], + **(zenoh_kwargs or {}) + } + + # Activate Zenoh middleware with the prepared configuration + ZenohMiddlewarePubSub.activate(config=self._prepare_config(zenoh_config), **kwargs) + + # Set up connection establishment + self.established = False + + def _prepare_config(self, zenoh_kwargs): + """ + Converts keyword arguments to a zenoh.Config object and merges with environment-based settings. + + :param zenoh_kwargs: dict: Configuration parameters + :return: zenoh.Config: Configured Zenoh session + """ + config = zenoh.Config().from_file(ZENOH_CONFIG_FILEPATH) if ZENOH_CONFIG_FILEPATH else zenoh.Config() + for key, value in zenoh_kwargs.items(): + config.insert_json5(key, json.dumps(value)) + return config + + def await_connection(self, in_topic: Optional[str] = None, repeats: int = WATCHDOG_POLL_REPEATS): + """ + Waits for the Zenoh connection to be established. + + :param in_topic: str: Topic name for connection + :param repeats: int: Number of retry attempts + :return: bool: True if connection is established, False otherwise + """ + in_topic = in_topic or self.in_topic + while repeats != 0: + repeats -= 1 if repeats > 0 else 0 + if ZenohMiddlewarePubSub._instance.is_connected(): + logging.info(f"[Zenoh] Connected to topic {in_topic}") + return True + logging.debug(f"Waiting for connection on topic {in_topic}") + time.sleep(WATCHDOG_POLL_INTERVAL) + return False + + def establish(self, repeats: Optional[int] = None, **kwargs): + """ + Establish the connection to the publisher. + + :param repeats: int: Number of repeats to await connection. None for infinite. Default is None + :return: bool: True if connection established, False otherwise + """ + established = self.await_connection(repeats=repeats) + established = self.check_establishment(established) + if established: + ZenohMiddlewarePubSub._instance.register_callback(self.in_topic, self.on_message) + return established + + def close(self): + """ + Closes the Zenoh listener. + This can be overridden by child classes to add cleanup operations. + """ + pass + + +@Listeners.register("NativeObject", "zenoh") +class ZenohNativeObjectListener(ZenohListener): + """ + Zenoh NativeObject listener for handling JSON-encoded native objects. + Decodes incoming messages to native Python objects using JsonDecodeHook. + + :param name: str: Name of the listener + :param in_topic: str: Name of the input topic + :param should_wait: bool: Whether to wait for messages + :param deserializer_kwargs: dict: Keyword arguments for the JSON deserializer + """ + + def __init__(self, name: str, in_topic: str, should_wait: bool = True, deserializer_kwargs: Optional[dict] = None, **kwargs): + super().__init__(name, in_topic, should_wait=should_wait, **kwargs) + self._plugin_decoder_hook = JsonDecodeHook(**kwargs).object_hook + self._message_queue = queue.Queue() + self._deserializer_kwargs = deserializer_kwargs or {} + + def on_message(self, sample): + """ + Handles incoming messages by decoding JSON into native objects using JsonDecodeHook. + + :param sample: zenoh.Sample: The Zenoh sample received + """ + try: + obj = json.loads(sample.payload.to_bytes(), object_hook=self._plugin_decoder_hook, **self._deserializer_kwargs) + self._message_queue.put(obj) + logging.debug(f"Queued message for topic {self.in_topic}: {obj}") + except json.JSONDecodeError as e: + logging.error(f"Failed to decode JSON from topic {self.in_topic}: {e}") + + def listen(self): + """ + Listen for a message, ensuring the connection is established. + + :return: Any: The received message as a native Python object + """ + if not self.established: + established = self.establish(repeats=WATCHDOG_POLL_REPEATS) + if not established: + return None + + try: + return self._message_queue.get(block=self.should_wait) + except queue.Empty: + return None + + +@Listeners.register("Image", "zenoh") +class ZenohImageListener(ZenohNativeObjectListener): + """ + Zenoh Image listener for handling image messages. + Converts incoming data to OpenCV images, supporting JPEG and raw formats. + + :param name: str: Name of the listener + :param in_topic: str: Name of the input topic + :param should_wait: bool: Whether to wait for messages + :param width: int: Expected image width, -1 to use received width + :param height: int: Expected image height, -1 to use received height + :param rgb: bool: True if the image is RGB, False if grayscale + :param jpg: bool: True if the image is JPEG-compressed + """ + + def __init__(self, name: str, in_topic: str, should_wait: bool = True, width: int = -1, height: int = -1, rgb: bool = True, jpg: bool = False, **kwargs): + super().__init__(name, in_topic, should_wait=should_wait, **kwargs) + self.width = width + self.height = height + self.rgb = rgb + self.jpg = jpg + self._message_queue = queue.Queue() + + def on_message(self, sample): + """ + Handles incoming image messages, converting data to OpenCV format. + + :param sample: zenoh.Sample: Zenoh sample payload + """ + try: + np_data = np.frombuffer(sample.payload.to_bytes(), dtype=np.uint8) + if self.jpg: + img = cv2.imdecode(np_data, cv2.IMREAD_COLOR if self.rgb else cv2.IMREAD_GRAYSCALE) + else: + img = np_data.reshape(self.height, self.width, 3 if self.rgb else 1) + self._message_queue.put(img) + except Exception as e: + logging.error(f"Failed to process image message: {e}") + + def listen(self): + """ + Listen for a message, ensuring the connection is established. + + :return: np.ndarray: The received image as an OpenCV-formatted array + """ + if not self.established: + established = self.establish(repeats=WATCHDOG_POLL_REPEATS) + if not established: + return None + + try: + return self._message_queue.get(block=self.should_wait) + except queue.Empty: + return None + + +@Listeners.register("AudioChunk", "zenoh") +class ZenohAudioChunkListener(ZenohNativeObjectListener): + """ + Zenoh AudioChunk listener for handling audio messages. + Converts incoming data to numpy arrays for audio processing. + + :param name: str: Name of the listener + :param in_topic: str: Name of the input topic + :param should_wait: bool: Whether to wait for messages + :param channels: int: Number of audio channels + :param rate: int: Sampling rate of the audio + :param chunk: int: Number of samples in the audio chunk + """ + + def __init__(self, name: str, in_topic: str, should_wait: bool = True, channels: int = 1, rate: int = 44100, chunk: int = -1, **kwargs): + super().__init__(name, in_topic, should_wait=should_wait, **kwargs) + self.channels = channels + self.rate = rate + self.chunk = chunk + self._message_queue = queue.Queue() + + def on_message(self, sample): + """ + Processes incoming audio messages into structured numpy arrays. + + :param sample: zenoh.Sample: Zenoh sample payload + """ + try: + aud_array = np.frombuffer(sample.payload.to_bytes(), dtype=np.float32).reshape(-1, self.channels) + self._message_queue.put((aud_array, self.rate)) + except Exception as e: + logging.error(f"Failed to process audio message: {e}") + + def listen(self): + """ + Listen for a message, ensuring the connection is established. + + :return: Tuple[np.ndarray, int]: The received audio chunk and sampling rate + """ + if not self.established: + established = self.establish(repeats=WATCHDOG_POLL_REPEATS) + if not established: + return None, self.rate + + try: + return self._message_queue.get(block=self.should_wait) + except queue.Empty: + return None, self.rate + diff --git a/wrapyfi/middlewares/zenoh.py b/wrapyfi/middlewares/zenoh.py new file mode 100644 index 0000000..c63b812 --- /dev/null +++ b/wrapyfi/middlewares/zenoh.py @@ -0,0 +1,92 @@ +import logging +import threading +import atexit +import json + +import zenoh + +from wrapyfi.utils import SingletonOptimized +from wrapyfi.connect.wrapper import MiddlewareCommunicator + + +class ZenohMiddlewarePubSub(metaclass=SingletonOptimized): + """ + Zenoh middleware wrapper with singleton pattern. + The `activate` method initializes the middleware, and `deinit` handles cleanup. + + Configurations are initialized by merging any provided keyword arguments, + environment variables, and direct `zenoh.Config` parameters. + """ + + _instance = None # Singleton instance + + @staticmethod + def activate(config: zenoh.Config = None, **kwargs): + """ + Activates the Zenoh middleware. Initializes the Zenoh session by merging a provided configuration, + environment variables, and additional keyword arguments. + + :param config: zenoh.Config: Optional Zenoh configuration; merged with environment and `kwargs` + :param kwargs: dict: Additional settings for customization + :return: ZenohMiddlewarePubSub instance + """ + zenoh.init_log_from_env_or("error") + if ZenohMiddlewarePubSub._instance is None: + ZenohMiddlewarePubSub._instance = ZenohMiddlewarePubSub(config=config, **kwargs) + return ZenohMiddlewarePubSub._instance + + def __init__(self, config: zenoh.Config = None, **kwargs): + """ + Initializes the Zenoh session and sets up a clean exit with deinitialization. + + :param config: zenoh.Config: Configuration for Zenoh session + """ + logging.info("Initializing Zenoh middleware") + + # Initialize Zenoh session with configuration or default values + self.config = config or self._merge_config_with_env(kwargs) + self.session = zenoh.open(self.config) + self.subscribers = {} + + # Ensure cleanup at exit + atexit.register(MiddlewareCommunicator.close_all_instances) + atexit.register(self.deinit) + + def _merge_config_with_env(self, config_kwargs): + """ + Merges given configuration parameters with environment-based defaults. + + :param config_kwargs: dict: Direct configuration values to merge + :return: zenoh.Config: Complete Zenoh configuration instance + """ + config = zenoh.Config() + for key, value in config_kwargs.items(): + config.insert_json5(key, json.dumps(value)) + return config + + def register_callback(self, topic: str, callback): + """ + Registers an event handler for a specific topic. + + :param topic: str: The topic to subscribe to + :param callback: callable: Function to call upon receiving a message + """ + if topic not in self.subscribers: + self.subscribers[topic] = self.session.declare_subscriber(topic, callback) + logging.info(f"[ZenohMiddlewarePubSub] Registered callback for topic {topic}") + + def is_connected(self) -> bool: + """ + Checks if the Zenoh session is active. + + :return: bool: True if connected, False otherwise + """ + return self.session is not None and not self.session.is_closed() + + def deinit(self): + """ + Closes the Zenoh session upon exit. + """ + logging.info("[ZenohMiddlewarePubSub] Closing Zenoh session") + self.session.close() + diff --git a/wrapyfi/publishers/zenoh.py b/wrapyfi/publishers/zenoh.py new file mode 100644 index 0000000..337931f --- /dev/null +++ b/wrapyfi/publishers/zenoh.py @@ -0,0 +1,341 @@ +import logging +import json +import time +import os +import threading +from typing import Optional, Tuple + +import numpy as np +import cv2 +import zenoh + +from wrapyfi.connect.publishers import Publisher, Publishers, PublisherWatchDog +from wrapyfi.middlewares.zenoh import ZenohMiddlewarePubSub +from wrapyfi.encoders import JsonEncoder + + +# Capture environment variables for Zenoh configuration +ZENOH_IP = os.getenv("WRAPYFI_ZENOH_IP", "127.0.0.1") +ZENOH_PORT = int(os.getenv("WRAPYFI_ZENOH_PORT", 7447)) +ZENOH_MODE = os.getenv("WRAPYFI_ZENOH_MODE", "peer") +ZENOH_CONNECT = json.loads(os.getenv("WRAPYFI_ZENOH_CONNECT", "[]")) +ZENOH_LISTEN = json.loads(os.getenv("WRAPYFI_ZENOH_LISTEN", "[]")) +ZENOH_CONFIG_FILEPATH = os.getenv("WRAPYFI_ZENOH_CONFIG_FILEPATH", None) +ZENOH_MONITOR_LISTENER_SPAWN = os.getenv("WRAPYFI_ZENOH_MONITOR_LISTENER_SPAWN", "thread") + +WATCHDOG_POLL_INTERVAL = float(os.getenv("WRAPYFI_ZENOH_RETRY_INTERVAL", 0.2)) +WATCHDOG_POLL_REPEATS = int(os.getenv("WRAPYFI_ZENOH_MAX_REPEATS", -1)) + +# Ensure the monitor listener spawn type is compatible +if ZENOH_MONITOR_LISTENER_SPAWN == "process": + ZENOH_MONITOR_LISTENER_SPAWN = "thread" + logging.warning( + "[Zenoh] Wrapyfi does not support multiprocessing for Zenoh. " + "Switching automatically to 'thread' mode." + ) + + +class ZenohPublisher(Publisher): + """ + Base Zenoh publisher class that configures and initializes Zenoh middleware. + Sets up connection handling and establishes connection on demand. + """ + + def __init__( + self, + name: str, + out_topic: str, + should_wait: bool = True, + ip: str = ZENOH_IP, + port: int = ZENOH_PORT, + mode: str = ZENOH_MODE, + monitor_listener_spawn: Optional[str] = ZENOH_MONITOR_LISTENER_SPAWN, + zenoh_kwargs: Optional[dict] = None, + **kwargs, + ): + """ + Initialize the Zenoh publisher. + + :param name: str: Name of the publisher + :param out_topic: str: Name of the output topic + :param should_wait: bool: Whether to wait for at least one listener before unblocking the script. Default is True + :param ip: str: IP address for the Zenoh connection. Default is '127.0.0.1' + :param port: int: Port for the Zenoh connection. Default is 7447 + :param mode: str: Mode for Zenoh session (`peer` or `client`) + :param monitor_listener_spawn: str: Listener spawn method (thread or process) + :param zenoh_kwargs: dict: Additional kwargs for the Zenoh middleware + """ + out_topic = out_topic.strip("/") + super().__init__(name, out_topic, should_wait=should_wait, **kwargs) + + # Prepare Zenoh configuration + self.zenoh_config = { + "mode": mode, + "connect/endpoints": ZENOH_CONNECT if isinstance(ZENOH_CONNECT, list) else ZENOH_CONNECT.split(",") if isinstance(ZENOH_CONNECT, str) else [f"tcp/{ip}:{port}"], + "listen/endpoints": ZENOH_LISTEN if isinstance(ZENOH_LISTEN, list) else ZENOH_LISTEN.split(",") if isinstance(ZENOH_LISTEN, str) else [f"tcp/{ip}:{port}"], + **(zenoh_kwargs or {}) + } + + ZenohMiddlewarePubSub.activate(config=self._prepare_config(self.zenoh_config), **kwargs) + + def _prepare_config(self, zenoh_kwargs): + """ + Converts keyword arguments to a zenoh.Config object. + + :param zenoh_kwargs: dict: Configuration parameters + :return: zenoh.Config: Configured Zenoh session + """ + config = zenoh.Config().from_file(ZENOH_CONFIG_FILEPATH) if ZENOH_CONFIG_FILEPATH else zenoh.Config() + for key, value in zenoh_kwargs.items(): + config.insert_json5(key, json.dumps(value)) + return config + + def await_connection(self, out_topic: Optional[str] = None, repeats: Optional[int] = None): + """ + Wait for the connection to be established. + + :param out_topic: str: Name of the output topic + :param repeats: int: Number of repeats to await connection. None for infinite. Default is None + :return: bool: True if connection established, False otherwise + """ + if out_topic is None: + out_topic = self.out_topic + logging.info(f"[Zenoh] Waiting for output connection: {out_topic}") + if repeats is None: + repeats = -1 if self.should_wait else 0 + + while repeats > 0 or repeats == -1: + if repeats != -1: + repeats -= 1 + connected = ZenohMiddlewarePubSub._instance.is_connected() + if connected: + ZenohMiddlewarePubSub._instance.session.declare_publisher(out_topic) + logging.info(f"[Zenoh] Output connection established: {out_topic}") + return True + time.sleep(WATCHDOG_POLL_INTERVAL) + return False + + def close(self): + """ + Close the publisher. + """ + logging.info(f"[Zenoh] Closing publisher for topic: {self.out_topic}") + ZenohMiddlewarePubSub._instance.session.close() + time.sleep(0.2) + + def __del__(self): + self.close() + + +@Publishers.register("NativeObject", "zenoh") +class ZenohNativeObjectPublisher(ZenohPublisher): + """ + Zenoh NativeObject publisher for publishing JSON-encoded native objects using JsonEncoder. + Serializes the data and publishes it as a JSON string. + """ + + def __init__( + self, + name: str, + out_topic: str, + should_wait: bool = True, + multi_threaded: bool = False, + serializer_kwargs: Optional[dict] = None, + **kwargs, + ): + """ + The NativeObjectPublisher using the Zenoh message construct assuming a combination of Python native objects + and numpy arrays as input. Serializes the data (including plugins) using the encoder and sends it as a string. + + :param name: str: Name of the publisher + :param out_topic: str: Name of the output topic (e.g., 'topic') + :param should_wait: bool: Whether to wait for at least one listener before unblocking the script. Default is True + :param multi_threaded: bool: Whether to use a separate session for each thread. Default is False + :param serializer_kwargs: dict: Additional kwargs for the serializer + """ + super().__init__(name, out_topic, should_wait=should_wait, **kwargs) + if multi_threaded: + self._thread_local_storage = threading.local() + + self._plugin_encoder = JsonEncoder + self._plugin_kwargs = kwargs + self._serializer_kwargs = serializer_kwargs or {} + + if not self.should_wait: + PublisherWatchDog().add_publisher(self) + + def establish(self, repeats: Optional[int] = None, **kwargs): + """ + Establish the connection to the publisher. + + :param repeats: int: Number of repeats to await connection. None for infinite. Default is None + :return: bool: True if connection established, False otherwise + """ + established = self.await_connection(repeats=repeats) + return self.check_establishment(established) + + def publish(self, obj): + """ + Publish the object to the middleware. + + :param obj: object: Object to publish + """ + if not self.established: + established = self.establish(repeats=WATCHDOG_POLL_REPEATS) + if not established: + return + else: + time.sleep(0.2) + + obj_str = json.dumps( + obj, + cls=self._plugin_encoder, + **self._plugin_kwargs, + **self._serializer_kwargs, + ) + ZenohMiddlewarePubSub._instance.session.put(self.out_topic, obj_str) + + +@Publishers.register("Image", "zenoh") +class ZenohImagePublisher(ZenohNativeObjectPublisher): + """ + Zenoh Image publisher for publishing image data as numpy arrays. + Supports publishing both JPEG-compressed and raw images. + """ + + def __init__( + self, + name: str, + out_topic: str, + should_wait: bool = True, + multi_threaded: bool = False, + width: int = -1, + height: int = -1, + rgb: bool = True, + fp: bool = False, + jpg: bool = False, + **kwargs, + ): + """ + The ImagePublisher using the Zenoh message construct assuming a numpy array as input. + + :param name: str: Name of the publisher + :param out_topic: str: Name of the output topic (e.g., 'topic') + :param should_wait: bool: Whether to wait for at least one listener before unblocking the script. Default is True + :param multi_threaded: bool: Whether to use a separate session for each thread. Default is False + :param width: int: Width of the image. Default is -1 (dynamic width) + :param height: int: Height of the image. Default is -1 (dynamic height) + :param rgb: bool: True if the image is RGB, False if grayscale. Default is True + :param fp: bool: True if the image is floating point, False if integer. Default is False + :param jpg: bool: True if the image should be compressed as JPG. Default is False + """ + super().__init__(name, out_topic, should_wait=should_wait, **kwargs) + self.width = width + self.height = height + self.rgb = rgb + self.fp = fp + self.jpg = jpg + self._type = np.float32 if self.fp else np.uint8 + + def publish(self, img: np.ndarray): + """ + Publish the image to the middleware. + + :param img: np.ndarray: Image to publish formatted as a cv2 image np.ndarray[img_height, img_width, channels] + """ + if img is None: + return + + if not self.established: + established = self.establish(repeats=WATCHDOG_POLL_REPEATS) + if not established: + return + else: + time.sleep(0.2) + + if ( + 0 < self.width != img.shape[1] + or 0 < self.height != img.shape[0] + or not ( + (img.ndim == 2 and not self.rgb) + or (img.ndim == 3 and self.rgb and img.shape[2] == 3) + ) + ): + raise ValueError("Incorrect image shape for publisher") + if not img.flags["C_CONTIGUOUS"]: + img = np.ascontiguousarray(img) + + if self.jpg: + _, img_encoded = cv2.imencode(".jpg", img) + img_bytes = img_encoded.tobytes() + header = {"timestamp": time.time()} + else: + img_bytes = img.tobytes() + header = {"timestamp": time.time(), "shape": img.shape, "dtype": str(img.dtype)} + + ZenohMiddlewarePubSub._instance.session.put(self.out_topic, [header, img_bytes]) + + +@Publishers.register("AudioChunk", "zenoh") +class ZenohAudioChunkPublisher(ZenohNativeObjectPublisher): + """ + Zenoh AudioChunk publisher for publishing audio data as numpy arrays. + Supports publishing multi-channel audio with variable sampling rates. + """ + + def __init__( + self, + name: str, + out_topic: str, + should_wait: bool = True, + multi_threaded: bool = False, + channels: int = 1, + rate: int = 44100, + chunk: int = -1, + **kwargs, + ): + """ + The AudioChunkPublisher using the Zenoh message construct assuming a numpy array as input. + + :param name: str: Name of the publisher + :param out_topic: str: Name of the output topic (e.g., 'topic') + :param should_wait: bool: Whether to wait for at least one listener before unblocking the script. Default is True + :param multi_threaded: bool: Whether to use a separate session for each thread. Default is False + :param channels: int: Number of channels. Default is 1 + :param rate: int: Sampling rate. Default is 44100 + :param chunk: int: Chunk size. Default is -1 (dynamic chunk size) + """ + super().__init__(name, out_topic, should_wait=should_wait, multi_threaded=multi_threaded, **kwargs) + self.channels = channels + self.rate = rate + self.chunk = chunk + + def publish(self, aud: Tuple[np.ndarray, int]): + """ + Publish the audio chunk to the middleware. + + :param aud: Tuple[np.ndarray, int]: Audio chunk to publish formatted as (np.ndarray[audio_chunk, channels], int[samplerate]) + """ + if not self.established: + established = self.establish(repeats=WATCHDOG_POLL_REPEATS) + if not established: + return + else: + time.sleep(0.2) + + aud_array, rate = aud + if aud_array is None: + return + if 0 < self.rate != rate: + raise ValueError("Incorrect audio rate for publisher") + chunk, channels = aud_array.shape if len(aud_array.shape) > 1 else (aud_array.shape[0], 1) + self.chunk = chunk if self.chunk == -1 else self.chunk + self.channels = channels if self.channels == -1 else self.channels + if 0 < self.chunk != chunk or 0 < self.channels != channels: + raise ValueError("Incorrect audio shape for publisher") + aud_array = np.require(aud_array, dtype=np.float32, requirements="C") + + aud_bytes = aud_array.tobytes() + header = {"shape": aud_array.shape, "dtype": str(aud_array.dtype), "rate": rate, "timestamp": time.time()} + ZenohMiddlewarePubSub._instance.session.put(self.out_topic, [header, aud_bytes]) +