Skip to content

Commit

Permalink
Fix on_sample_lost notification on best-effort readers for fragmented…
Browse files Browse the repository at this point in the history
… samples (#4187) (#4607)

* Handle errors when setting socket buffer sizes (#4760) (#4795)

* Refs #20972. Move code into new private methods.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Refactor on configure_send_buffer_size.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Refactor on configure_receive_buffer_size.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Check user configuration at the beginning of init method.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Use maxMessageSize as minimum possible value.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Applying changes on OpenAndBindUnicastOutputSocket.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Add helper header with template method.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Configure methods return boolean.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Configure methods use new template method.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. OpenAndBindUnicastOutputSocket uses new template method.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Changes in OpenAndBindInputSocket.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972.Setting options on TCP channels.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Doxygen.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Check limits of configured sizes.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Add UDP unit tests.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Add TCP unit tests.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Move checks in TCP to beginning of init.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Refactor for common code in UDP.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Refactor for common code in TCP.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Remove unused constants in UDP tests.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Check final configuration on unit tests.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Uncrustify.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Less strict tests.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Remove `s_minimumSocketBuffer` from tests.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20972. Deprecate `s_minimumSocketBuffer`.

Signed-off-by: Miguel Company <[email protected]>
Signed-off-by: Miguel Company <[email protected]>

---------

Signed-off-by: Miguel Company <[email protected]>
Signed-off-by: Miguel Company <[email protected]>
Co-authored-by: Miguel Company <[email protected]>
(cherry picked from commit 532acfa)

* Fix conflicts

Signed-off-by: Miguel Company <[email protected]>

* Fix on_sample_lost notification on best-effort readers for fragmented samples (#4187)

* Refs #20162. Regression test.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20162. Notify sample lost when dropping fragmented change.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20167. Linters.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20162. Apply suggestions.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20162. Use constexpr for buffer size.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20162. Lower buffer size.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20351. Uncrustify.

Signed-off-by: Miguel Company <[email protected]>

---------

Signed-off-by: Miguel Company <[email protected]>
(cherry picked from commit 5ac198e)

* Make sample_lost_be_dw_be_dr_fragments test less flaky (#4620)

* Refs #20692. Make sample_lost_be_dw_be_dr_fragments test less flakey.

Signed-off-by: Miguel Company <[email protected]>

* Refs #20692. Uncrustify.

Signed-off-by: Miguel Company <[email protected]>

---------

Signed-off-by: Miguel Company <[email protected]>

* Improve test.

Signed-off-by: Miguel Company <[email protected]>

---------

Signed-off-by: Miguel Company <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Miguel Company <[email protected]>
  • Loading branch information
mergify[bot] and MiguelCompany authored Jun 28, 2024
1 parent 36fefdb commit b64e40e
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 5 deletions.
20 changes: 20 additions & 0 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,26 @@ bool StatelessReader::processDataFragMsg(
{
if (work_change->sequenceNumber < change_to_add->sequenceNumber)
{
SequenceNumber_t updated_seq = work_change->sequenceNumber;
SequenceNumber_t previous_seq{ 0, 0 };
previous_seq = update_last_notified(writer_guid, updated_seq);

// Notify lost samples
auto listener = getListener();
if (listener != nullptr)
{
if (SequenceNumber_t{ 0, 0 } != previous_seq)
{
assert(previous_seq < updated_seq);
uint64_t tmp = (updated_seq - previous_seq).to64long();
int32_t lost_samples =
tmp > static_cast<uint64_t>(std::numeric_limits<int32_t>::max()) ?
std::numeric_limits<int32_t>::max() : static_cast<int32_t>(tmp);
assert (0 < lost_samples);
listener->on_sample_lost(this, lost_samples);
}
}

// Pending change should be dropped. Check if it can be reused
if (sampleSize <= work_change->serializedPayload.max_size)
{
Expand Down
137 changes: 132 additions & 5 deletions test/blackbox/common/DDSBlackboxTestsListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,16 +677,29 @@ TEST_P(DDSStatus, DataAvailableConditions)
subscriber_reader.wait_waitset_timeout();
}

// We want to ensure that samples are only lost due to the custom filter we have set in sample_lost_test_dw_init.
// Since we are going to send 300KB samples in the test for fragments, let's increase the buffer size to avoid any
// other possible loss.
static constexpr uint32_t SAMPLE_LOST_TEST_BUFFER_SIZE =
300ul * 1024ul // sample size
* 13ul // number of samples
* 2ul; // 2x to avoid any possible loss

template<typename T>
void sample_lost_test_dw_init(
PubSubWriter<HelloWorldPubSubType>& writer)
PubSubWriter<T>& writer)
{
auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();
testTransport->sendBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE;
testTransport->receiveBufferSize = SAMPLE_LOST_TEST_BUFFER_SIZE;

testTransport->drop_data_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool
{
uint32_t old_pos = msg.pos;

// see RTPS DDS 9.4.5.3 Data Submessage
EntityId_t readerID, writerID;
EntityId_t readerID;
EntityId_t writerID;
SequenceNumber_t sn;

msg.pos += 2; // flags
Expand Down Expand Up @@ -714,6 +727,43 @@ void sample_lost_test_dw_init(

return false;
};
testTransport->drop_data_frag_messages_filter_ = [](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool
{
uint32_t old_pos = msg.pos;

// see RTPS DDS 9.4.5.4 DataFrag Submessage
EntityId_t readerID;
EntityId_t writerID;
SequenceNumber_t sn;
uint32_t first_fragment = 0;

msg.pos += 2; // flags
msg.pos += 2; // octets to inline quos
CDRMessage::readEntityId(&msg, &readerID);
CDRMessage::readEntityId(&msg, &writerID);
CDRMessage::readSequenceNumber(&msg, &sn);
CDRMessage::readUInt32(&msg, &first_fragment);

// restore buffer pos
msg.pos = old_pos;

// generate losses
if ((writerID.value[3] & 0xC0) == 0 // only user endpoints
&& (1 == first_fragment) // only first fragment
&& (sn == SequenceNumber_t{0, 2} ||
sn == SequenceNumber_t(0, 3) ||
sn == SequenceNumber_t(0, 4) ||
sn == SequenceNumber_t(0, 6) ||
sn == SequenceNumber_t(0, 8) ||
sn == SequenceNumber_t(0, 10) ||
sn == SequenceNumber_t(0, 11) ||
sn == SequenceNumber_t(0, 13)))
{
return true;
}

return false;
};


writer.disable_builtin_transport()
Expand All @@ -724,8 +774,9 @@ void sample_lost_test_dw_init(

}

template<typename T>
void sample_lost_test_dr_init(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubReader<T>& reader,
std::function<void(const eprosima::fastdds::dds::SampleLostStatus& status)> functor)
{
reader.sample_lost_status_functor(functor)
Expand All @@ -734,11 +785,15 @@ void sample_lost_test_dr_init(
ASSERT_TRUE(reader.isInitialized());
}

template<typename T>
void sample_lost_test_init(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubWriter<HelloWorldPubSubType>& writer,
PubSubReader<T>& reader,
PubSubWriter<T>& writer,
std::function<void(const eprosima::fastdds::dds::SampleLostStatus& status)> functor)
{
reader.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE);
writer.socket_buffer_size(SAMPLE_LOST_TEST_BUFFER_SIZE);

sample_lost_test_dw_init(writer);
sample_lost_test_dr_init(reader, functor);

Expand Down Expand Up @@ -805,6 +860,78 @@ TEST(DDSStatus, sample_lost_be_dw_be_dr)
});
}

/*!
* \test DDS-STS-SLS-01 Test `SampleLostStatus` in a Best-Effort DataWriter and a Best-Effort DataReader communication.
* This is also a regression test for bug redmine 20162
*/
TEST(DDSStatus, sample_lost_be_dw_be_dr_fragments)
{
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);

std::mutex test_step_mtx;
std::condition_variable test_step_cv;
uint8_t test_step = 0;

writer.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS);
reader.reliability(eprosima::fastdds::dds::BEST_EFFORT_RELIABILITY_QOS);

sample_lost_test_init(reader, writer, [&test_step_mtx, &test_step_cv, &test_step](
const eprosima::fastdds::dds::SampleLostStatus& status)
{
{
std::unique_lock<std::mutex> lock(test_step_mtx);
std::cout << status.total_count << " " << status.total_count_change << std::endl;
if (0 == test_step && 1 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (1 == test_step && 2 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (2 == test_step && 3 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (3 == test_step && 4 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (4 == test_step && 5 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (5 == test_step && 6 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else if (6 == test_step && 7 == status.total_count && 1 == status.total_count_change)
{
++test_step;
}
else
{
test_step = 0;
}
}

test_step_cv.notify_all();
});


auto data = default_data300kb_data_generator(13);

reader.startReception(data);
writer.send(data, 100);

std::unique_lock<std::mutex> lock(test_step_mtx);
test_step_cv.wait(lock, [&test_step]()
{
return 7 == test_step;
});
}

/*!
* \test DDS-STS-SLS-02 Test `SampleLostStatus` in a Best-Effort DataWriter and a late-joiner Best-Effort DataReader
* communication.
Expand Down

0 comments on commit b64e40e

Please sign in to comment.