Skip to content

Commit

Permalink
Support ROS <=> Zenoh CI.
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary committed Dec 12, 2024
1 parent ac34f8b commit 490e5c6
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 32 deletions.
1 change: 1 addition & 0 deletions zenoh-test-ros2dds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ cdr = "0.2.4"
futures = "0.3.26"
r2r = "0.9"
serde = "1.0.154"
serde_derive = "1.0.154"
serde_json = "1.0.114"
tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements
zenoh = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
Expand Down
42 changes: 42 additions & 0 deletions zenoh-test-ros2dds/tests/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use zenoh::{
config::Config,
internal::{plugins::PluginsManager, runtime::RuntimeBuilder},
};
use zenoh_config::ModeDependentValue;

pub fn init_env() {
std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp");
}

pub async fn create_bridge() {
let mut plugins_mgr = PluginsManager::static_plugins_only();
plugins_mgr.declare_static_plugin::<zenoh_plugin_ros2dds::ROS2Plugin, &str>("ros2dds", true);
let mut config = Config::default();
config.insert_json5("plugins/ros2dds", "{}").unwrap();
config
.timestamping
.set_enabled(Some(ModeDependentValue::Unique(true)))
.unwrap();
config.adminspace.set_enabled(true).unwrap();
config.plugins_loading.set_enabled(true).unwrap();
let mut runtime = RuntimeBuilder::new(config)
.plugins_manager(plugins_mgr)
.build()
.await
.unwrap();
runtime.start().await.unwrap();
}
150 changes: 150 additions & 0 deletions zenoh-test-ros2dds/tests/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

pub mod common;

use std::time::Duration;

use futures::StreamExt;
use r2r::{self, QosProfile};
use serde_derive::{Deserialize, Serialize};
use tokio::sync::oneshot;
use zenoh::Wait;

// The test service
const TEST_SERVICE_Z2R: &str = "test_service_z2r";
const TEST_SERVICE_R2Z: &str = "test_service_r2Z";

#[derive(Serialize, Deserialize, PartialEq, Clone)]
struct AddTwoIntsRequest {
a: i64,
b: i64,
}
#[derive(Serialize, Deserialize, PartialEq, Clone)]
struct AddTwoIntsReply {
sum: i64,
}

#[tokio::test(flavor = "multi_thread")]
async fn test_ros_client_zenoh_service() {
common::init_env();
// Create zenoh-bridge-ros2dds
tokio::spawn(common::create_bridge());

let a = 1;
let b = 2;

// Zenoh service
let session = zenoh::open(zenoh::Config::default()).await.unwrap();
let _queryable = session
.declare_queryable(TEST_SERVICE_R2Z)
.callback(|query| {
let request: AddTwoIntsRequest =
cdr::deserialize(&query.payload().unwrap().to_bytes()).unwrap();
let response = AddTwoIntsReply {
sum: request.a + request.b,
};
let data = cdr::serialize::<_, _, cdr::CdrLe>(&response, cdr::Infinite).unwrap();
query.reply(TEST_SERVICE_R2Z, data).wait().unwrap();
})
.await
.unwrap();

// ROS client
let ctx = r2r::Context::create().unwrap();
let mut node = r2r::Node::create(ctx, "ros_client", "").unwrap();
let client = node
.create_client::<r2r::example_interfaces::srv::AddTwoInts::Service>(
&format!("/{}", TEST_SERVICE_R2Z),
QosProfile::default(),
)
.unwrap();

// Node spin
let (term_tx, mut term_rx) = oneshot::channel();
let _handler = tokio::task::spawn_blocking(move || {
while term_rx.try_recv().is_err() {
node.spin_once(std::time::Duration::from_millis(100));
}
});

// Wait for the environment to be ready
tokio::time::sleep(Duration::from_secs(1)).await;

// Send the request and then process the response
let my_req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b };
let resp = client.request(&my_req).unwrap().await.unwrap();

assert_eq!(resp.sum, a + b);

term_tx.send(()).unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn test_zenoh_client_ros_service() {
common::init_env();
// Create zenoh-bridge-ros2dds
tokio::spawn(common::create_bridge());

let a = 1;
let b = 2;

// ROS service
let ctx = r2r::Context::create().unwrap();
let mut node = r2r::Node::create(ctx, "ros_service", "").unwrap();
let mut service = node
.create_service::<r2r::example_interfaces::srv::AddTwoInts::Service>(
&format!("/{}", TEST_SERVICE_Z2R),
QosProfile::default(),
)
.unwrap();
// Processing the requests and send back responses
tokio::spawn(async move {
while let Some(req) = service.next().await {
let resp = r2r::example_interfaces::srv::AddTwoInts::Response {
sum: req.message.a + req.message.b,
};
req.respond(resp).unwrap();
}
});

// Node spin
let (term_tx, mut term_rx) = oneshot::channel();
let _handler = tokio::task::spawn_blocking(move || {
while term_rx.try_recv().is_err() {
node.spin_once(std::time::Duration::from_millis(100));
}
});

// Zenoh client
let session = zenoh::open(zenoh::Config::default()).await.unwrap();
let client = session.declare_querier(TEST_SERVICE_Z2R).await.unwrap();

// Wait for the environment to be ready
tokio::time::sleep(Duration::from_secs(1)).await;

// Send request to ROS service
let req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b };
let buf = cdr::serialize::<_, _, cdr::CdrLe>(&req, cdr::size::Infinite).unwrap();
let recv_handler = client.get().payload(buf).await.unwrap();

// Process the response
let reply = recv_handler.recv().unwrap();
let reader = reply.result().unwrap().payload().reader();
let result: Result<i64, _> = cdr::deserialize_from(reader, cdr::size::Infinite);

assert_eq!(result.unwrap(), a + b);

term_tx.send(()).unwrap();
}
38 changes: 6 additions & 32 deletions zenoh-test-ros2dds/tests/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,25 @@
// ZettaScale Zenoh Team, <[email protected]>
//

pub mod common;

use std::{sync::mpsc::channel, time::Duration};

use futures::StreamExt;
use r2r::{self, QosProfile};
use zenoh::{
config::Config,
internal::{plugins::PluginsManager, runtime::RuntimeBuilder},
};
use zenoh_config::ModeDependentValue;

// The test topic
const TEST_TOPIC: &str = "test_topic";
// The test TEST_PAYLOAD
const TEST_PAYLOAD: &str = "Hello World";

fn init_env() {
std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp");
}

async fn create_bridge() {
let mut plugins_mgr = PluginsManager::static_plugins_only();
plugins_mgr.declare_static_plugin::<zenoh_plugin_ros2dds::ROS2Plugin, &str>("ros2dds", true);
let mut config = Config::default();
config.insert_json5("plugins/ros2dds", "{}").unwrap();
config
.timestamping
.set_enabled(Some(ModeDependentValue::Unique(true)))
.unwrap();
config.adminspace.set_enabled(true).unwrap();
config.plugins_loading.set_enabled(true).unwrap();
let mut runtime = RuntimeBuilder::new(config)
.plugins_manager(plugins_mgr)
.build()
.await
.unwrap();
runtime.start().await.unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn test_zenoh_pub_ros_sub() {
init_env();
common::init_env();
let (tx, rx) = channel();

// Create zenoh-bridge-ros2dds
tokio::spawn(create_bridge());
tokio::spawn(common::create_bridge());

// ROS subscriber
let ctx = r2r::Context::create().unwrap();
Expand Down Expand Up @@ -94,9 +68,9 @@ async fn test_zenoh_pub_ros_sub() {

#[tokio::test(flavor = "multi_thread")]
async fn test_ros_pub_zenoh_sub() {
init_env();
common::init_env();
// Create zenoh-bridge-ros2dds
tokio::spawn(create_bridge());
tokio::spawn(common::create_bridge());

// Zenoh subscriber
let session = zenoh::open(zenoh::Config::default()).await.unwrap();
Expand Down

0 comments on commit 490e5c6

Please sign in to comment.