Skip to content

Commit

Permalink
Fix unique network flows with TCP transports (#5461)
Browse files Browse the repository at this point in the history
* Refs #22055: Add regression tests

Signed-off-by: cferreiragonz <[email protected]>

* Refs #22055: Fix unique flows for TCP

Signed-off-by: cferreiragonz <[email protected]>

* Refs #22055: Fix tests

Signed-off-by: cferreiragonz <[email protected]>

---------

Signed-off-by: cferreiragonz <[email protected]>
  • Loading branch information
cferreiragonz authored Dec 11, 2024
1 parent 971f120 commit 81cdb10
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 1 deletion.
10 changes: 9 additions & 1 deletion src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1737,7 +1737,15 @@ bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint(
// Set port on unicast locators
for (Locator_t& loc : attributes.unicastLocatorList)
{
loc.port = port;
// Set logical port only TCP locators
if (LOCATOR_KIND_TCPv4 == loc.kind || LOCATOR_KIND_TCPv6 == loc.kind)
{
IPLocator::setLogicalPort(loc, port);
}
else
{
loc.port = port;
}
}

// Try creating receiver resources
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/api/dds-pim/PubSubParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,13 @@ class PubSubParticipant
return false;
}

PubSubParticipant& initial_peers(
const eprosima::fastdds::rtps::LocatorList& initial_peers)
{
participant_qos_.wire_protocol().builtin.initialPeersList = initial_peers;
return *this;
}

PubSubParticipant& pub_property_policy(
const eprosima::fastdds::rtps::PropertyPolicy property_policy)
{
Expand Down
131 changes: 131 additions & 0 deletions test/blackbox/common/BlackboxTestsTransportTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "../api/dds-pim/TCPReqRepHelloWorldRequester.hpp"
#include "../api/dds-pim/TCPReqRepHelloWorldReplier.hpp"
#include "PubSubParticipant.hpp"
#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"
#include "DatagramInjectionTransport.hpp"
Expand Down Expand Up @@ -1375,6 +1376,136 @@ TEST_P(TransportTCP, TCP_initial_peers_connection)
p3.block_for_all();
}

TEST_P(TransportTCP, tcp_unique_network_flows_init)
{
// TCP Writer creation should fail as feature is not implemented for writers
{
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PropertyPolicy properties;
properties.properties().emplace_back("fastdds.unique_network_flows", "");

test_transport_->add_listener_port(global_port);
writer.disable_builtin_transport().add_user_transport_to_pparams(test_transport_);

writer.entity_property_policy(properties).init();

EXPECT_FALSE(writer.isInitialized());
}

// Two readers on the same participant not requesting unique flows should give the same logical port and same physical port
{
PubSubParticipant<HelloWorldPubSubType> participant(0, 2, 0, 0);

participant.sub_topic_name(TEST_TOPIC_NAME);

participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_);

ASSERT_TRUE(participant.init_participant());
ASSERT_TRUE(participant.init_subscriber(0));
ASSERT_TRUE(participant.init_subscriber(1));

LocatorList_t locators;
LocatorList_t locators2;

participant.get_native_reader(0).get_listening_locators(locators);
participant.get_native_reader(1).get_listening_locators(locators2);

EXPECT_TRUE(locators == locators2);
// LocatorList size depends on the number of interfaces. Different address but same port.
ASSERT_GT(locators.size(), 0);
ASSERT_GT(locators2.size(), 0);
auto locator1 = locators.begin();
auto locator2 = locators2.begin();
EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2));
EXPECT_EQ(IPLocator::getLogicalPort(*locator1), IPLocator::getLogicalPort(*locator2));
}

// Two TCP readers on the same participant requesting unique flows should give different logical ports but same physical port
{
PubSubParticipant<HelloWorldPubSubType> participant(0, 2, 0, 0);

PropertyPolicy properties;
properties.properties().emplace_back("fastdds.unique_network_flows", "");
participant.sub_topic_name(TEST_TOPIC_NAME).sub_property_policy(properties);

participant.disable_builtin_transport().add_user_transport_to_pparams(test_transport_);

ASSERT_TRUE(participant.init_participant());
ASSERT_TRUE(participant.init_subscriber(0));
ASSERT_TRUE(participant.init_subscriber(1));

LocatorList_t locators;
LocatorList_t locators2;

participant.get_native_reader(0).get_listening_locators(locators);
participant.get_native_reader(1).get_listening_locators(locators2);

EXPECT_FALSE(locators == locators2);
// LocatorList size depends on the number of interfaces. Different address but same port.
ASSERT_GT(locators.size(), 0);
ASSERT_GT(locators2.size(), 0);
auto locator1 = locators.begin();
auto locator2 = locators2.begin();
EXPECT_EQ(IPLocator::getPhysicalPort(*locator1), IPLocator::getPhysicalPort(*locator2));
EXPECT_NE(IPLocator::getLogicalPort(*locator1), IPLocator::getLogicalPort(*locator2));
}
}

TEST_P(TransportTCP, tcp_unique_network_flows_communication)
{
PubSubParticipant<HelloWorldPubSubType> readers(0, 2, 0, 2);
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);

PropertyPolicy properties;
properties.properties().emplace_back("fastdds.unique_network_flows", "");
readers.disable_builtin_transport().add_user_transport_to_pparams(test_transport_);

eprosima::fastdds::rtps::Locator_t initial_peer_locator;
if (use_ipv6)
{
initial_peer_locator.kind = LOCATOR_KIND_TCPv6;
eprosima::fastdds::rtps::IPLocator::setIPv6(initial_peer_locator, "::1");
}
else
{
initial_peer_locator.kind = LOCATOR_KIND_TCPv4;
eprosima::fastdds::rtps::IPLocator::setIPv4(initial_peer_locator, "127.0.0.1");
}
eprosima::fastdds::rtps::IPLocator::setPhysicalPort(initial_peer_locator, global_port);
eprosima::fastdds::rtps::LocatorList_t initial_peer_list;
initial_peer_list.push_back(initial_peer_locator);

readers.sub_topic_name(TEST_TOPIC_NAME)
.sub_property_policy(properties)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.initial_peers(initial_peer_list);

ASSERT_TRUE(readers.init_participant());
ASSERT_TRUE(readers.init_subscriber(0));
ASSERT_TRUE(readers.init_subscriber(1));

test_transport_->add_listener_port(global_port);
writer.disable_builtin_transport()
.add_user_transport_to_pparams(test_transport_)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.history_depth(100);

writer.init();
ASSERT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
readers.sub_wait_discovery();

// Send data
auto data = default_helloworld_data_generator();
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block until readers have acknowledged all samples.
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(30)));
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
Expand Down

0 comments on commit 81cdb10

Please sign in to comment.