Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test package for single and multi thread #362

Open
wants to merge 1 commit into
base: foxy
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions rclcpp/executors/executor_comparison/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
cmake_minimum_required(VERSION 3.5)
project(examples_rclcpp_executor_comparison)

find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_BUILD_TYPE Debug)

add_executable(publisher_node publisher_node.cc)
ament_target_dependencies(publisher_node rclcpp std_msgs)
install(TARGETS publisher_node DESTINATION lib/simple_thread_tester)

add_executable(single_thread_subscriber_node single_thread_subscriber_node.cc)
ament_target_dependencies(single_thread_subscriber_node rclcpp std_msgs)
install(TARGETS single_thread_subscriber_node
DESTINATION lib/simple_thread_tester)

add_executable(multi_thread_reentrant_subscriber_node
multi_thread_reentrant_subscriber_node.cc)
ament_target_dependencies(multi_thread_reentrant_subscriber_node rclcpp
std_msgs)
install(TARGETS multi_thread_reentrant_subscriber_node
DESTINATION lib/simple_thread_tester)

add_executable(multi_thread_mutually_exclusive_subscriber_node
multi_thread_mutually_exclusive_subscriber_node.cc)
ament_target_dependencies(multi_thread_mutually_exclusive_subscriber_node
rclcpp std_msgs)
install(TARGETS multi_thread_mutually_exclusive_subscriber_node
DESTINATION lib/simple_thread_tester)

ament_package()
31 changes: 31 additions & 0 deletions rclcpp/executors/executor_comparison/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Simple ros2 package to understand threads

Visit [here](https://motion-boseong.vercel.app/threading-ros2) for explanations.

## Running node

### 1. Single-thread test
In terminal A (also for the two belows)

```
ros2 run simple_thread_tester publisher_node
```

In terminal B

```
ros2 run simple_thread_tester single_thread_subscriber_node
```
### 2. Multi-thread reentrant callback group test

```
ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node
```

### 3. Multi-thread mutually exclusive callback group test

```
ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node
# If simulate mutex
ros2 param set /subscriber_node use_mutex true
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#include <chrono>
#include <random>
#include <thread>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

using namespace std::chrono_literals;

class MultiThreadMutuallyExclusiveSubscriber : public rclcpp::Node {

public:
MultiThreadMutuallyExclusiveSubscriber(rclcpp::NodeOptions node_options)
: Node("subscriber_node",
node_options.allow_undeclared_parameters(true)) {
rclcpp::SubscriptionOptions options;
options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);

short_subscriber_ = create_subscription<std_msgs::msg::String>(
"/short_topic", rclcpp::QoS(10),
std::bind(&MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback,
this, std::placeholders::_1),
options);

options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
long_subscriber_ = create_subscription<std_msgs::msg::String>(
"/long_topic", rclcpp::QoS(10),
std::bind(&MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback,
this, std::placeholders::_1),
options);

timer_ = create_wall_timer(
1s, std::bind(&MultiThreadMutuallyExclusiveSubscriber::TimerCallback,
this));

auto param_change_callback =
[this](std::vector<rclcpp::Parameter> parameters) {
for (auto parameter : parameters) {
if (parameter.get_name() == "use_mutex") {
use_mutex = parameter.as_bool();
if (use_mutex)
RCLCPP_INFO(get_logger(), "will use mutex");
else
RCLCPP_INFO(get_logger(), "will not mutex");
}
}

auto result = rcl_interfaces::msg::SetParametersResult();
result.successful = true;
return result;
};

callback_handler =
this->add_on_set_parameters_callback(param_change_callback);
}

private:
bool use_mutex{false};
struct {
std::mutex short_topic_mutex;
std::mutex long_topic_mutex;
} mutex_list;
std::string processed_short_string_;
std::string processed_long_string_;
void ProcessString(const std::string &raw_string, std::string &output);

rclcpp::Subscription<std_msgs::msg::String>::SharedPtr short_subscriber_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr long_subscriber_;
rclcpp::TimerBase::SharedPtr timer_;
OnSetParametersCallbackHandle::SharedPtr callback_handler;

void ShortTopicCallback(const std_msgs::msg::String::SharedPtr msg);
void LongTopicCallback(const std_msgs::msg::String::SharedPtr msg);
void TimerCallback();
};

void MultiThreadMutuallyExclusiveSubscriber::ProcessString(
const std::string &raw_string, std::string &output) {
output = "** ";

for (char c : raw_string) {
output.push_back(c);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
output += " **";
};

void MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {
std::shared_ptr<std::unique_lock<std::mutex>> lock;
if (use_mutex)
mutex_list.short_topic_mutex.lock();
ProcessString(msg->data, processed_short_string_);
RCLCPP_INFO(get_logger(), "Setting processed: %s",
processed_short_string_.c_str());
if (use_mutex)
mutex_list.short_topic_mutex.unlock();
}

void MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {

if (use_mutex)
mutex_list.long_topic_mutex.lock();
ProcessString(msg->data, processed_long_string_);
RCLCPP_INFO(get_logger(), "Setting processed: %s",
processed_long_string_.c_str());
if (use_mutex)
mutex_list.long_topic_mutex.unlock();
}

void MultiThreadMutuallyExclusiveSubscriber::TimerCallback() {
if (use_mutex) {
mutex_list.short_topic_mutex.lock();
mutex_list.long_topic_mutex.lock();
}
RCLCPP_INFO(get_logger(),
"Getting processed strings: \n [long] %s \n [short] %s",
processed_long_string_.c_str(), processed_short_string_.c_str());

if (use_mutex) {
mutex_list.short_topic_mutex.unlock();
mutex_list.long_topic_mutex.unlock();
}
}

int main(int argc, char **argv) {
rclcpp::init(argc, argv);
rclcpp::NodeOptions option;
auto node = std::make_shared<MultiThreadMutuallyExclusiveSubscriber>(option);
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();
rclcpp::shutdown();
return 0;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include <chrono>
#include <random>
#include <thread>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

using namespace std::chrono_literals;

class MultiThreadReentrantSubscriber : public rclcpp::Node {

public:
MultiThreadReentrantSubscriber() : Node("subscriber_node") {
rclcpp::SubscriptionOptions options;
options.callback_group =
create_callback_group(rclcpp::CallbackGroupType::Reentrant);

short_subscriber_ = create_subscription<std_msgs::msg::String>(
"/short_topic", rclcpp::QoS(10),
std::bind(&MultiThreadReentrantSubscriber::ShortTopicCallback, this,
std::placeholders::_1),
options);

long_subscriber_ = create_subscription<std_msgs::msg::String>(
"/long_topic", rclcpp::QoS(10),
std::bind(&MultiThreadReentrantSubscriber::LongTopicCallback, this,
std::placeholders::_1),
options);
}

private:
std::string processed_short_string_;
std::string processed_long_string_;
std::string ProcessString(const std::string &raw_string);

rclcpp::Subscription<std_msgs::msg::String>::SharedPtr short_subscriber_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr long_subscriber_;
void ShortTopicCallback(const std_msgs::msg::String::SharedPtr msg);
void LongTopicCallback(const std_msgs::msg::String::SharedPtr msg);
};

std::string
MultiThreadReentrantSubscriber::ProcessString(const std::string &raw_string) {
std::thread::id this_id = std::this_thread::get_id();
std::ostringstream oss;
oss << std::this_thread::get_id();

std::this_thread::sleep_for(std::chrono::seconds(2));
return "** " + raw_string + " **" + " ( by " + oss.str() + ") ";
};

void MultiThreadReentrantSubscriber::ShortTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {
auto processed_string = ProcessString(msg->data);

RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_string.c_str());
processed_short_string_ = processed_string;
}

void MultiThreadReentrantSubscriber::LongTopicCallback(
const std_msgs::msg::String::SharedPtr msg) {
auto processed_string = ProcessString(msg->data);

RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_string.c_str());
processed_long_string_ = processed_string;
}

int main(int argc, char **argv) {
rclcpp::init(argc, argv);
auto node = std::make_shared<MultiThreadReentrantSubscriber>();
rclcpp::executors::MultiThreadedExecutor executor;
executor.add_node(node);
executor.spin();
rclcpp::shutdown();
return 0;
}
19 changes: 19 additions & 0 deletions rclcpp/executors/executor_comparison/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>examples_rclcpp_executor_comparison</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="[email protected]">jbs</maintainer>
<license>TODO: License declaration</license>

<buildtool_depend>ament_cmake</buildtool_depend>


<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>

<export>
<build_type>ament_cmake</build_type>
</export>
</package>
54 changes: 54 additions & 0 deletions rclcpp/executors/executor_comparison/publisher_node.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include <random>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

using namespace std::chrono_literals;

class PublisherNode : public rclcpp::Node {
public:
PublisherNode() : Node("publisher_node") {
short_publisher_ = create_publisher<std_msgs::msg::String>(
"/short_topic", rclcpp::SystemDefaultsQoS());
long_publisher_ = create_publisher<std_msgs::msg::String>(
"/long_topic", rclcpp::SystemDefaultsQoS());

short_timer_ =
create_wall_timer(100ms, [this]() { PublishShortMessage(); });

long_timer_ = create_wall_timer(1s, [this]() { PublishLongMessage(); });
}

private:
size_t short_seq_{0};
size_t long_seq_{0};

rclcpp::Publisher<std_msgs::msg::String>::SharedPtr short_publisher_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr long_publisher_;

rclcpp::TimerBase::SharedPtr short_timer_;
rclcpp::TimerBase::SharedPtr long_timer_;

void PublishShortMessage() {
std_msgs::msg::String message;
message.data = "Short message (seq=" + std::to_string(short_seq_++) + ")";
RCLCPP_INFO(get_logger(), message.data, " published.");
short_publisher_->publish(message);
}

void PublishLongMessage() {
std_msgs::msg::String message;
message.data = "Long message with more characters (seq=" +
std::to_string(long_seq_++) + ")";
RCLCPP_INFO(get_logger(), message.data, " published.");
long_publisher_->publish(message);
}
};

int main(int argc, char **argv) {
rclcpp::init(argc, argv);
auto node = std::make_shared<PublisherNode>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
Loading