diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index f340da0f858..3acff60c526 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -612,6 +612,26 @@ bool StatelessReader::processDataFragMsg( { if (work_change->sequenceNumber < change_to_add->sequenceNumber) { + SequenceNumber_t updated_seq = work_change->sequenceNumber; + SequenceNumber_t previous_seq{ 0, 0 }; + previous_seq = update_last_notified(writer_guid, updated_seq); + + // Notify lost samples + auto listener = getListener(); + if (listener != nullptr) + { + if (SequenceNumber_t{ 0, 0 } != previous_seq) + { + assert(previous_seq < updated_seq); + uint64_t tmp = (updated_seq - previous_seq).to64long(); + int32_t lost_samples = + tmp > static_cast(std::numeric_limits::max()) ? + std::numeric_limits::max() : static_cast(tmp); + assert (0 < lost_samples); + listener->on_sample_lost(this, lost_samples); + } + } + // Pending change should be dropped. Check if it can be reused if (sampleSize <= work_change->serializedPayload.max_size) { diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 4a099c579fd..f4c981880dc 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -677,16 +677,29 @@ TEST_P(DDSStatus, DataAvailableConditions) subscriber_reader.wait_waitset_timeout(); } +// We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init. +// Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any +// other possible loss. +static constexpr uint32_t SAMPLE_LOST_TEST_BUFFER_SIZE = + 300ul * 1024ul // sample size + * 13ul // number of samples + * 2ul; // 2x to avoid any possible loss + +template void sample_lost_test_dw_init( - PubSubWriter& writer) + PubSubWriter& writer) { auto testTransport = std::make_shared(); + testTransport->sendBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; + testTransport->receiveBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE; + testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool { uint32_t old_pos = msg.pos; // see RTPS DDS 9.4.5.3 Data Submessage - EntityId_t readerID, writerID; + EntityId_t readerID; + EntityId_t writerID; SequenceNumber_t sn; msg.pos += 2; // flags @@ -714,6 +727,43 @@ void sample_lost_test_dw_init( return false; }; + testTransport->drop_data_frag_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool + { + uint32_t old_pos = msg.pos; + + // see RTPS DDS 9.4.5.4 DataFrag Submessage + EntityId_t readerID; + EntityId_t writerID; + SequenceNumber_t sn; + uint32_t first_fragment = 0; + + msg.pos += 2; // flags + msg.pos += 2; // octets to inline quos + CDRMessage::readEntityId(&msg, &readerID); + CDRMessage::readEntityId(&msg, &writerID); + CDRMessage::readSequenceNumber(&msg, &sn); + CDRMessage::readUInt32(&msg, &first_fragment); + + // restore buffer pos + msg.pos = old_pos; + + // generate losses + if ((writerID.value[3] & 0xC0) == 0 // only user endpoints + && (1 == first_fragment) // only first fragment + && (sn == SequenceNumber_t{0, 2} || + sn == SequenceNumber_t(0, 3) || + sn == SequenceNumber_t(0, 4) || + sn == SequenceNumber_t(0, 6) || + sn == SequenceNumber_t(0, 8) || + sn == SequenceNumber_t(0, 10) || + sn == SequenceNumber_t(0, 11) || + sn == SequenceNumber_t(0, 13))) + { + return true; + } + + return false; + }; writer.disable_builtin_transport() @@ -724,8 +774,9 @@ void sample_lost_test_dw_init( } +template void sample_lost_test_dr_init( - PubSubReader& reader, + PubSubReader& reader, std::function functor) { reader.sample_lost_status_functor(functor) @@ -734,11 +785,15 @@ void sample_lost_test_dr_init( ASSERT_TRUE(reader.isInitialized()); } +template void sample_lost_test_init( - PubSubReader& reader, - PubSubWriter& writer, + PubSubReader& reader, + PubSubWriter& writer, std::function functor) { + reader.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); + writer.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE); + sample_lost_test_dw_init(writer); sample_lost_test_dr_init(reader, functor); @@ -805,6 +860,78 @@ TEST(DDSStatus, sample_lost_be_dw_be_dr) }); } +/*! + * \test DDS-STS-SLS-01 Test `SampleLostStatus` in a Best-Effort DataWriter and a Best-Effort DataReader communication. + * This is also a regression test for bug redmine 20162 + */ +TEST(DDSStatus, sample_lost_be_dw_be_dr_fragments) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + std::mutex test_step_mtx; + std::condition_variable test_step_cv; + uint8_t test_step = 0; + + writer.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS); + reader.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS); + + sample_lost_test_init(reader, writer, [&test_step_mtx, &test_step_cv, &test_step]( + const eprosima::fastdds::dds::SampleLostStatus& status) + { + { + std::unique_lock lock(test_step_mtx); + std::cout << status.total_count << " " << status.total_count_change << std::endl; + if (0 == test_step && 1 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (1 == test_step && 2 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (2 == test_step && 3 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (3 == test_step && 4 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (4 == test_step && 5 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (5 == test_step && 6 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else if (6 == test_step && 7 == status.total_count && 1 == status.total_count_change) + { + ++test_step; + } + else + { + test_step = 0; + } + } + + test_step_cv.notify_all(); + }); + + + auto data = default_data300kb_data_generator(13); + + reader.startReception(data); + writer.send(data, 100); + + std::unique_lock lock(test_step_mtx); + test_step_cv.wait(lock, [&test_step]() + { + return 7 == test_step; + }); +} + /*! * \test DDS-STS-SLS-02 Test `SampleLostStatus` in a Best-Effort DataWriter and a late-joiner Best-Effort DataReader * communication.