From c47768079b252e505643c9e59a3f4d59e5f0b6db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ferreira=20Gonz=C3=A1lez?= Date: Thu, 27 Jun 2024 10:01:00 +0200 Subject: [PATCH] Create InitialConnection for TCP initial peers 2.13 & 2.10 & 2.6 (#4947) * Refs #20650: Add test Signed-off-by: cferreiragonz * Refs #20650: Create initial connect for initial peers Signed-off-by: cferreiragonz --------- Signed-off-by: cferreiragonz (cherry picked from commit a0a4feeff3a5b8204a30ee56ed65b8a3c3309bac) # Conflicts: # test/blackbox/common/BlackboxTestsTransportTCP.cpp --- .../discovery/participant/PDPSimple.cpp | 12 +- .../common/BlackboxTestsTransportTCP.cpp | 172 +++++++++++++++++- 2 files changed, 181 insertions(+), 3 deletions(-) diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp index 074c340b14b..80d68033afc 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp @@ -378,7 +378,17 @@ bool PDPSimple::create_dcps_participant_endpoints() WriterAttributes watt = create_builtin_writer_attributes(); watt.endpoint.reliabilityKind = BEST_EFFORT; - watt.endpoint.remoteLocatorList = m_discovery.initialPeersList; + if (!m_discovery.initialPeersList.empty()) + { + if (mp_RTPSParticipant->has_tcp_transports()) + { + mp_RTPSParticipant->create_tcp_connections(m_discovery.initialPeersList); + } + else + { + watt.endpoint.remoteLocatorList = m_discovery.initialPeersList; + } + } if (mp_RTPSParticipant->getRTPSParticipantAttributes().throughputController.bytesPerPeriod != UINT32_MAX && mp_RTPSParticipant->getRTPSParticipantAttributes().throughputController.periodMillisecs != 0) diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index 885f701b95f..4d07cc0a14d 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -607,7 +607,7 @@ TEST(TransportTCP, Client_reconnection) delete requester; } -// Test copy constructor and copy assignment for TCPv4 +// Test zero listening port for TCPv4 TEST_P(TransportTCP, TCPv4_autofill_port) { PubSubReader p1(TEST_TOPIC_NAME); @@ -637,7 +637,7 @@ TEST_P(TransportTCP, TCPv4_autofill_port) EXPECT_TRUE(IPLocator::getPhysicalPort(p2_locators.begin()[0]) == port); } -// Test copy constructor and copy assignment for TCPv6 +// Test zero listening port for TCPv6 TEST_P(TransportTCP, TCPv6_autofill_port) { PubSubReader p1(TEST_TOPIC_NAME); @@ -1188,6 +1188,174 @@ TEST_P(TransportTCP, send_resource_cleanup_initial_peer) client->wait_discovery(2, std::chrono::seconds(0)); } +<<<<<<< HEAD +======= +// Test TCP transport on large message with best effort reliability +TEST_P(TransportTCP, large_message_send_receive) +{ + // Prepare data to be sent before participants discovery so it is ready to be sent as soon as possible. + std::list data; + data = default_data300kb_data_generator(1); + + uint16_t writer_port = global_port; + + /* Test configuration */ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + std::shared_ptr writer_transport; + std::shared_ptr reader_transport; + Locator_t initialPeerLocator; + if (use_ipv6) + { + reader_transport = std::make_shared(); + writer_transport = std::make_shared(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv6; + IPLocator::setIPv6(initialPeerLocator, "::1"); + } + else + { + reader_transport = std::make_shared(); + writer_transport = std::make_shared(); + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + } + writer_transport->tcp_negotiation_timeout = 100; + reader_transport->tcp_negotiation_timeout = 100; + + // Add listener port to server + writer_transport->add_listener_port(writer_port); + + // Add initial peer to client + initialPeerLocator.port = writer_port; + LocatorList_t initial_peer_list; + initial_peer_list.push_back(initialPeerLocator); + + // Setup participants + writer.disable_builtin_transport() + .add_user_transport_to_pparams(writer_transport); + + reader.disable_builtin_transport() + .initial_peers(initial_peer_list) + .add_user_transport_to_pparams(reader_transport); + + // Init participants + writer.init(); + reader.init(); + ASSERT_TRUE(writer.isInitialized()); + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(1, std::chrono::seconds(0)); + reader.wait_discovery(std::chrono::seconds(0), 1); + + // Send and receive data + reader.startReception(data); + + writer.send(data); + EXPECT_TRUE(data.empty()); + + reader.block_for_all(); +} + +// Test TCP transport on large message with best effort reliability and LARGE_DATA mode +TEST_P(TransportTCP, large_message_large_data_send_receive) +{ + // Prepare data to be sent. before participants discovery so it is ready to be sent as soon as possible. + // The writer might try to send the data before the reader has negotiated the connection. + // If the negotiation timeout is too short, the writer will fail to send the data and the reader will not receive it. + // LARGE_DATA participant discovery is tipically faster than tcp negotiation. + std::list data; + data = default_data300kb_data_generator(1); + + /* Test configuration */ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + uint32_t tcp_negotiation_timeout = 100; + writer.setup_large_data_tcp(use_ipv6, 0, tcp_negotiation_timeout); + reader.setup_large_data_tcp(use_ipv6, 0, tcp_negotiation_timeout); + + // Init participants + writer.init(); + reader.init(); + ASSERT_TRUE(writer.isInitialized()); + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(1, std::chrono::seconds(0)); + reader.wait_discovery(std::chrono::seconds(0), 1); + + // Send and receive data + reader.startReception(data); + + writer.send(data); + EXPECT_TRUE(data.empty()); + + reader.block_for_all(); +} + +// Test CreateInitialConnection for TCP +TEST_P(TransportTCP, TCP_initial_peers_connection) +{ + PubSubWriter p1(TEST_TOPIC_NAME); + PubSubReader p2(TEST_TOPIC_NAME); + PubSubReader p3(TEST_TOPIC_NAME); + + // Add TCP Transport with listening port + auto p1_transport = std::make_shared(); + p1_transport->add_listener_port(global_port); + auto p2_transport = std::make_shared(); + p2_transport->add_listener_port(global_port + 1); + auto p3_transport = std::make_shared(); + p3_transport->add_listener_port(global_port - 1); + + // Add initial peer to client + Locator_t initialPeerLocator; + initialPeerLocator.kind = LOCATOR_KIND_TCPv4; + IPLocator::setIPv4(initialPeerLocator, 127, 0, 0, 1); + initialPeerLocator.port = global_port; + LocatorList_t initial_peer_list; + initial_peer_list.push_back(initialPeerLocator); + + // Setup participants + p1.disable_builtin_transport() + .add_user_transport_to_pparams(p1_transport); + + p2.disable_builtin_transport() + .initial_peers(initial_peer_list) + .add_user_transport_to_pparams(p2_transport); + + p3.disable_builtin_transport() + .initial_peers(initial_peer_list) + .add_user_transport_to_pparams(p3_transport); + + // Init participants + p1.init(); + p2.init(); + p3.init(); + ASSERT_TRUE(p1.isInitialized()); + ASSERT_TRUE(p2.isInitialized()); + ASSERT_TRUE(p3.isInitialized()); + + // Wait for discovery + p1.wait_discovery(2, std::chrono::seconds(0)); + p2.wait_discovery(std::chrono::seconds(0), 1); + p3.wait_discovery(std::chrono::seconds(0), 1); + + // Send and receive data + auto data = default_helloworld_data_generator(); + p2.startReception(data); + p3.startReception(data); + + p1.send(data); + EXPECT_TRUE(data.empty()); + + p2.block_for_all(); + p3.block_for_all(); +} + +>>>>>>> a0a4feeff (Create InitialConnection for TCP initial peers 2.13 & 2.10 & 2.6 (#4947)) #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else