From 71a386df5684a19233331f0a255bc4926b040fe8 Mon Sep 17 00:00:00 2001 From: Tomoya Fujita Date: Fri, 13 Dec 2024 00:09:38 -0800 Subject: [PATCH 1/2] apply actual QoS from rmw to the IPC publisher. Signed-off-by: Tomoya Fujita --- rclcpp/include/rclcpp/publisher.hpp | 13 ++++++---- .../test/rclcpp/test_create_subscription.cpp | 17 +++++++++++++ rclcpp/test/rclcpp/test_publisher.cpp | 25 ++++++++++++++----- 3 files changed, 44 insertions(+), 11 deletions(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 7abea6c6ba..9fb9309642 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -147,6 +147,7 @@ class Publisher : public PublisherBase const rclcpp::PublisherOptionsWithAllocator & options) { // Topic is unused for now. + (void)qos; (void)options; // If needed, setup intra process communication. @@ -154,24 +155,26 @@ class Publisher : public PublisherBase auto context = node_base->get_context(); // Get the intra process manager instance for this context. auto ipm = context->get_sub_context(); - // Register the publisher with the intra process manager. - if (qos.history() != rclcpp::HistoryPolicy::KeepLast) { + // Check if the QoS is compatible with intra-process. + auto qos_profile = get_actual_qos(); + if (qos_profile.history() != rclcpp::HistoryPolicy::KeepLast) { throw std::invalid_argument( "intraprocess communication on topic '" + topic + "' allowed only with keep last history qos policy"); } - if (qos.depth() == 0) { + if (qos_profile.depth() == 0) { throw std::invalid_argument( "intraprocess communication on topic '" + topic + "' is not allowed with a zero qos history depth value"); } - if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) { + if (qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal) { buffer_ = rclcpp::experimental::create_intra_process_buffer< ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter>( rclcpp::detail::resolve_intra_process_buffer_type(options_.intra_process_buffer_type), - qos, + qos_profile, std::make_shared(ros_message_type_allocator_)); } + // Register the publisher with the intra process manager. uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this(), buffer_); this->setup_intra_process( intra_process_publisher_id, diff --git a/rclcpp/test/rclcpp/test_create_subscription.cpp b/rclcpp/test/rclcpp/test_create_subscription.cpp index 06536116a1..8439a88621 100644 --- a/rclcpp/test/rclcpp/test_create_subscription.cpp +++ b/rclcpp/test/rclcpp/test_create_subscription.cpp @@ -92,3 +92,20 @@ TEST_F(TestCreateSubscription, create_with_statistics) { ASSERT_NE(nullptr, subscription); EXPECT_STREQ("/ns/topic_name", subscription->get_topic_name()); } + +TEST_F(TestCreateSubscription, create_with_intra_process_com) { + auto node = std::make_shared("my_node", "/ns"); + auto options = rclcpp::SubscriptionOptions(); + options.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + + auto callback = [](test_msgs::msg::Empty::ConstSharedPtr) {}; + rclcpp::Subscription::SharedPtr subscription; + ASSERT_NO_THROW( + { + subscription = + rclcpp::create_subscription( + node, "topic_name", rclcpp::SystemDefaultsQoS(), callback, options); + }); + ASSERT_NE(nullptr, subscription); + EXPECT_STREQ("/ns/topic_name", subscription->get_topic_name()); +} diff --git a/rclcpp/test/rclcpp/test_publisher.cpp b/rclcpp/test/rclcpp/test_publisher.cpp index 4224bd49d8..219d7d3017 100644 --- a/rclcpp/test/rclcpp/test_publisher.cpp +++ b/rclcpp/test/rclcpp/test_publisher.cpp @@ -187,6 +187,21 @@ TEST_F(TestPublisher, various_creation_signatures) { } } +/* + Testing publisher with intraprocess enabled and SystemDefaultQoS + */ +TEST_F(TestPublisher, test_publisher_with_system_default_qos) { + initialize(rclcpp::NodeOptions().use_intra_process_comms(false)); + // explicitly enable intra-process comm with publisher option + auto options = rclcpp::PublisherOptions(); + options.use_intra_process_comm = rclcpp::IntraProcessSetting::Enable; + using test_msgs::msg::Empty; + ASSERT_NO_THROW( + { + auto publisher = node->create_publisher("topic", rclcpp::SystemDefaultsQoS()); + }); +} + /* Testing publisher with intraprocess enabled and invalid QoS */ @@ -432,12 +447,10 @@ TEST_F(TestPublisher, intra_process_publish_failures) { publisher->get_publisher_handle().get(), msg.get())); } } - RCLCPP_EXPECT_THROW_EQ( - node->create_publisher( - "topic", rclcpp::QoS(0), options), - std::invalid_argument( - "intraprocess communication on topic 'topic' " - "is not allowed with a zero qos history depth value")); + // a zero depth with KEEP_LAST doesn't make sense, + // this will be interpreted as SystemDefaultQoS by rclcpp. + EXPECT_NO_THROW( + node->create_publisher("topic", rclcpp::QoS(0), options)); } TEST_F(TestPublisher, inter_process_publish_failures) { From 9c4cdd06aa5940a180f9e4e45c3d6a9faaedbeed Mon Sep 17 00:00:00 2001 From: Tomoya Fujita Date: Fri, 13 Dec 2024 20:39:59 -0800 Subject: [PATCH 2/2] address uncrustify warning. Signed-off-by: Tomoya Fujita --- rclcpp/include/rclcpp/publisher.hpp | 1 - rclcpp/test/rclcpp/test_create_subscription.cpp | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 9fb9309642..a787781802 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -146,7 +146,6 @@ class Publisher : public PublisherBase const rclcpp::QoS & qos, const rclcpp::PublisherOptionsWithAllocator & options) { - // Topic is unused for now. (void)qos; (void)options; diff --git a/rclcpp/test/rclcpp/test_create_subscription.cpp b/rclcpp/test/rclcpp/test_create_subscription.cpp index 8439a88621..b5a5b68e35 100644 --- a/rclcpp/test/rclcpp/test_create_subscription.cpp +++ b/rclcpp/test/rclcpp/test_create_subscription.cpp @@ -102,9 +102,8 @@ TEST_F(TestCreateSubscription, create_with_intra_process_com) { rclcpp::Subscription::SharedPtr subscription; ASSERT_NO_THROW( { - subscription = - rclcpp::create_subscription( - node, "topic_name", rclcpp::SystemDefaultsQoS(), callback, options); + subscription = rclcpp::create_subscription( + node, "topic_name", rclcpp::SystemDefaultsQoS(), callback, options); }); ASSERT_NE(nullptr, subscription); EXPECT_STREQ("/ns/topic_name", subscription->get_topic_name());