From 864a532b34b77785b772857102b1a41de114dab5 Mon Sep 17 00:00:00 2001 From: "ChenYing Kuo (CY)" Date: Mon, 25 Nov 2024 19:26:19 +0800 Subject: [PATCH] Support ROS_AUTOMATIC_DISCOVERY_RANGE and ROS_STATIC_PEERS (#334) Signed-off-by: ChenYing Kuo --- Cargo.lock | 34 ++++++ Cargo.toml | 1 + DEFAULT_CONFIG.json5 | 22 ++++ README.md | 2 +- zenoh-bridge-ros2dds/src/bridge_args.rs | 16 +++ zenoh-plugin-ros2dds/Cargo.toml | 1 + zenoh-plugin-ros2dds/src/config.rs | 95 ++++++++++++++++- zenoh-plugin-ros2dds/src/lib.rs | 132 +++++++++++++++++++----- 8 files changed, 275 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9f99743..6ef5b2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3481,6 +3481,39 @@ dependencies = [ "syn 2.0.77", ] +[[package]] +name = "test-case" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb2550dd13afcd286853192af8601920d959b14c401fcece38071d53bf0768a8" +dependencies = [ + "test-case-macros", +] + +[[package]] +name = "test-case-core" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adcb7fd841cd518e279be3d5a3eb0636409487998a4aff22f3de87b81e88384f" +dependencies = [ + "cfg-if 1.0.0", + "proc-macro2", + "quote", + "syn 2.0.77", +] + +[[package]] +name = "test-case-macros" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", + "test-case-core", +] + [[package]] name = "thiserror" version = "1.0.63" @@ -4750,6 +4783,7 @@ dependencies = [ "rustc_version 0.4.1", "serde", "serde_json", + "test-case", "tokio", "tracing", "zenoh", diff --git a/Cargo.toml b/Cargo.toml index b5b2616..23ce06a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ regex = "1.7.1" rustc_version = "0.4" serde = "1.0.154" serde_json = "1.0.114" +test-case = { version = "3.3.1" } tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements tracing = "0.1" zenoh = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 0b6bd28..852e390 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -36,6 +36,28 @@ //// // ros_localhost_only: true, + //// + //// ros_automatic_discovery_range: controls how far ROS nodes will try to discover each other. + //// Valid options are: + //// * "SUBNET" is the default, and for DDS based middleware it means it will discover any node reachable via multicast. + //// * "LOCALHOST" means a node will only try to discover other nodes on the same machine. + //// * "OFF" means the node won’t discover any other nodes, even on the same machine. + //// * "SYSTEM_DEFAULT" means “don’t change any discovery settings”. + //// + //// Note that this is only available with ROS 2 Iron and later version + //// For more information: https://docs.ros.org/en/rolling/Tutorials/Advanced/Improved-Dynamic-Discovery.html + //// + // ros_automatic_discovery_range: "SUBNET", + + //// + //// ros_static_peers: is a semicolon (;) separated list of addresses that ROS should try to discover nodes on. + //// This allows connecting to nodes on specific machines (as long as their discovery range is not set to OFF). + //// + //// Note that this is only available with ROS 2 Iron and later version + //// For more information: https://docs.ros.org/en/rolling/Tutorials/Advanced/Improved-Dynamic-Discovery.html + //// + // ros_static_peers: "192.168.1.1;192.168.1.2", + //// //// shm_enabled: If set to true, the DDS implementation will use Iceoryx shared memory. //// Requires the bridge to be built with the 'dds_shm' feature for this option to valid. diff --git a/README.md b/README.md index d0363d2..4a1e6af 100644 --- a/README.md +++ b/README.md @@ -150,7 +150,7 @@ A typical usage is to run 1 bridge in a robot, and 1 bridge in another host moni It's important to make sure that NO DDS communication can occur between 2 hosts that are bridged by `zenoh-bridge-ros2dds`. Otherwise, some duplicate or looping traffic can occur. To make sure of this, you can either: -- define `ROS_LOCALHOST_ONLY=1`. +- use ROS_AUTOMATIC_DISCOVERY_RANGE=LOCALHOST after Iron and ROS_LOCALHOST_ONLY=1 before Iron. Preferably, enable MULTICAST on the loopback interface with this command (on Linux): `sudo ip l set lo multicast on` - use different `ROS_DOMAIN_ID` on each hosts - use a `CYCLONEDDS_URI` that configures CycloneDDS to only use internal interfaces to the robot. This configuration has to be used for all ROS Nodes as well as for the bridge. diff --git a/zenoh-bridge-ros2dds/src/bridge_args.rs b/zenoh-bridge-ros2dds/src/bridge_args.rs index 762ddc7..0b1b9e3 100644 --- a/zenoh-bridge-ros2dds/src/bridge_args.rs +++ b/zenoh-bridge-ros2dds/src/bridge_args.rs @@ -43,6 +43,12 @@ pub struct BridgeArgs { verbatim_doc_comment )] pub ros_localhost_only: bool, + /// Configure CycloneDDS to apply ROS_AUTOMATIC_DISCOVREY_RANGE. The argument only takes effect after ROS 2 Iron. + #[arg(long, env("ROS_AUTOMATIC_DISCOVERY_RANGE"))] + pub ros_automatic_discovery_range: Option, + /// Configure CycloneDDS to apply ROS_STATIC_PEERS. The argument only takes effect after ROS 2 Iron. + #[arg(long, env("ROS_STATIC_PEERS"))] + pub ros_static_peers: Option, /// Configure CycloneDDS to use Iceoryx shared memory. If not set, CycloneDDS will instead use any shared memory settings defined in "$CYCLONEDDS_URI" configuration. #[cfg(feature = "dds_shm")] #[arg(long)] @@ -101,6 +107,16 @@ impl From<&BridgeArgs> for Config { "plugins/ros2dds/ros_localhost_only", &args.ros_localhost_only, ); + insert_json5_option( + &mut config, + "plugins/ros2dds/ros_automatic_discovery_range", + &args.ros_automatic_discovery_range, + ); + insert_json5_option( + &mut config, + "plugins/ros2dds/ros_static_peers", + &args.ros_static_peers, + ); #[cfg(feature = "dds_shm")] { insert_json5( diff --git a/zenoh-plugin-ros2dds/Cargo.toml b/zenoh-plugin-ros2dds/Cargo.toml index dc8fc62..95aae56 100644 --- a/zenoh-plugin-ros2dds/Cargo.toml +++ b/zenoh-plugin-ros2dds/Cargo.toml @@ -46,6 +46,7 @@ lazy_static = { workspace = true } regex = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +test-case = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } zenoh = { workspace = true } diff --git a/zenoh-plugin-ros2dds/src/config.rs b/zenoh-plugin-ros2dds/src/config.rs index 6585670..0579aba 100644 --- a/zenoh-plugin-ros2dds/src/config.rs +++ b/zenoh-plugin-ros2dds/src/config.rs @@ -38,6 +38,13 @@ pub struct Config { pub domain: u32, #[serde(default = "default_localhost_only")] pub ros_localhost_only: bool, + #[serde( + default = "default_automatic_discovery_range", + deserialize_with = "deserialize_automatic_discovery_range" + )] + pub ros_automatic_discovery_range: Option, + #[serde(default, deserialize_with = "deserialize_static_peers")] + pub ros_static_peers: Option>, #[serde(default, flatten)] pub allowance: Option, #[serde( @@ -466,10 +473,63 @@ fn default_reliable_routes_blocking() -> bool { DEFAULT_RELIABLE_ROUTES_BLOCKING } +#[derive(Deserialize, Debug, Serialize, Eq, PartialEq, Clone, Copy)] +pub enum RosAutomaticDiscoveryRange { + Subnet, + Localhost, + Off, + SystemDefault, +} + fn default_localhost_only() -> bool { env::var("ROS_LOCALHOST_ONLY").as_deref() == Ok("1") } +fn default_automatic_discovery_range() -> Option { + Some(match env::var("ROS_AUTOMATIC_DISCOVERY_RANGE").as_deref() { + Ok("LOCALHOST") => RosAutomaticDiscoveryRange::Localhost, + Ok("OFF") => RosAutomaticDiscoveryRange::Localhost, + Ok("SYSTEM_DEFAULT") => RosAutomaticDiscoveryRange::SystemDefault, + _ => RosAutomaticDiscoveryRange::Subnet, + }) +} + +fn deserialize_automatic_discovery_range<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let discovery_range: String = Deserialize::deserialize(deserializer).unwrap(); + match discovery_range.as_str() { + "SUBNET" => Ok(Some(RosAutomaticDiscoveryRange::Subnet)), + "LOCALHOST" => Ok(Some(RosAutomaticDiscoveryRange::Localhost)), + "OFF" => Ok(Some(RosAutomaticDiscoveryRange::Off)), + "SYSTEM_DEFAULT" => Ok(Some(RosAutomaticDiscoveryRange::SystemDefault)), + unknown => Err(de::Error::custom(format!( + r#"Invalid parameter {unknown} for ROS_AUTOMATICALLY_DISCOVERY_RANGE"# + ))), + } +} + +fn deserialize_static_peers<'de, D>(deserializer: D) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + let peers: String = Deserialize::deserialize(deserializer).unwrap(); + let mut peer_list: Vec = Vec::new(); + for peer in peers.split(';') { + if peer != "" { + peer_list.push(peer.to_owned()); + } + } + if peer_list.is_empty() { + Ok(None) + } else { + Ok(Some(peer_list)) + } +} + fn default_transient_local_cache_multiplier() -> usize { DEFAULT_TRANSIENT_LOCAL_CACHE_MULTIPLIER } @@ -648,7 +708,9 @@ where #[cfg(test)] mod tests { - use super::Config; + use test_case::test_case; + + use super::{Config, RosAutomaticDiscoveryRange}; #[test] fn test_allowance() { @@ -862,4 +924,35 @@ mod tests { assert_eq!(__path__, None); assert_eq!(__required__, None); } + + #[test_case("{}", Some(RosAutomaticDiscoveryRange::Subnet); "Empty tests")] + #[test_case(r#"{"ros_automatic_discovery_range": "SUBNET"}"#, Some(RosAutomaticDiscoveryRange::Subnet); "SUBNET tests")] + #[test_case(r#"{"ros_automatic_discovery_range": "LOCALHOST"}"#, Some(RosAutomaticDiscoveryRange::Localhost); "LOCALHOST tests")] + #[test_case(r#"{"ros_automatic_discovery_range": "OFF"}"#, Some(RosAutomaticDiscoveryRange::Off); "OFF tests")] + #[test_case(r#"{"ros_automatic_discovery_range": "SYSTEM_DEFAULT"}"#, Some(RosAutomaticDiscoveryRange::SystemDefault); "SYSTEM_DEFAULT tests")] + fn test_ros_automatic_discovery_range( + config: &str, + result: Option, + ) { + let config = serde_json::from_str::(config); + println!(">>> {:?}", config); + assert!(config.is_ok()); + let Config { + ros_automatic_discovery_range, + .. + } = config.unwrap(); + assert_eq!(ros_automatic_discovery_range, result); + } + + #[test_case("{}", None; "Empty tests")] + #[test_case(r#"{"ros_static_peers": "127.0.0.1"}"#, Some(vec!["127.0.0.1".to_owned()]); "Single peer")] + #[test_case(r#"{"ros_static_peers": "192.168.1.1;192.168.1.2"}"#, Some(vec!["192.168.1.1".to_owned(), "192.168.1.2".to_owned()]); "Multiple peers")] + fn test_ros_static_peers(config: &str, result: Option>) { + let config = serde_json::from_str::(config); + assert!(config.is_ok()); + let Config { + ros_static_peers, .. + } = config.unwrap(); + assert_eq!(ros_static_peers, result); + } } diff --git a/zenoh-plugin-ros2dds/src/lib.rs b/zenoh-plugin-ros2dds/src/lib.rs index 7657259..d72208f 100644 --- a/zenoh-plugin-ros2dds/src/lib.rs +++ b/zenoh-plugin-ros2dds/src/lib.rs @@ -68,7 +68,7 @@ mod route_service_cli; mod route_service_srv; mod route_subscriber; mod routes_mgr; -use config::Config; +use config::{Config, RosAutomaticDiscoveryRange}; use crate::{ dds_utils::get_guid, discovery_mgr::DiscoveryMgr, events::ROS2DiscoveryEvent, @@ -129,19 +129,9 @@ kedefine!( // CycloneDDS' localhost-only: set network interface address (shortened form of config would be // possible, too, but I think it is clearer to spell it out completely). // Empty configuration fragments are ignored, so it is safe to unconditionally append a comma. -const CYCLONEDDS_CONFIG_LOCALHOST_ONLY_BEFORE_HUMBLE: &str = r#" - - ,"#; -const CYCLONEDDS_CONFIG_LOCALHOST_ONLY_AFTER_IRON: &str = r#" - - false - - - auto - 32 - - - ,"#; +const CYCLONEDDS_CONFIG_LOCALHOST_ONLY: &str = r#" + + ,"#; // CycloneDDS' enable-shm: enable usage of Iceoryx shared memory #[cfg(feature = "dds_shm")] @@ -189,6 +179,71 @@ impl Plugin for ROS2Plugin { impl PluginControl for ROS2Plugin {} impl RunningPluginTrait for ROS2Plugin {} +fn create_cyclonedds_config( + ros_automatic_discovery_range: RosAutomaticDiscoveryRange, + ros_static_peers: Vec, +) -> String { + let mut config = String::new(); + // Refer to https://github.com/ros2/rmw_cyclonedds/blob/c9e7001e6bf5373bdf1931535354b52eeddb2053/rmw_cyclonedds_cpp/src/rmw_node.cpp#L1134 + let add_localhost_as_static_peer: bool; + let add_static_peers: bool; + let disable_multicast: bool; + match ros_automatic_discovery_range { + RosAutomaticDiscoveryRange::Subnet => { + add_localhost_as_static_peer = false; + add_static_peers = true; + disable_multicast = false; + } + RosAutomaticDiscoveryRange::SystemDefault => { + add_localhost_as_static_peer = false; + add_static_peers = false; + disable_multicast = false; + } + RosAutomaticDiscoveryRange::Localhost => { + add_localhost_as_static_peer = true; + add_static_peers = true; + disable_multicast = true; + } + RosAutomaticDiscoveryRange::Off => { + add_localhost_as_static_peer = false; + add_static_peers = false; + disable_multicast = true; + } + }; + if add_localhost_as_static_peer || add_static_peers || disable_multicast { + config += ""; + + if disable_multicast { + config += "false"; + } + + let discovery_off = disable_multicast && !add_localhost_as_static_peer && !add_static_peers; + if discovery_off { + config += "none"; + config += &format!("ros_discovery_off_{}", std::process::id()); + } else { + config += "auto"; + config += "32"; + } + + if (add_static_peers && !ros_static_peers.is_empty()) || add_localhost_as_static_peer { + config += ""; + if add_localhost_as_static_peer { + config += ""; + } + for peer in ros_static_peers { + config += ""; + } + config += ""; + } + + config += ","; + } + config +} + pub async fn run(runtime: Runtime, config: Config) { // Try to initiate login. // Required in case of dynamic lib, otherwise no logs. @@ -245,27 +300,52 @@ pub async fn run(runtime: Runtime, config: Config) { } }; - // if "ros_localhost_only" is set, configure CycloneDDS to use only localhost interface - if config.ros_localhost_only { - if ros_distro_is_less_than("iron") { - env::set_var( - "CYCLONEDDS_URI", - format!( - "{}{}", - CYCLONEDDS_CONFIG_LOCALHOST_ONLY_BEFORE_HUMBLE, - env::var("CYCLONEDDS_URI").unwrap_or_default() - ), + // Dynamic Discovery is changed after iron. Need to check the ROS 2 version. + // https://docs.ros.org/en/rolling/Tutorials/Advanced/Improved-Dynamic-Discovery.html + if ros_distro_is_less_than("iron") { + if config.ros_automatic_discovery_range.is_some() { + tracing::warn!("ROS_AUTOMATIC_DISCOVERY_RANGE will be ignored since it's not supported before ROS 2 Iron"); + } + if config.ros_static_peers.is_some() { + tracing::warn!( + "ROS_STATIC_PEERS will be ignored since it's not supported before ROS 2 Iron" ); - } else { + } + // if "ros_localhost_only" is set, configure CycloneDDS to use only localhost interface + if config.ros_localhost_only { env::set_var( "CYCLONEDDS_URI", format!( "{}{}", - CYCLONEDDS_CONFIG_LOCALHOST_ONLY_AFTER_IRON, + CYCLONEDDS_CONFIG_LOCALHOST_ONLY, env::var("CYCLONEDDS_URI").unwrap_or_default() ), ); } + } else { + let (ros_automatic_discovery_range, ros_static_peers) = if config.ros_localhost_only { + // If ROS_LOCALHOST_ONLY is set, need to transform into new environmental variables + // Refer to https://github.com/ros2/ros2_documentation/pull/3519#discussion_r1186541935 + tracing::warn!("ROS_LOCALHOST_ONLY is deprecated but still honored if it is enabled. Use ROS_AUTOMATIC_DISCOVERY_RANGE and ROS_STATIC_PEERS instead."); + tracing::warn!("'localhost_only' is enabled, 'automatic_discovery_range' and 'static_peers' will be ignored."); + (Some(RosAutomaticDiscoveryRange::Localhost), None) + } else { + ( + config.ros_automatic_discovery_range.clone(), + config.ros_static_peers.clone(), + ) + }; + env::set_var( + "CYCLONEDDS_URI", + format!( + "{}{}", + create_cyclonedds_config( + ros_automatic_discovery_range.unwrap_or(RosAutomaticDiscoveryRange::Subnet), + ros_static_peers.unwrap_or(Vec::new()) + ), + env::var("CYCLONEDDS_URI").unwrap_or_default() + ), + ); } // if "enable_shm" is set, configure CycloneDDS to use Iceoryx shared memory