From 267c3a85b37f88085685ec516b12660586e25277 Mon Sep 17 00:00:00 2001 From: abawi Date: Fri, 1 Nov 2024 02:26:04 +0100 Subject: [PATCH] Zenoh PUB/SUB added to the list of supported middleware --- README.md | 46 +++++++++---------- docs/exclude_packages.json | 2 +- docs/usage.md | 31 +++++++++---- .../User Guide/Communication Patterns.md | 11 ++++- .../usage/User Guide/Environment Variables.md | 8 +++- docs/usage/User Guide/Middleware.md | 13 +++--- examples/sensors/cam_mic.py | 4 +- pyproject.toml | 21 +++++++++ setup.py | 9 +++- wrapyfi/listeners/zenoh.py | 32 +++++++++---- wrapyfi/middlewares/zenoh.py | 1 + wrapyfi/publishers/zenoh.py | 35 ++++++++++---- 12 files changed, 150 insertions(+), 63 deletions(-) diff --git a/README.md b/README.md index cb612d7..b68d644 100755 --- a/README.md +++ b/README.md @@ -37,9 +37,7 @@ Wrapyfi is a middleware communication wrapper for transmitting data across nodes altering the operation pipeline of your Python scripts. Wrapyfi introduces a number of helper functions to make middleware integration possible without the need to learn an entire framework, just to parallelize your processes on multiple machines. -Wrapyfi supports [YARP](https://www.yarp.it/yarp_swig.html), [ROS](http://wiki.ros.org/rospy), [ROS 2](https://docs.ros2.org/foxy/api/rclpy/index.html), [ZeroMQ](http://zeromq.org/), [Websocket](https://socket.io/), and [MQTT](https://mqtt.org). - - +Wrapyfi supports [YARP](https://www.yarp.it/yarp_swig.html), [ROS](http://wiki.ros.org/rospy), [ROS 2](https://docs.ros2.org/foxy/api/rclpy/index.html), [ZeroMQ](http://zeromq.org/), [Websocket](https://socket.io/), [Zenoh](https://zenoh.io/) and [MQTT](https://mqtt.org). # Attribution @@ -61,33 +59,37 @@ Please refer to the following [paper](https://www2.informatik.uni-hamburg.de/wtm # Getting Started -Before using Wrapyfi, YARP, ROS, ZeroMQ, Websocket, or MQTT must be installed. +Before using Wrapyfi, YARP, ROS, ZeroMQ, Websocket, Zenoh, or MQTT must be installed. -* Follow the [YARP installation guide](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi_extensions/yarp/README.md?rank=0). -Note that the iCub package is not needed for Wrapyfi to work and does not have to be installed if you do not intend to use the iCub robot. +* **YARP**: Follow the [YARP installation guide](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi_extensions/yarp/README.md?rank=0). +The `yarpserver` server must be running before running any YARP-based scripts. Note that the iCub package is not needed for Wrapyfi to work and does not have to be installed if you do not intend to use the iCub robot. -* For installing ROS, follow the ROS installation guide [\[Ubuntu\]](http://wiki.ros.org/noetic/Installation/Ubuntu)[\[Windows\]](https://wiki.ros.org/noetic/Installation/Windows). -We recommend installing ROS on Conda using the [RoboStack](https://github.com/RoboStack/ros-noetic) environment. Additionally, the +* **ROS**: For installing ROS, follow the ROS installation guide [\[Ubuntu\]](http://wiki.ros.org/noetic/Installation/Ubuntu)[\[Windows\]](https://wiki.ros.org/noetic/Installation/Windows). +We recommend installing ROS on Conda using the [RoboStack](https://github.com/RoboStack/ros-noetic) environment. The `roscore` server must be running before running any ROS-based scripts. Additionally, the [Wrapyfi ROS interfaces](https://github.com/modular-ml/wrapyfi_ros_interfaces/blob/master/README.md?rank=0) must be built to support messages needed for audio transmission [![ROS Package Index](https://img.shields.io/ros/v/noetic/wrapyfi_ros_interfaces)](https://index.ros.org/r/wrapyfi_ros_interfaces/#noetic) -* For installing ROS 2, follow the ROS 2 installation guide [\[Ubuntu\]](https://docs.ros.org/en/humble/Installation/Ubuntu-Install-Debians.html)[\[Windows\]](https://docs.ros.org/en/humble/Installation/Windows-Install-Binary.html). +* **ROS 2**: For installing ROS 2, follow the ROS 2 installation guide [\[Ubuntu\]](https://docs.ros.org/en/humble/Installation/Ubuntu-Install-Debians.html)[\[Windows\]](https://docs.ros.org/en/humble/Installation/Windows-Install-Binary.html). We recommend installing ROS 2 on Conda using the [RoboStack](https://github.com/RoboStack/ros-humble) environment. Additionally, the [Wrapyfi ROS 2 interfaces](https://github.com/modular-ml/wrapyfi_ros2_interfaces/blob/master/README.md?rank=0) must be built to support messages and services needed for audio transmission and the REQ/REP pattern [![ROS Package Index](https://img.shields.io/ros/v/humble/wrapyfi_ros2_interfaces)](https://index.ros.org/p/wrapyfi_ros2_interfaces/#humble) -* ZeroMQ can be installed using pip: `pip install pyzmq`. +* **ZeroMQ**: ZeroMQ can be installed using pip: `pip install pyzmq`. The XPUB/XSUB and XREQ/XREP patterns followed in our ZeroMQ implementation requires a proxy broker. A broker is spawned by default as a daemon process. To avoid automatic spawning, pass the argument `start_proxy_broker=False` to the method register decorator. A standalone broker can be found [here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/standalone/zeromq_proxy_broker.py) -* Websocket can be installed using pip: `pip install python-socketio`. +* **Websocket**: Websocket can be installed using pip: `pip install python-socketio`. The PUB/SUB pattern followed in our Websocket implementation requires a socket server. We recommend setting the server to run using [Flask-SocketIO](https://flask-socketio.readthedocs.io/en/latest/) which can be installed with `pip install flask-socketio`. Note that the server must be running and also scripted to forward messages to the listening from the publishing client as demonstrated in the example found [here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/examples/websockets/websocket_server.py) -* MQTT can be installed using pip: `pip install paho-mqtt`. +* **Zenoh**: Zenoh can be installed using pip: `pip install zenoh`. It is recommended to use the `WRAPYFI_ZENOH_MODE` environment variable to set the mode to `peer` for running in peer-to-peer mode. +The PUB/SUB pattern followed in our Zenoh implementation requires a router. To install the Zenoh router, follow the instructions found [here](https://github.com/eclipse-zenoh/zenoh/?tab=readme-ov-file#how-to-install-it). +The `zenohd` router must be running before running any Zenoh-based scripts. *NOTE*: The `zenohd --rest-http-port 8082` command must be executed with an arbitrary (non-conflicting) port to avoid collision with other services occupying the default port (8000). + +* **MQTT**: MQTT can be installed using pip: `pip install paho-mqtt`. The PUB/SUB pattern followed in our MQTT implementation requires a broker. The default broker used by Wrapyfi [broker.emqx.io](https://broker.emqx.io). However, this broker is not recommended for production use or for transmitting video/audio as it is a public online broker and requires an internet connection (not secure and suffers high latency). We recommend setting up a local broker using [Mosquitto](https://mosquitto.org/download/). A Dockerized version can be found [here](https://github.com/sukesh-ak/setup-mosquitto-with-docker). The broker must be running, and the `WRAPYFI_MQTT_BROKER_ADDRESS` as well as `WRAPYFI_MQTT_BROKER_PORT` environment variables must be set to the @@ -124,6 +126,7 @@ def send_message(self): * ROS 2 Humble Hawksbill **|** Galactic Geochelone **|** Foxy Fitzroy * PyZMQ 16.0, 17.1 and 19.0 * Python-SocketIO >= 5.0.4 +* Eclipse-Zenoh >= 1.0.0 * Paho-MQTT >= 2.0 *(Hard-coded to v2 in Wrapyfi and not compatible with v1)* @@ -181,13 +184,19 @@ or when installing Wrapyfi to work with websockets (headless) including `numpy`, pip install .[headless_websockets] ``` +or when installing Wrapyfi to work with Zenoh (headless) including `numpy`, `opencv-python-headless`, and `eclipse-zenoh`: + +``` +pip install .[headless_zenoh] +``` + or when installing Wrapyfi to work with MQTT (headless) including `numpy`, `opencv-python-headless`, and `paho-mqtt`: ``` pip install .[headless_mqtt] ``` -or install Wrapyfi *without* NumPy, OpenCV, ZeroMQ, Websocket, and MQTT: +or install Wrapyfi *without* NumPy, OpenCV, ZeroMQ, Websocket, Zenoh, and MQTT: ``` pip install . @@ -367,22 +376,11 @@ For more examples of usage, refer to the [user guide](docs/usage.md). Run script # Supported Formats -## Middleware -- [x] **YARP** -- [x] **ROS** -- [x] **ROS 2** -- [x] **ZeroMQ** [*beta feature*]: - * `should_wait` trigger introduced with event monitoring - * Event monitoring currently cannot be disabled [![planned](https://custom-icon-badges.demolab.com/badge/planned%20for%20Wrapyfi%20v0.5-%23C2E0C6.svg?logo=hourglass&logoColor=white)](https://github.com/modular-ml/wrapyfi/issues/99 "planned link") -- [x] **Websocket** *Only PUB/SUB* [*alpha support*] -- [x] **MQTT** *Only PUB/SUB* [*alpha support*] - ## Serializers - [x] **JSON** - [ ] **msgpack** - [ ] **protobuf** - ## Data Structures Supported Objects by the `NativeObject` type include: diff --git a/docs/exclude_packages.json b/docs/exclude_packages.json index e73d245..9d76195 100644 --- a/docs/exclude_packages.json +++ b/docs/exclude_packages.json @@ -1 +1 @@ -["torch", "tensorflow", "collections", "std_msgs.msg", "std_msgs.msg.String", "std_msgs.msg.Image", "dask.array", "dask.dataframe", "numpy", "cupy.ndarray", "wrapyfi.encoders", "wrapyfi.connect.wrapper", "wrapyfi.connect.listeners", "PIL", "importlib", "scipy.stats", "rostopic", "Cython.Compiler.UtilNodes", "jax.numpy", "jax.numpy.DeviceArray", "jax.Array", "jaxlib.xla_extension.ArrayImpl", "trax", "trax.fastmath", "trax.fastmath.numpy", "trax.fastmath", "rospy", "wrapyfi.connect.clients", "xarray.DataArray", "xarray.Dataset", "wrapyfi_ros2_interfaces.srv", "zipfile", "wrapyfi.config.manager", "Utils", "zipimport", "rclpy.node", "cv2", "wrapyfi.tests.tools.class_test", "wrapyfi.connect.servers", "pandas.DataFrame", "pandas.Series", "wrapyfi.utils", "yarp", "rclpy", "astropy.table", "pint", "mxnet", "wrapyfi.connect.publishers", "pexpect", "zarr.Array", "zarr.Group", "sensor_msgs.msg", "gzip", "zmq", "pyarrow.StructArray", "paddle", "sounddevice", "traceback", "geometry_msgs.msg", "astropy", "tempfile"] +["zenoh", "torch", "tensorflow", "collections", "std_msgs.msg", "std_msgs.msg.String", "std_msgs.msg.Image", "dask.array", "dask.dataframe", "numpy", "cupy.ndarray", "wrapyfi.encoders", "wrapyfi.connect.wrapper", "wrapyfi.connect.listeners", "PIL", "importlib", "scipy.stats", "rostopic", "Cython.Compiler.UtilNodes", "jax.numpy", "jax.numpy.DeviceArray", "jax.Array", "jaxlib.xla_extension.ArrayImpl", "trax", "trax.fastmath", "trax.fastmath.numpy", "trax.fastmath", "rospy", "wrapyfi.connect.clients", "xarray.DataArray", "xarray.Dataset", "wrapyfi_ros2_interfaces.srv", "zipfile", "wrapyfi.config.manager", "Utils", "zipimport", "rclpy.node", "cv2", "wrapyfi.tests.tools.class_test", "wrapyfi.connect.servers", "pandas.DataFrame", "pandas.Series", "wrapyfi.utils", "yarp", "rclpy", "astropy.table", "pint", "mxnet", "wrapyfi.connect.publishers", "pexpect", "zarr.Array", "zarr.Group", "sensor_msgs.msg", "gzip", "zmq", "pyarrow.StructArray", "paddle", "sounddevice", "traceback", "geometry_msgs.msg", "astropy", "tempfile"] diff --git a/docs/usage.md b/docs/usage.md index 520cf23..cb128bb 100755 --- a/docs/usage.md +++ b/docs/usage.md @@ -248,7 +248,7 @@ All messages are transmitted using the `zmq` Python bindings. Transmission follo #### Websocket: ```{note} -Websocket assumes a server is running on the specified address and port. The forwarding of messages canonly be done manually by the user. An example server can be found [here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/examples/websockets/websocket_server.py) +Websocket assumes a server is running on the specified address and port. The forwarding of messages can only be done manually by the user. An example server can be found [here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/examples/websockets/websocket_server.py) ``` All messages are transmitted using the `python-socketio` Python bindings. Transmission follows the [socket.io protocol](https://socket.io/docs/v4/) @@ -259,6 +259,15 @@ All messages are transmitted using the `python-socketio` Python bindings. Transm `socketio.emit` for publishing and `socketio.on` for receiving messages * **Properties**: Transmits properties [![planned](https://custom-icon-badges.demolab.com/badge/planned%20for%20Wrapyfi%20v0.5-%23C2E0C6.svg?logo=hourglass&logoColor=white)](https://github.com/modular-ml/wrapyfi/issues/99 "planned link") +#### Zenoh: + +All messages are transmitted using the `eclipse-zenoh` Python bindings. Transmission follows the [zenoh protocol](https://zenoh.io/) + +* **Image**: Transmits and receives a `cv2` or `numpy` image wrapped in the `NativeObject` construct as `zenoh.Bytes` +* **AudioChunk**: Transmits and receives a `numpy` audio chunk wrapped in the `NativeObject` construct as `zenoh.Bytes` +* **NativeObject**: Transmits and receives a `json` string supporting all native Python objects, `numpy` arrays and [other formats]() as `zenoh.Bytes` using + `zenoh.session.key.put` for publishing and an asynchronus callback for receiving messages + #### MQTT: ```{note} @@ -412,15 +421,15 @@ Wrapyfi natively supports a [number of middleware](#middleware). However, more m * Ensure that the middleware communication pattern scripts reside within directories named `listeners`, `publishers`, `clients`, or `servers` nested inside the `WRAPYFI_MWARE_PATH` and that the directory contains an `__init__.py` file ### Natively Supported Middleware -- **YARP** -- **ROS** -- **ROS 2** -- **ZeroMQ** [*beta feature*]: +- [YARP](https://www.yarp.it/yarp_swig.html) +- [ROS](http://wiki.ros.org/rospy) +- [ROS 2](https://docs.ros2.org/foxy/api/rclpy/index.html) +- [ZeroMQ](http://zeromq.org/) [*beta feature*]: * `should_wait` trigger introduced with event monitoring * Event monitoring currently cannot be disabled [![planned](https://custom-icon-badges.demolab.com/badge/planned%20for%20Wrapyfi%20v0.5-%23C2E0C6.svg?logo=hourglass&logoColor=white)](https://github.com/modular-ml/wrapyfi/issues/99 "planned link") -- **Websocket** *Only PUB/SUB* [*alpha support*] -- **MQTT** *Only PUB/SUB* [*alpha support*] - +- [Websocket](https://socket.io/) *Only PUB/SUB* [*alpha support*] +- [Zenoh](https://zenoh.io/) *Only PUB/SUB* [*alpha support*] +- [MQTT](https://mqtt.org) *Only PUB/SUB* [*alpha support*] ## Plugins @@ -566,6 +575,12 @@ This can be achieved by setting: * `WRAPYFI_WEBSOCKET_SOCKET_PORT`: The socket port. Defaults to 5000 * `WRAPYFI_WEBSOCKET_NAMESPACE`: The socket namespace. Defaults to "/" * `WRAPYFI_WEBSOCKET_MONITOR_LISTENER_SPAWN`: Either spawn the websocket monitor listener as a "process" or "thread". Defaults to "thread" which is the only supported option for now +* `WRAPYFI_ZENOH_IP`: IP address of the Zenoh socket. Defaults to "127.0.0.1" +* `WRAPYFI_ZENOH_PORT`: The Zenoh socket port. Defaults to 7447 +* `WRAPYFI_ZENOH_MODE`: The Zenoh mode indicating whether to use the router as a broker or adopt peer-to-peer communication. Defaults to "peer" +* `WRAPYFI_ZENOH_CONNECT`: The Zenoh connect endpoints seperated by a comma e.g., "tcp/127.0.0.1:7447,udp/127.0.0.1:7448". This overrides `WRAPYFI_ZENOH_IP` and `WRAPYFI_ZENOH_PORT`. Defaults to an empty list +* `WRAPYFI_ZENOH_LISTEN`: The Zenoh listen endpoints seperated by a comma e.g., "tcp/127.0.0.1:7446". Defaults to an empty list +* `WRAPYFI_ZENOH_CONFIG_FILEPATH`: The Zenoh configuration file path. Defaults to None. Conflicting keys are overriden by `WRAPYFI_ZENOH_IP`, `WRAPYFI_ZENOH_PORT`, `WRAPYFI_ZENOH_CONNECT`, and `WRAPYFI_ZENOH_LISTEN` * `WRAPYFI_MQTT_BROKER_ADDRESS`: The MQTT broker address. Defaults to "broker.emqx.io" * `WRAPYFI_MQTT_BROKER_PORT`: The MQTT broker port. Defaults to 1883 diff --git a/docs/usage/User Guide/Communication Patterns.md b/docs/usage/User Guide/Communication Patterns.md index 361182d..a30cb2c 100644 --- a/docs/usage/User Guide/Communication Patterns.md +++ b/docs/usage/User Guide/Communication Patterns.md @@ -83,7 +83,7 @@ All messages are transmitted using the `zmq` Python bindings. Transmission follo #### Websocket: ```{note} -Websocket assumes a server is running on the specified address and port. The forwarding of messages canonly be done manually by the user. An example server can be found [here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/examples/websockets/websocket_server.py) +Websocket assumes a server is running on the specified address and port. The forwarding of messages can only be done manually by the user. An example server can be found [here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/examples/websockets/websocket_server.py) ``` All messages are transmitted using the `python-socketio` Python bindings. Transmission follows the [socket.io protocol](https://socket.io/docs/v4/) @@ -94,6 +94,15 @@ All messages are transmitted using the `python-socketio` Python bindings. Transm `socketio.emit` for publishing and `socketio.on` for receiving messages * **Properties**: Transmits properties [![planned](https://custom-icon-badges.demolab.com/badge/planned%20for%20Wrapyfi%20v0.5-%23C2E0C6.svg?logo=hourglass&logoColor=white)](https://github.com/modular-ml/wrapyfi/issues/99 "planned link") +#### Zenoh: + +All messages are transmitted using the `eclipse-zenoh` Python bindings. Transmission follows the [zenoh protocol](https://zenoh.io/) + +* **Image**: Transmits and receives a `cv2` or `numpy` image wrapped in the `NativeObject` construct as `zenoh.Bytes` +* **AudioChunk**: Transmits and receives a `numpy` audio chunk wrapped in the `NativeObject` construct as `zenoh.Bytes` +* **NativeObject**: Transmits and receives a `json` string supporting all native Python objects, `numpy` arrays and [other formats]() as `zenoh.Bytes` using + `zenoh.session.key.put` for publishing and an asynchronus callback for receiving messages + #### MQTT: ```{note} diff --git a/docs/usage/User Guide/Environment Variables.md b/docs/usage/User Guide/Environment Variables.md index 22d6c35..4094312 100644 --- a/docs/usage/User Guide/Environment Variables.md +++ b/docs/usage/User Guide/Environment Variables.md @@ -9,7 +9,7 @@ Wrapyfi reserves specific environment variable names for the functionality of it ZeroMQ requires socket configurations that can be passed as arguments to the respective middleware constructor (through the Wrapyfi decorator) or using environment variables. Note that these configurations are needed both by the proxy and the message publisher and listener. The downside to such an approach is that all messages share the same configs. Since the proxy broker spawns once on first trigger (if enabled) as well as a singleton subscriber monitoring instance, using environment variables is the recommended approach to avoid unintended behavior. This can be achieved by setting: - + * `WRAPYFI_ZEROMQ_SOCKET_IP`: IP address of the socket. Defaults to "127.0.0.1" * `WRAPYFI_ZEROMQ_SOCKET_PUB_PORT`: The publishing socket port. Defaults to 5555 * `WRAPYFI_ZEROMQ_SOCKET_SUB_PORT`: The sub-socket port (listening port for the broker). Defaults to 5556 @@ -25,6 +25,12 @@ This can be achieved by setting: * `WRAPYFI_WEBSOCKET_SOCKET_PORT`: The socket port. Defaults to 5000 * `WRAPYFI_WEBSOCKET_NAMESPACE`: The socket namespace. Defaults to "/" * `WRAPYFI_WEBSOCKET_MONITOR_LISTENER_SPAWN`: Either spawn the websocket monitor listener as a "process" or "thread". Defaults to "thread" which is the only supported option for now +* `WRAPYFI_ZENOH_IP`: IP address of the Zenoh socket. Defaults to "127.0.0.1" +* `WRAPYFI_ZENOH_PORT`: The Zenoh socket port. Defaults to 7447 +* `WRAPYFI_ZENOH_MODE`: The Zenoh mode indicating whether to use the router as a broker or adopt peer-to-peer communication. Defaults to "peer" +* `WRAPYFI_ZENOH_CONNECT`: The Zenoh connect endpoints seperated by a comma e.g., "tcp/127.0.0.1:7447,udp/127.0.0.1:7448". This overrides `WRAPYFI_ZENOH_IP` and `WRAPYFI_ZENOH_PORT`. Defaults to an empty list +* `WRAPYFI_ZENOH_LISTEN`: The Zenoh listen endpoints seperated by a comma e.g., "tcp/127.0.0.1:7446". Defaults to an empty list +* `WRAPYFI_ZENOH_CONFIG_FILEPATH`: The Zenoh configuration file path. Defaults to None. Conflicting keys are overriden by `WRAPYFI_ZENOH_IP`, `WRAPYFI_ZENOH_PORT`, `WRAPYFI_ZENOH_CONNECT`, and `WRAPYFI_ZENOH_LISTEN` * `WRAPYFI_MQTT_BROKER_ADDRESS`: The MQTT broker address. Defaults to "broker.emqx.io" * `WRAPYFI_MQTT_BROKER_PORT`: The MQTT broker port. Defaults to 1883 diff --git a/docs/usage/User Guide/Middleware.md b/docs/usage/User Guide/Middleware.md index 22cbb8d..167150a 100644 --- a/docs/usage/User Guide/Middleware.md +++ b/docs/usage/User Guide/Middleware.md @@ -15,11 +15,12 @@ Wrapyfi natively supports a [number of middleware](#middleware). However, more m * Ensure that the middleware communication pattern scripts reside within directories named `listeners`, `publishers`, `clients`, or `servers` nested inside the `WRAPYFI_MWARE_PATH` and that the directory contains an `__init__.py` file ### Natively Supported Middleware -- **YARP** -- **ROS** -- **ROS 2** -- **ZeroMQ** [*beta feature*]: +- [YARP](https://www.yarp.it/yarp_swig.html) +- [ROS](http://wiki.ros.org/rospy) +- [ROS 2](https://docs.ros2.org/foxy/api/rclpy/index.html) +- [ZeroMQ](http://zeromq.org/) [*beta feature*]: * `should_wait` trigger introduced with event monitoring * Event monitoring currently cannot be disabled [![planned](https://custom-icon-badges.demolab.com/badge/planned%20for%20Wrapyfi%20v0.5-%23C2E0C6.svg?logo=hourglass&logoColor=white)](https://github.com/modular-ml/wrapyfi/issues/99 "planned link") -- **Websocket** *Only PUB/SUB* [*alpha support*] -- **MQTT** *Only PUB/SUB* [*alpha support*] \ No newline at end of file +- [Websocket](https://socket.io/) *Only PUB/SUB* [*alpha support*] +- [Zenoh](https://zenoh.io/) *Only PUB/SUB* [*alpha support*] +- [MQTT](https://mqtt.org) *Only PUB/SUB* [*alpha support*] \ No newline at end of file diff --git a/examples/sensors/cam_mic.py b/examples/sensors/cam_mic.py index 79bffef..b39d9b0 100755 --- a/examples/sensors/cam_mic.py +++ b/examples/sensors/cam_mic.py @@ -240,8 +240,8 @@ def parse_args(): default=0, help="The video capture device id (int camera id)", ) - parser.add_argument("--img_width", type=int, default=320, help="The image width") - parser.add_argument("--img_height", type=int, default=240, help="The image height") + parser.add_argument("--img_width", type=int, default=-1, help="The image width") + parser.add_argument("--img_height", type=int, default=-1, help="The image height") parser.add_argument( "--mic_source", type=int, diff --git a/pyproject.toml b/pyproject.toml index 71186ee..5205341 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,6 +56,9 @@ numpy = [ websocket = [ "python-socketio>=5.0.4", ] +zenoh = [ + "eclipse-zenoh>=1.0.0", +] mqtt = [ "paho-mqtt>=2.0", ] @@ -69,11 +72,29 @@ headless_websocket = [ "wrapyfi[websocket]", "opencv-python-headless>=4.2.0", ] +headless_zenoh = [ + "wrapyfi[numpy]", + "wrapyfi[zenoh]", + "opencv-python-headless>=4.2.0", +] headless_mqtt = [ "wrapyfi[numpy]", "wrapyfi[mqtt]", "opencv-python-headless>=4.2.0", ] +complete = [ + "wrapyfi[numpy]", + "sounddevice", + "soundfile", + "Pillow", + "pandas", + "wrapyfi[pyzmq]", + "wrapyfi[websocket]", + "wrapyfi[zenoh]", + "wrapyfi[mqtt]", + "wrapyfi[docs]", + "opencv-contrib-python>=4.2.0", +] all = [ "wrapyfi[numpy]", "wrapyfi[pyzmq]", diff --git a/setup.py b/setup.py index 94641d3..30bf784 100755 --- a/setup.py +++ b/setup.py @@ -44,7 +44,7 @@ def check_cv2(default_python="opencv-python"): setuptools.setup( name="wrapyfi", - version="0.4.50", + version="0.4.51", description="Wrapyfi is a wrapper for simplifying Middleware communication", url="https://github.com/fabawi/wrapyfi/blob/main/", project_urls={ @@ -62,14 +62,19 @@ def check_cv2(default_python="opencv-python"): "pyzmq": ["pyzmq>=19.0.0"], "numpy": ["numpy>=1.19.2"], "websocket": ["python_socketio>=5.0.4"], + "zenoh": ["eclipse-zenoh>=1.0.0"], "mqtt": ["paho-mqtt>=2.0"], "headless": ["wrapyfi[pyzmq]", "wrapyfi[numpy]"] + check_cv2("opencv-python-headless"), "headless_websocket": ["wrapyfi[websocket]", "wrapyfi[numpy]"] + check_cv2("opencv-python-headless"), + "headless_zenoh": ["wrapyfi[zenoh]", "wrapyfi[numpy]"] + + check_cv2("opencv-python-headless"), "headless_mqtt": ["wrapyfi[mqtt]", "wrapyfi[numpy]"] + check_cv2("opencv-python-headless"), - "all": ["wrapyfi[pyzmq]", "wrapyfi[numpy]"] + "complete": ["wrapyfi[numpy]", "sounddevice", "soundfile", "Pillow", "pandas", "wrapyfi[pyzmq]", "wrapyfi[websocket]", "wrapyfi[zenoh]", "wrapyfi[mqtt]", "wrapyfi[docs]"] + + check_cv2("opencv-contrib-python"), + "all": ["wrapyfi[numpy]", "wrapyfi[pyzmq]"] + check_cv2("opencv-contrib-python"), }, install_requires=["pyyaml>=5.1.1"], diff --git a/wrapyfi/listeners/zenoh.py b/wrapyfi/listeners/zenoh.py index 38e3811..ec798e9 100644 --- a/wrapyfi/listeners/zenoh.py +++ b/wrapyfi/listeners/zenoh.py @@ -34,6 +34,7 @@ "Switching automatically to 'thread' mode." ) + class ZenohListener(Listener): """ Base Zenoh listener class that configures and initializes Zenoh middleware. @@ -61,17 +62,16 @@ def __init__(self, name: str, in_topic: str, should_wait: bool = True, super().__init__(name, in_topic, should_wait=should_wait, **kwargs) # Prepare Zenoh configuration from environment variables and kwargs - zenoh_config = { + self.zenoh_config = { "mode": mode, "connect/endpoints": ZENOH_CONNECT if isinstance(ZENOH_CONNECT, list) else ZENOH_CONNECT.split(",") if isinstance(ZENOH_CONNECT, str) else [f"tcp/{ip}:{port}"], - "listen/endpoints": ZENOH_LISTEN if isinstance(ZENOH_LISTEN, list) else ZENOH_LISTEN.split(",") if isinstance(ZENOH_LISTEN, str) else [f"tcp/{ip}:{port}"], **(zenoh_kwargs or {}) } + if ZENOH_LISTEN: + self.zenoh_config["listen/endpoints"] = ZENOH_LISTEN if isinstance(ZENOH_LISTEN, list) else ZENOH_LISTEN.split(",") - # Activate Zenoh middleware with the prepared configuration - ZenohMiddlewarePubSub.activate(config=self._prepare_config(zenoh_config), **kwargs) + ZenohMiddlewarePubSub.activate(config=self._prepare_config(self.zenoh_config), **kwargs) - # Set up connection establishment self.established = False def _prepare_config(self, zenoh_kwargs): @@ -203,11 +203,20 @@ def on_message(self, sample): :param sample: zenoh.Sample: Zenoh sample payload """ try: - np_data = np.frombuffer(sample.payload.to_bytes(), dtype=np.uint8) + # Split payload into header and image data + payload = sample.payload.to_bytes() + header_bytes, img_bytes = payload.split(b'\n', 1) # Split at the first newline + + header = json.loads(header_bytes.decode('utf-8')) + np_data = np.frombuffer(img_bytes, dtype=np.uint8) + if self.jpg: img = cv2.imdecode(np_data, cv2.IMREAD_COLOR if self.rgb else cv2.IMREAD_GRAYSCALE) else: - img = np_data.reshape(self.height, self.width, 3 if self.rgb else 1) + shape = header.get("shape", (self.height, self.width, 3 if self.rgb else 1)) + img = np_data.reshape(shape) + + # Place the decoded image into the message queue self._message_queue.put(img) except Exception as e: logging.error(f"Failed to process image message: {e}") @@ -257,8 +266,13 @@ def on_message(self, sample): :param sample: zenoh.Sample: Zenoh sample payload """ try: - aud_array = np.frombuffer(sample.payload.to_bytes(), dtype=np.float32).reshape(-1, self.channels) - self._message_queue.put((aud_array, self.rate)) + payload = sample.payload.to_bytes() + header_bytes, aud_bytes = payload.split(b'\n', 1) + header = json.loads(header_bytes.decode('utf-8')) + shape = header.get("shape") + rate = header.get("rate") + aud_array = np.frombuffer(aud_bytes, dtype=np.float32).reshape(shape) + self._message_queue.put((aud_array, rate)) except Exception as e: logging.error(f"Failed to process audio message: {e}") diff --git a/wrapyfi/middlewares/zenoh.py b/wrapyfi/middlewares/zenoh.py index c63b812..9f2f6d5 100644 --- a/wrapyfi/middlewares/zenoh.py +++ b/wrapyfi/middlewares/zenoh.py @@ -75,6 +75,7 @@ def register_callback(self, topic: str, callback): self.subscribers[topic] = self.session.declare_subscriber(topic, callback) logging.info(f"[ZenohMiddlewarePubSub] Registered callback for topic {topic}") + def is_connected(self) -> bool: """ Checks if the Zenoh session is active. diff --git a/wrapyfi/publishers/zenoh.py b/wrapyfi/publishers/zenoh.py index 337931f..6eb23b9 100644 --- a/wrapyfi/publishers/zenoh.py +++ b/wrapyfi/publishers/zenoh.py @@ -72,9 +72,10 @@ def __init__( self.zenoh_config = { "mode": mode, "connect/endpoints": ZENOH_CONNECT if isinstance(ZENOH_CONNECT, list) else ZENOH_CONNECT.split(",") if isinstance(ZENOH_CONNECT, str) else [f"tcp/{ip}:{port}"], - "listen/endpoints": ZENOH_LISTEN if isinstance(ZENOH_LISTEN, list) else ZENOH_LISTEN.split(",") if isinstance(ZENOH_LISTEN, str) else [f"tcp/{ip}:{port}"], **(zenoh_kwargs or {}) } + if ZENOH_LISTEN: + self.zenoh_config["listen/endpoints"] = ZENOH_LISTEN if isinstance(ZENOH_LISTEN, list) else ZENOH_LISTEN.split(",") ZenohMiddlewarePubSub.activate(config=self._prepare_config(self.zenoh_config), **kwargs) @@ -254,12 +255,12 @@ def publish(self, img: np.ndarray): time.sleep(0.2) if ( - 0 < self.width != img.shape[1] - or 0 < self.height != img.shape[0] - or not ( + 0 < self.width != img.shape[1] + or 0 < self.height != img.shape[0] + or not ( (img.ndim == 2 and not self.rgb) or (img.ndim == 3 and self.rgb and img.shape[2] == 3) - ) + ) ): raise ValueError("Incorrect image shape for publisher") if not img.flags["C_CONTIGUOUS"]: @@ -273,7 +274,10 @@ def publish(self, img: np.ndarray): img_bytes = img.tobytes() header = {"timestamp": time.time(), "shape": img.shape, "dtype": str(img.dtype)} - ZenohMiddlewarePubSub._instance.session.put(self.out_topic, [header, img_bytes]) + header_bytes = json.dumps(header).encode('utf-8') + payload = header_bytes + b'\n' + img_bytes + + ZenohMiddlewarePubSub._instance.session.put(self.out_topic, payload) @Publishers.register("AudioChunk", "zenoh") @@ -310,6 +314,8 @@ def __init__( self.rate = rate self.chunk = chunk + import json + def publish(self, aud: Tuple[np.ndarray, int]): """ Publish the audio chunk to the middleware. @@ -328,14 +334,25 @@ def publish(self, aud: Tuple[np.ndarray, int]): return if 0 < self.rate != rate: raise ValueError("Incorrect audio rate for publisher") + chunk, channels = aud_array.shape if len(aud_array.shape) > 1 else (aud_array.shape[0], 1) self.chunk = chunk if self.chunk == -1 else self.chunk self.channels = channels if self.channels == -1 else self.channels if 0 < self.chunk != chunk or 0 < self.channels != channels: raise ValueError("Incorrect audio shape for publisher") - aud_array = np.require(aud_array, dtype=np.float32, requirements="C") + aud_array = np.require(aud_array, dtype=np.float32, requirements="C") aud_bytes = aud_array.tobytes() - header = {"shape": aud_array.shape, "dtype": str(aud_array.dtype), "rate": rate, "timestamp": time.time()} - ZenohMiddlewarePubSub._instance.session.put(self.out_topic, [header, aud_bytes]) + + header = { + "shape": aud_array.shape, + "dtype": str(aud_array.dtype), + "rate": rate, + "timestamp": time.time() + } + header_bytes = json.dumps(header).encode('utf-8') + payload = header_bytes + b'\n' + aud_bytes + + ZenohMiddlewarePubSub._instance.session.put(self.out_topic, payload) +