Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: wrap the zenoh session with a shared pointer #333

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ namespace rmw_zenoh_cpp
{
///=============================================================================
std::shared_ptr<ClientData> ClientData::make(
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
Expand Down Expand Up @@ -167,7 +167,7 @@ std::shared_ptr<ClientData> ClientData::make(

std::size_t domain_id = node_info.domain_id_;
auto entity = liveliness::Entity::make(
z_info_zid(session),
z_info_zid(session->loan()),
std::to_string(node_id),
std::to_string(service_id),
liveliness::EntityType::Client,
Expand All @@ -192,6 +192,7 @@ std::shared_ptr<ClientData> ClientData::make(
node,
client,
entity,
session,
request_members,
response_members,
request_type_support,
Expand All @@ -211,13 +212,15 @@ ClientData::ClientData(
const rmw_node_t * rmw_node,
const rmw_client_t * rmw_client,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<ZenohSession> sess,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::shared_ptr<RequestTypeSupport> request_type_support,
std::shared_ptr<ResponseTypeSupport> response_type_support)
: rmw_node_(rmw_node),
rmw_client_(rmw_client),
entity_(std::move(entity)),
sess_(std::move(sess)),
request_type_support_impl_(request_type_support_impl),
response_type_support_impl_(response_type_support_impl),
request_type_support_(request_type_support),
Expand All @@ -232,7 +235,7 @@ ClientData::ClientData(
}

///=============================================================================
bool ClientData::init(const z_loaned_session_t * session)
bool ClientData::init(std::shared_ptr<ZenohSession> session)
{
if (z_keyexpr_from_str(
&this->keyexpr_,
Expand All @@ -250,7 +253,7 @@ bool ClientData::init(const z_loaned_session_t * session)
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_keyexpr.c_str());
if (z_liveliness_declare_token(
session,
session->loan(),
&this->token_,
z_loan(liveliness_ke),
NULL
Expand All @@ -266,6 +269,7 @@ bool ClientData::init(const z_loaned_session_t * session)
z_drop(z_move(this->token_));
});

sess_ = session;
initialized_ = true;

free_ros_keyexpr.cancel();
Expand Down Expand Up @@ -470,7 +474,7 @@ rmw_ret_t ClientData::send_request(
z_owned_closure_reply_t zn_closure_reply;
z_closure(&zn_closure_reply, client_data_handler, client_data_drop, this);
z_get(
context_impl->session(),
sess_->loan(),
z_loan(keyexpr_), "",
z_move(zn_closure_reply),
&opts);
Expand Down Expand Up @@ -535,6 +539,7 @@ void ClientData::_shutdown()
z_drop(z_move(keyexpr_));
}

sess_.reset();
is_shutdown_ = true;
}

Expand Down
7 changes: 5 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_client_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
public:
// Make a shared_ptr of ClientData.
static std::shared_ptr<ClientData> make(
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
const rmw_node_t * const node,
const rmw_client_t * client,
liveliness::NodeInfo node_info,
Expand Down Expand Up @@ -113,13 +113,14 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
const rmw_node_t * rmw_node,
const rmw_client_t * client,
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<ZenohSession> sess,
const void * request_type_support_impl,
const void * response_type_support_impl,
std::shared_ptr<RequestTypeSupport> request_type_support,
std::shared_ptr<ResponseTypeSupport> response_type_support);

// Initialize the Zenoh objects for this entity.
bool init(const z_loaned_session_t * session);
bool init(std::shared_ptr<ZenohSession> session);

// Shutdown this client (the mutex is expected to be held by the caller).
void _shutdown();
Expand All @@ -131,6 +132,8 @@ class ClientData final : public std::enable_shared_from_this<ClientData>
const rmw_client_t * rmw_client_;
// The Entity generated for the service.
std::shared_ptr<liveliness::Entity> entity_;
// A shared session.
std::shared_ptr<ZenohSession> sess_;
// An owned keyexpression.
z_owned_keyexpr_t keyexpr_;
// Liveliness token for the service.
Expand Down
43 changes: 23 additions & 20 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include "rcpputils/scope_exit.hpp"
#include "rmw/error_handling.h"
#include "zenoh_utils.hpp"

// Megabytes of SHM to reserve.
// TODO(clalancette): Make this configurable, or get it from the configuration
Expand Down Expand Up @@ -86,13 +87,18 @@ class rmw_context_impl_s::Data final
});

// Initialize the zenoh session.
if (z_open(&session_, z_move(config), NULL) != Z_OK) {
z_owned_session_t raw_session;
if (z_open(&raw_session, z_move(config), NULL) != Z_OK) {
RMW_SET_ERROR_MSG("Error setting up zenoh session.");
throw std::runtime_error("Error setting up zenoh session.");
}
if (session_ != nullptr) {
session_.reset();
}
session_ = std::make_shared<rmw_zenoh_cpp::ZenohSession>(raw_session);
auto close_session = rcpputils::make_scope_exit(
[this]() {
z_close(z_loan_mut(session_), NULL);
[&raw_session]() {
z_close(z_loan_mut(raw_session), NULL);
});

// Verify if the zenoh router is running if configured.
Expand All @@ -102,7 +108,7 @@ class rmw_context_impl_s::Data final
uint64_t connection_attempts = 0;
constexpr std::chrono::milliseconds sleep_time(1000);
constexpr int64_t ticks_between_print(std::chrono::milliseconds(1000) / sleep_time);
while ((ret = rmw_zenoh_cpp::zenoh_router_check(z_loan(session_))) != RMW_RET_OK) {
while ((ret = rmw_zenoh_cpp::zenoh_router_check(session_->loan())) != RMW_RET_OK) {
if ((connection_attempts % ticks_between_print) == 0) {
RMW_ZENOH_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
Expand All @@ -117,7 +123,7 @@ class rmw_context_impl_s::Data final
}

// Initialize the graph cache.
const z_id_t zid = z_info_zid(z_loan(session_));
const z_id_t zid = z_info_zid(session_->loan());
graph_cache_ = std::make_shared<rmw_zenoh_cpp::GraphCache>(zid);
// Setup liveliness subscriptions for discovery.
std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token(domain_id);
Expand All @@ -144,7 +150,7 @@ class rmw_context_impl_s::Data final
z_view_keyexpr_t keyexpr;
z_view_keyexpr_from_str(&keyexpr, liveliness_str.c_str());
z_liveliness_get(
z_loan(session_), z_loan(keyexpr),
session_->loan(), z_loan(keyexpr),
z_move(closure), NULL);
z_owned_reply_t reply;
while (z_recv(z_loan(handler), &reply) == Z_OK) {
Expand Down Expand Up @@ -203,7 +209,7 @@ class rmw_context_impl_s::Data final
z_view_keyexpr_t liveliness_ke;
z_view_keyexpr_from_str(&liveliness_ke, liveliness_str.c_str());
if (z_liveliness_declare_subscriber(
z_loan(session_),
session_->loan(),
&graph_subscriber_, z_loan(liveliness_ke),
z_move(callback), &sub_options) != Z_OK)
{
Expand Down Expand Up @@ -240,11 +246,8 @@ class rmw_context_impl_s::Data final
// to avoid an AB/BA deadlock if shutdown is racing with graph_sub_data_handler().
}

// Close the zenoh session
if (z_close(z_loan_mut(session_), NULL) != Z_OK) {
RMW_SET_ERROR_MSG("Error while closing zenoh session");
return RMW_RET_ERROR;
}
// Drop the shared session.
session_.reset();

return RMW_RET_OK;
}
Expand All @@ -255,10 +258,10 @@ class rmw_context_impl_s::Data final
return enclave_;
}

const z_loaned_session_t * session() const
std::shared_ptr<rmw_zenoh_cpp::ZenohSession> session() const
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return z_loan(session_);
return session_;
}

std::optional<z_owned_shm_provider_t> & shm_provider()
Expand Down Expand Up @@ -288,7 +291,7 @@ class rmw_context_impl_s::Data final
bool session_is_valid() const
{
std::lock_guard<std::recursive_mutex> lock(mutex_);
return !z_session_is_closed(z_loan(session_));
return !z_session_is_closed(session_->loan());
}

std::shared_ptr<rmw_zenoh_cpp::GraphCache> graph_cache()
Expand All @@ -309,7 +312,7 @@ class rmw_context_impl_s::Data final
}

// Check that the Zenoh session is still valid.
if (z_session_is_closed(z_loan(session_))) {
if (z_session_is_closed(session_->loan())) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
"Unable to create NodeData as Zenoh session is invalid.");
Expand All @@ -319,7 +322,7 @@ class rmw_context_impl_s::Data final
auto node_data = rmw_zenoh_cpp::NodeData::make(
node,
this->get_next_entity_id(),
z_loan(session_),
session_->loan(),
domain_id_,
ns,
node_name,
Expand Down Expand Up @@ -395,8 +398,8 @@ class rmw_context_impl_s::Data final
std::size_t domain_id_;
// Enclave, name used to find security artifacts in a sros2 keystore.
std::string enclave_;
// An owned session.
z_owned_session_t session_;
// A shared session.
std::shared_ptr<rmw_zenoh_cpp::ZenohSession> session_{nullptr};
// An optional SHM manager that is initialized of SHM is enabled in the
// zenoh session config.
std::optional<z_owned_shm_provider_t> shm_provider_;
Expand Down Expand Up @@ -472,7 +475,7 @@ std::string rmw_context_impl_s::enclave() const
}

///=============================================================================
const z_loaned_session_t * rmw_context_impl_s::session() const
std::shared_ptr<rmw_zenoh_cpp::ZenohSession> rmw_context_impl_s::session() const
{
return data_->session();
}
Expand Down
4 changes: 2 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class rmw_context_impl_s final
// Get a copy of the enclave.
std::string enclave() const;

// Loan the Zenoh session.
const z_loaned_session_t * session() const;
// Share the Zenoh session.
std::shared_ptr<rmw_zenoh_cpp::ZenohSession> session() const;

// Get a reference to the shm_provider.
// Note: This is not thread-safe.
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ std::size_t NodeData::id() const
///=============================================================================
bool NodeData::create_pub_data(
const rmw_publisher_t * const publisher,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
Expand Down Expand Up @@ -187,7 +187,7 @@ void NodeData::delete_pub_data(const rmw_publisher_t * const publisher)
///=============================================================================
bool NodeData::create_sub_data(
const rmw_subscription_t * const subscription,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::shared_ptr<GraphCache> graph_cache,
std::size_t id,
const std::string & topic_name,
Expand Down Expand Up @@ -255,7 +255,7 @@ void NodeData::delete_sub_data(const rmw_subscription_t * const subscription)
///=============================================================================
bool NodeData::create_service_data(
const rmw_service_t * const service,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_supports,
Expand Down Expand Up @@ -322,7 +322,7 @@ void NodeData::delete_service_data(const rmw_service_t * const service)
///=============================================================================
bool NodeData::create_client_data(
const rmw_client_t * const client,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_supports,
Expand Down
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class NodeData final
// Create a new PublisherData for a given rmw_publisher_t.
bool create_pub_data(
const rmw_publisher_t * const publisher,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> sess,
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
Expand All @@ -70,7 +70,7 @@ class NodeData final
// Create a new SubscriptionData for a given rmw_subscription_t.
bool create_sub_data(
const rmw_subscription_t * const subscription,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> sess,
std::shared_ptr<GraphCache> graph_cache,
std::size_t id,
const std::string & topic_name,
Expand All @@ -86,7 +86,7 @@ class NodeData final
// Create a new ServiceData for a given rmw_service_t.
bool create_service_data(
const rmw_service_t * const service,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_support,
Expand All @@ -101,7 +101,7 @@ class NodeData final
// Create a new ClientData for a given rmw_client_t.
bool create_client_data(
const rmw_client_t * const client,
const z_loaned_session_t * session,
std::shared_ptr<ZenohSession> session,
std::size_t id,
const std::string & service_name,
const rosidl_service_type_support_t * type_support,
Expand Down
Loading
Loading