Skip to content

Commit

Permalink
Zenoh PUB/SUB added to the list of supported middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
fabawi committed Nov 1, 2024
1 parent ec96173 commit 267c3a8
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 63 deletions.
46 changes: 22 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ Wrapyfi is a middleware communication wrapper for transmitting data across nodes
altering the operation pipeline of your Python scripts. Wrapyfi introduces
a number of helper functions to make middleware integration possible without the need to learn an entire framework, just to parallelize your processes on
multiple machines.
Wrapyfi supports [YARP](https://www.yarp.it/yarp_swig.html), [ROS](http://wiki.ros.org/rospy), [ROS 2](https://docs.ros2.org/foxy/api/rclpy/index.html), [ZeroMQ](http://zeromq.org/), [Websocket](https://socket.io/), and [MQTT](https://mqtt.org).


Wrapyfi supports [YARP](https://www.yarp.it/yarp_swig.html), [ROS](http://wiki.ros.org/rospy), [ROS 2](https://docs.ros2.org/foxy/api/rclpy/index.html), [ZeroMQ](http://zeromq.org/), [Websocket](https://socket.io/), [Zenoh](https://zenoh.io/) and [MQTT](https://mqtt.org).


# Attribution
Expand All @@ -61,33 +59,37 @@ Please refer to the following [paper](https://www2.informatik.uni-hamburg.de/wtm

# Getting Started

Before using Wrapyfi, YARP, ROS, ZeroMQ, Websocket, or MQTT must be installed.
Before using Wrapyfi, YARP, ROS, ZeroMQ, Websocket, Zenoh, or MQTT must be installed.

* Follow the [YARP installation guide](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi_extensions/yarp/README.md?rank=0).<!-- [YARP installation guide](docs/yarp_install_lnk.md). -->
Note that the iCub package is not needed for Wrapyfi to work and does not have to be installed if you do not intend to use the iCub robot.
* **YARP**: Follow the [YARP installation guide](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi_extensions/yarp/README.md?rank=0).<!-- [YARP installation guide](docs/yarp_install_lnk.md). -->
The `yarpserver` server must be running before running any YARP-based scripts. Note that the iCub package is not needed for Wrapyfi to work and does not have to be installed if you do not intend to use the iCub robot.

* For installing ROS, follow the ROS installation guide [\[Ubuntu\]](http://wiki.ros.org/noetic/Installation/Ubuntu)[\[Windows\]](https://wiki.ros.org/noetic/Installation/Windows).
We recommend installing ROS on Conda using the [RoboStack](https://github.com/RoboStack/ros-noetic) environment. Additionally, the
* **ROS**: For installing ROS, follow the ROS installation guide [\[Ubuntu\]](http://wiki.ros.org/noetic/Installation/Ubuntu)[\[Windows\]](https://wiki.ros.org/noetic/Installation/Windows).
We recommend installing ROS on Conda using the [RoboStack](https://github.com/RoboStack/ros-noetic) environment. The `roscore` server must be running before running any ROS-based scripts. Additionally, the
[Wrapyfi ROS interfaces](https://github.com/modular-ml/wrapyfi_ros_interfaces/blob/master/README.md?rank=0) must be
built to support messages needed for audio transmission [![ROS Package Index](https://img.shields.io/ros/v/noetic/wrapyfi_ros_interfaces)](https://index.ros.org/r/wrapyfi_ros_interfaces/#noetic)

* For installing ROS 2, follow the ROS 2 installation guide [\[Ubuntu\]](https://docs.ros.org/en/humble/Installation/Ubuntu-Install-Debians.html)[\[Windows\]](https://docs.ros.org/en/humble/Installation/Windows-Install-Binary.html).
* **ROS 2**: For installing ROS 2, follow the ROS 2 installation guide [\[Ubuntu\]](https://docs.ros.org/en/humble/Installation/Ubuntu-Install-Debians.html)[\[Windows\]](https://docs.ros.org/en/humble/Installation/Windows-Install-Binary.html).
We recommend installing ROS 2 on Conda using the [RoboStack](https://github.com/RoboStack/ros-humble) environment. Additionally, the
[Wrapyfi ROS 2 interfaces](https://github.com/modular-ml/wrapyfi_ros2_interfaces/blob/master/README.md?rank=0)
must be built to support messages and services needed for audio transmission and the REQ/REP pattern [![ROS Package Index](https://img.shields.io/ros/v/humble/wrapyfi_ros2_interfaces)](https://index.ros.org/p/wrapyfi_ros2_interfaces/#humble)

* ZeroMQ can be installed using pip: `pip install pyzmq`.
* **ZeroMQ**: ZeroMQ can be installed using pip: `pip install pyzmq`.
The XPUB/XSUB and XREQ/XREP patterns followed in our ZeroMQ implementation requires a proxy broker. A broker is spawned by default as a daemon process.
To avoid automatic spawning, pass the argument `start_proxy_broker=False` to the method register decorator.
A standalone broker can be found [here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/standalone/zeromq_proxy_broker.py)

* Websocket can be installed using pip: `pip install python-socketio`.
* **Websocket**: Websocket can be installed using pip: `pip install python-socketio`.
The PUB/SUB pattern followed in our Websocket implementation requires a socket server. We recommend setting the server
to run using [Flask-SocketIO](https://flask-socketio.readthedocs.io/en/latest/) which can be installed with `pip install flask-socketio`.
Note that the server must be running and also scripted to forward messages to the listening from the publishing client as demonstrated in the example found
[here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/examples/websockets/websocket_server.py)

* MQTT can be installed using pip: `pip install paho-mqtt`.
* **Zenoh**: Zenoh can be installed using pip: `pip install zenoh`. It is recommended to use the `WRAPYFI_ZENOH_MODE` environment variable to set the mode to `peer` for running in peer-to-peer mode.
The PUB/SUB pattern followed in our Zenoh implementation requires a router. To install the Zenoh router, follow the instructions found [here](https://github.com/eclipse-zenoh/zenoh/?tab=readme-ov-file#how-to-install-it).
The `zenohd` router must be running before running any Zenoh-based scripts. *NOTE*: The `zenohd --rest-http-port 8082` command must be executed with an arbitrary (non-conflicting) port to avoid collision with other services occupying the default port (8000).

* **MQTT**: MQTT can be installed using pip: `pip install paho-mqtt`.
The PUB/SUB pattern followed in our MQTT implementation requires a broker. The default broker used by Wrapyfi [broker.emqx.io](https://broker.emqx.io). However,
this broker is not recommended for production use or for transmitting video/audio as it is a public online broker and requires an internet connection (not secure and suffers high latency). We recommend setting up a local broker
using [Mosquitto](https://mosquitto.org/download/). A Dockerized version can be found [here](https://github.com/sukesh-ak/setup-mosquitto-with-docker). The broker must be running, and the `WRAPYFI_MQTT_BROKER_ADDRESS` as well as `WRAPYFI_MQTT_BROKER_PORT` environment variables must be set to the
Expand Down Expand Up @@ -124,6 +126,7 @@ def send_message(self):
* ROS 2 Humble Hawksbill **|** Galactic Geochelone **|** Foxy Fitzroy
* PyZMQ 16.0, 17.1 and 19.0
* Python-SocketIO >= 5.0.4
* Eclipse-Zenoh >= 1.0.0
* Paho-MQTT >= 2.0 *(Hard-coded to v2 in Wrapyfi and not compatible with v1)*


Expand Down Expand Up @@ -181,13 +184,19 @@ or when installing Wrapyfi to work with websockets (headless) including `numpy`,
pip install .[headless_websockets]
```

or when installing Wrapyfi to work with Zenoh (headless) including `numpy`, `opencv-python-headless`, and `eclipse-zenoh`:

```
pip install .[headless_zenoh]
```

or when installing Wrapyfi to work with MQTT (headless) including `numpy`, `opencv-python-headless`, and `paho-mqtt`:

```
pip install .[headless_mqtt]
```

or install Wrapyfi *without* NumPy, OpenCV, ZeroMQ, Websocket, and MQTT:
or install Wrapyfi *without* NumPy, OpenCV, ZeroMQ, Websocket, Zenoh, and MQTT:

```
pip install .
Expand Down Expand Up @@ -367,22 +376,11 @@ For more examples of usage, refer to the [user guide](docs/usage.md). Run script

# Supported Formats

## Middleware
- [x] **YARP**
- [x] **ROS**
- [x] **ROS 2**
- [x] **ZeroMQ** [*beta feature*]:
* `should_wait` trigger introduced with event monitoring
* Event monitoring currently cannot be disabled [![planned](https://custom-icon-badges.demolab.com/badge/planned%20for%20Wrapyfi%20v0.5-%23C2E0C6.svg?logo=hourglass&logoColor=white)](https://github.com/modular-ml/wrapyfi/issues/99 "planned link")
- [x] **Websocket** *Only PUB/SUB* [*alpha support*]
- [x] **MQTT** *Only PUB/SUB* [*alpha support*]

## Serializers
- [x] **JSON**
- [ ] **msgpack**
- [ ] **protobuf**


## Data Structures

Supported Objects by the `NativeObject` type include:
Expand Down
2 changes: 1 addition & 1 deletion docs/exclude_packages.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
["torch", "tensorflow", "collections", "std_msgs.msg", "std_msgs.msg.String", "std_msgs.msg.Image", "dask.array", "dask.dataframe", "numpy", "cupy.ndarray", "wrapyfi.encoders", "wrapyfi.connect.wrapper", "wrapyfi.connect.listeners", "PIL", "importlib", "scipy.stats", "rostopic", "Cython.Compiler.UtilNodes", "jax.numpy", "jax.numpy.DeviceArray", "jax.Array", "jaxlib.xla_extension.ArrayImpl", "trax", "trax.fastmath", "trax.fastmath.numpy", "trax.fastmath", "rospy", "wrapyfi.connect.clients", "xarray.DataArray", "xarray.Dataset", "wrapyfi_ros2_interfaces.srv", "zipfile", "wrapyfi.config.manager", "Utils", "zipimport", "rclpy.node", "cv2", "wrapyfi.tests.tools.class_test", "wrapyfi.connect.servers", "pandas.DataFrame", "pandas.Series", "wrapyfi.utils", "yarp", "rclpy", "astropy.table", "pint", "mxnet", "wrapyfi.connect.publishers", "pexpect", "zarr.Array", "zarr.Group", "sensor_msgs.msg", "gzip", "zmq", "pyarrow.StructArray", "paddle", "sounddevice", "traceback", "geometry_msgs.msg", "astropy", "tempfile"]
["zenoh", "torch", "tensorflow", "collections", "std_msgs.msg", "std_msgs.msg.String", "std_msgs.msg.Image", "dask.array", "dask.dataframe", "numpy", "cupy.ndarray", "wrapyfi.encoders", "wrapyfi.connect.wrapper", "wrapyfi.connect.listeners", "PIL", "importlib", "scipy.stats", "rostopic", "Cython.Compiler.UtilNodes", "jax.numpy", "jax.numpy.DeviceArray", "jax.Array", "jaxlib.xla_extension.ArrayImpl", "trax", "trax.fastmath", "trax.fastmath.numpy", "trax.fastmath", "rospy", "wrapyfi.connect.clients", "xarray.DataArray", "xarray.Dataset", "wrapyfi_ros2_interfaces.srv", "zipfile", "wrapyfi.config.manager", "Utils", "zipimport", "rclpy.node", "cv2", "wrapyfi.tests.tools.class_test", "wrapyfi.connect.servers", "pandas.DataFrame", "pandas.Series", "wrapyfi.utils", "yarp", "rclpy", "astropy.table", "pint", "mxnet", "wrapyfi.connect.publishers", "pexpect", "zarr.Array", "zarr.Group", "sensor_msgs.msg", "gzip", "zmq", "pyarrow.StructArray", "paddle", "sounddevice", "traceback", "geometry_msgs.msg", "astropy", "tempfile"]
31 changes: 23 additions & 8 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ All messages are transmitted using the `zmq` Python bindings. Transmission follo
#### Websocket:

```{note}
Websocket assumes a server is running on the specified address and port. The forwarding of messages canonly be done manually by the user. An example server can be found [here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/examples/websockets/websocket_server.py)
Websocket assumes a server is running on the specified address and port. The forwarding of messages can only be done manually by the user. An example server can be found [here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/examples/websockets/websocket_server.py)
```

All messages are transmitted using the `python-socketio` Python bindings. Transmission follows the [socket.io protocol](https://socket.io/docs/v4/)
Expand All @@ -259,6 +259,15 @@ All messages are transmitted using the `python-socketio` Python bindings. Transm
`socketio.emit` for publishing and `socketio.on` for receiving messages
* **Properties**: Transmits properties [![planned](https://custom-icon-badges.demolab.com/badge/planned%20for%20Wrapyfi%20v0.5-%23C2E0C6.svg?logo=hourglass&logoColor=white)](https://github.com/modular-ml/wrapyfi/issues/99 "planned link")

#### Zenoh:

All messages are transmitted using the `eclipse-zenoh` Python bindings. Transmission follows the [zenoh protocol](https://zenoh.io/)

* **Image**: Transmits and receives a `cv2` or `numpy` image wrapped in the `NativeObject` construct as `zenoh.Bytes`
* **AudioChunk**: Transmits and receives a `numpy` audio chunk wrapped in the `NativeObject` construct as `zenoh.Bytes`
* **NativeObject**: Transmits and receives a `json` string supporting all native Python objects, `numpy` arrays and [other formats](<Plugins.md#data-structure-types>) as `zenoh.Bytes` using
`zenoh.session.key.put` for publishing and an asynchronus callback for receiving messages

#### MQTT:

```{note}
Expand Down Expand Up @@ -412,15 +421,15 @@ Wrapyfi natively supports a [number of middleware](#middleware). However, more m
* Ensure that the middleware communication pattern scripts reside within directories named `listeners`, `publishers`, `clients`, or `servers` nested inside the `WRAPYFI_MWARE_PATH` and that the directory contains an `__init__.py` file

### Natively Supported Middleware
- **YARP**
- **ROS**
- **ROS 2**
- **ZeroMQ** [*beta feature*]:
- [YARP](https://www.yarp.it/yarp_swig.html)
- [ROS](http://wiki.ros.org/rospy)
- [ROS 2](https://docs.ros2.org/foxy/api/rclpy/index.html)
- [ZeroMQ](http://zeromq.org/) [*beta feature*]:
* `should_wait` trigger introduced with event monitoring
* Event monitoring currently cannot be disabled [![planned](https://custom-icon-badges.demolab.com/badge/planned%20for%20Wrapyfi%20v0.5-%23C2E0C6.svg?logo=hourglass&logoColor=white)](https://github.com/modular-ml/wrapyfi/issues/99 "planned link")
- **Websocket** *Only PUB/SUB* [*alpha support*]
- **MQTT** *Only PUB/SUB* [*alpha support*]

- [Websocket](https://socket.io/) *Only PUB/SUB* [*alpha support*]
- [Zenoh](https://zenoh.io/) *Only PUB/SUB* [*alpha support*]
- [MQTT](https://mqtt.org) *Only PUB/SUB* [*alpha support*]

## Plugins

Expand Down Expand Up @@ -566,6 +575,12 @@ This can be achieved by setting:
* `WRAPYFI_WEBSOCKET_SOCKET_PORT`: The socket port. Defaults to 5000
* `WRAPYFI_WEBSOCKET_NAMESPACE`: The socket namespace. Defaults to "/"
* `WRAPYFI_WEBSOCKET_MONITOR_LISTENER_SPAWN`: Either spawn the websocket monitor listener as a "process" or "thread". Defaults to "thread" which is the only supported option for now
* `WRAPYFI_ZENOH_IP`: IP address of the Zenoh socket. Defaults to "127.0.0.1"
* `WRAPYFI_ZENOH_PORT`: The Zenoh socket port. Defaults to 7447
* `WRAPYFI_ZENOH_MODE`: The Zenoh mode indicating whether to use the router as a broker or adopt peer-to-peer communication. Defaults to "peer"
* `WRAPYFI_ZENOH_CONNECT`: The Zenoh connect endpoints seperated by a comma e.g., "tcp/127.0.0.1:7447,udp/127.0.0.1:7448". This overrides `WRAPYFI_ZENOH_IP` and `WRAPYFI_ZENOH_PORT`. Defaults to an empty list
* `WRAPYFI_ZENOH_LISTEN`: The Zenoh listen endpoints seperated by a comma e.g., "tcp/127.0.0.1:7446". Defaults to an empty list
* `WRAPYFI_ZENOH_CONFIG_FILEPATH`: The Zenoh configuration file path. Defaults to None. Conflicting keys are overriden by `WRAPYFI_ZENOH_IP`, `WRAPYFI_ZENOH_PORT`, `WRAPYFI_ZENOH_CONNECT`, and `WRAPYFI_ZENOH_LISTEN`
* `WRAPYFI_MQTT_BROKER_ADDRESS`: The MQTT broker address. Defaults to "broker.emqx.io"
* `WRAPYFI_MQTT_BROKER_PORT`: The MQTT broker port. Defaults to 1883

Expand Down
11 changes: 10 additions & 1 deletion docs/usage/User Guide/Communication Patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ All messages are transmitted using the `zmq` Python bindings. Transmission follo
#### Websocket:

```{note}
Websocket assumes a server is running on the specified address and port. The forwarding of messages canonly be done manually by the user. An example server can be found [here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/examples/websockets/websocket_server.py)
Websocket assumes a server is running on the specified address and port. The forwarding of messages can only be done manually by the user. An example server can be found [here](https://github.com/fabawi/wrapyfi/tree/main/wrapyfi/examples/websockets/websocket_server.py)
```

All messages are transmitted using the `python-socketio` Python bindings. Transmission follows the [socket.io protocol](https://socket.io/docs/v4/)
Expand All @@ -94,6 +94,15 @@ All messages are transmitted using the `python-socketio` Python bindings. Transm
`socketio.emit` for publishing and `socketio.on` for receiving messages
* **Properties**: Transmits properties [![planned](https://custom-icon-badges.demolab.com/badge/planned%20for%20Wrapyfi%20v0.5-%23C2E0C6.svg?logo=hourglass&logoColor=white)](https://github.com/modular-ml/wrapyfi/issues/99 "planned link")

#### Zenoh:

All messages are transmitted using the `eclipse-zenoh` Python bindings. Transmission follows the [zenoh protocol](https://zenoh.io/)

* **Image**: Transmits and receives a `cv2` or `numpy` image wrapped in the `NativeObject` construct as `zenoh.Bytes`
* **AudioChunk**: Transmits and receives a `numpy` audio chunk wrapped in the `NativeObject` construct as `zenoh.Bytes`
* **NativeObject**: Transmits and receives a `json` string supporting all native Python objects, `numpy` arrays and [other formats](<Plugins.md#data-structure-types>) as `zenoh.Bytes` using
`zenoh.session.key.put` for publishing and an asynchronus callback for receiving messages

#### MQTT:

```{note}
Expand Down
8 changes: 7 additions & 1 deletion docs/usage/User Guide/Environment Variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Wrapyfi reserves specific environment variable names for the functionality of it
ZeroMQ requires socket configurations that can be passed as arguments to the respective middleware constructor (through the Wrapyfi decorator) or using environment variables. Note that these configurations are needed both by the proxy and the message publisher and listener.
The downside to such an approach is that all messages share the same configs. Since the proxy broker spawns once on first trigger (if enabled) as well as a singleton subscriber monitoring instance, using environment variables is the recommended approach to avoid unintended behavior.
This can be achieved by setting:

* `WRAPYFI_ZEROMQ_SOCKET_IP`: IP address of the socket. Defaults to "127.0.0.1"
* `WRAPYFI_ZEROMQ_SOCKET_PUB_PORT`: The publishing socket port. Defaults to 5555
* `WRAPYFI_ZEROMQ_SOCKET_SUB_PORT`: The sub-socket port (listening port for the broker). Defaults to 5556
Expand All @@ -25,6 +25,12 @@ This can be achieved by setting:
* `WRAPYFI_WEBSOCKET_SOCKET_PORT`: The socket port. Defaults to 5000
* `WRAPYFI_WEBSOCKET_NAMESPACE`: The socket namespace. Defaults to "/"
* `WRAPYFI_WEBSOCKET_MONITOR_LISTENER_SPAWN`: Either spawn the websocket monitor listener as a "process" or "thread". Defaults to "thread" which is the only supported option for now
* `WRAPYFI_ZENOH_IP`: IP address of the Zenoh socket. Defaults to "127.0.0.1"
* `WRAPYFI_ZENOH_PORT`: The Zenoh socket port. Defaults to 7447
* `WRAPYFI_ZENOH_MODE`: The Zenoh mode indicating whether to use the router as a broker or adopt peer-to-peer communication. Defaults to "peer"
* `WRAPYFI_ZENOH_CONNECT`: The Zenoh connect endpoints seperated by a comma e.g., "tcp/127.0.0.1:7447,udp/127.0.0.1:7448". This overrides `WRAPYFI_ZENOH_IP` and `WRAPYFI_ZENOH_PORT`. Defaults to an empty list
* `WRAPYFI_ZENOH_LISTEN`: The Zenoh listen endpoints seperated by a comma e.g., "tcp/127.0.0.1:7446". Defaults to an empty list
* `WRAPYFI_ZENOH_CONFIG_FILEPATH`: The Zenoh configuration file path. Defaults to None. Conflicting keys are overriden by `WRAPYFI_ZENOH_IP`, `WRAPYFI_ZENOH_PORT`, `WRAPYFI_ZENOH_CONNECT`, and `WRAPYFI_ZENOH_LISTEN`
* `WRAPYFI_MQTT_BROKER_ADDRESS`: The MQTT broker address. Defaults to "broker.emqx.io"
* `WRAPYFI_MQTT_BROKER_PORT`: The MQTT broker port. Defaults to 1883

Expand Down
Loading

0 comments on commit 267c3a8

Please sign in to comment.