Skip to content

Commit

Permalink
Allow all durability configurations (#5224)
Browse files Browse the repository at this point in the history
* Refs #21538: Feature

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21538: Refactor make_persistent() and make_transient() in PubSub test API to make it clearer

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21538: Fix unittests

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21538: Add BB tests and refactor DDSPersistence test suite

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21538: Linter

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21538: versions.md

Signed-off-by: Mario Dominguez <[email protected]>

* Refs #21538: Apply Miguel's rev

Signed-off-by: Mario Dominguez <[email protected]>

---------

Signed-off-by: Mario Dominguez <[email protected]>
  • Loading branch information
Mario-DL authored Oct 3, 2024
1 parent 4b968bc commit ed58ca2
Show file tree
Hide file tree
Showing 16 changed files with 293 additions and 69 deletions.
5 changes: 0 additions & 5 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1940,11 +1940,6 @@ ReturnCode_t DataWriterImpl::check_qos_including_resource_limits(
ReturnCode_t DataWriterImpl::check_qos(
const DataWriterQos& qos)
{
if (qos.durability().kind == PERSISTENT_DURABILITY_QOS)
{
EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "PERSISTENT Durability not supported");
return RETCODE_UNSUPPORTED;
}
if (qos.destination_order().kind == BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS)
{
EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "BY SOURCE TIMESTAMP DestinationOrder not supported");
Expand Down
5 changes: 0 additions & 5 deletions src/cpp/fastdds/subscriber/DataReaderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1497,11 +1497,6 @@ ReturnCode_t DataReaderImpl::check_qos_including_resource_limits(
ReturnCode_t DataReaderImpl::check_qos(
const DataReaderQos& qos)
{
if (qos.durability().kind == PERSISTENT_DURABILITY_QOS)
{
EPROSIMA_LOG_ERROR(DDS_QOS_CHECK, "PERSISTENT Durability not supported");
return RETCODE_UNSUPPORTED;
}
if (qos.destination_order().kind == BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS)
{
EPROSIMA_LOG_ERROR(DDS_QOS_CHECK, "BY SOURCE TIMESTAMP DestinationOrder not supported");
Expand Down
5 changes: 0 additions & 5 deletions src/cpp/fastdds/topic/TopicImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,6 @@ ReturnCode_t TopicImpl::check_qos_including_resource_limits(
ReturnCode_t TopicImpl::check_qos(
const TopicQos& qos)
{
if (PERSISTENT_DURABILITY_QOS == qos.durability().kind)
{
EPROSIMA_LOG_ERROR(DDS_QOS_CHECK, "PERSISTENT Durability not supported");
return RETCODE_UNSUPPORTED;
}
if (BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS == qos.destination_order().kind)
{
EPROSIMA_LOG_ERROR(DDS_QOS_CHECK, "BY SOURCE TIMESTAMP DestinationOrder not supported");
Expand Down
7 changes: 5 additions & 2 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2456,8 +2456,11 @@ bool RTPSParticipantImpl::get_persistence_service(
{
if (param.persistence_guid == c_Guid_Unknown)
{
EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Cannot create persistence service. Persistence GUID not specified");
return false;
EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT,
"Persistence GUID not specified. Behaving as TRANSIENT_LOCAL");
auto modified_durability_attrs = const_cast<EndpointAttributes&>(param);
modified_durability_attrs.durabilityKind = TRANSIENT_LOCAL;
return true;
}
service = get_persistence_service(param);
if (service == nullptr)
Expand Down
21 changes: 18 additions & 3 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1631,16 +1631,31 @@ class PubSubReader
}

#if HAVE_SQLITE3
PubSubReader& make_transient(
const std::string& filename,
const std::string& persistence_guid)
{
add_persitence_properties(filename, persistence_guid);
durability_kind(eprosima::fastdds::dds::TRANSIENT_DURABILITY_QOS);
return *this;
}

PubSubReader& make_persistent(
const std::string& filename,
const std::string& persistence_guid)
{
add_persitence_properties(filename, persistence_guid);
durability_kind(eprosima::fastdds::dds::PERSISTENT_DURABILITY_QOS);
return *this;
}

void add_persitence_properties(
const std::string& filename,
const std::string& persistence_guid)
{
participant_qos_.properties().properties().emplace_back("dds.persistence.plugin", "builtin.SQLITE3");
participant_qos_.properties().properties().emplace_back("dds.persistence.sqlite3.filename", filename);
datareader_qos_.durability().kind = eprosima::fastdds::dds::TRANSIENT_DURABILITY_QOS;
datareader_qos_.properties().properties().emplace_back("dds.persistence.guid", persistence_guid);

return *this;
}

#endif // if HAVE_SQLITE3
Expand Down
21 changes: 18 additions & 3 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1708,16 +1708,31 @@ class PubSubWriter
}

#if HAVE_SQLITE3
PubSubWriter& make_transient(
const std::string& filename,
const std::string& persistence_guid)
{
add_persitence_properties(filename, persistence_guid);
durability_kind(eprosima::fastdds::dds::TRANSIENT_DURABILITY_QOS);
return *this;
}

PubSubWriter& make_persistent(
const std::string& filename,
const std::string& persistence_guid)
{
add_persitence_properties(filename, persistence_guid);
durability_kind(eprosima::fastdds::dds::PERSISTENT_DURABILITY_QOS);
return *this;
}

void add_persitence_properties(
const std::string& filename,
const std::string& persistence_guid)
{
participant_qos_.properties().properties().emplace_back("dds.persistence.plugin", "builtin.SQLITE3");
participant_qos_.properties().properties().emplace_back("dds.persistence.sqlite3.filename", filename);
datawriter_qos_.durability().kind = eprosima::fastdds::dds::TRANSIENT_DURABILITY_QOS;
datawriter_qos_.properties().properties().emplace_back("dds.persistence.guid", persistence_guid);

return *this;
}

#endif // if HAVE_SQLITE3
Expand Down
8 changes: 4 additions & 4 deletions test/blackbox/common/DDSBlackboxTestsListeners.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1246,9 +1246,9 @@ TEST(DDSStatus, sample_lost_re_dw_re_persistence_dr)
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);

writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.make_persistent(db_file_name, "67.62.79.64.75.62.5f.60.75.72.73.5f|76.65.79.74");
.make_transient(db_file_name, "67.62.79.64.75.62.5f.60.75.72.73.5f|76.65.79.74");
reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.make_persistent(db_file_name, "67.62.79.64.75.62.5f.60.75.72.73.5f|76.65.79.72");
.make_transient(db_file_name, "67.62.79.64.75.62.5f.60.75.72.73.5f|76.65.79.72");


std::mutex test_step_mtx;
Expand Down Expand Up @@ -1651,9 +1651,9 @@ TEST(DDSStatus, sample_lost_waitset_re_dw_re_persistence_dr)
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);

writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.make_persistent(db_file_name, "67.62.79.64.75.62.5f.60.75.72.73.5f|76.65.79.74");
.make_transient(db_file_name, "67.62.79.64.75.62.5f.60.75.72.73.5f|76.65.79.74");
reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.make_persistent(db_file_name, "67.62.79.64.75.62.5f.60.75.72.73.5f|76.65.79.72");
.make_transient(db_file_name, "67.62.79.64.75.62.5f.60.75.72.73.5f|76.65.79.72");

std::mutex test_step_mtx;
std::condition_variable test_step_cv;
Expand Down
147 changes: 130 additions & 17 deletions test/blackbox/common/DDSBlackboxTestsPersistence.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ enum communication_type
DATASHARING
};

class PersistenceLargeData : public testing::TestWithParam<communication_type>
class DDSPersistenceTests : public testing::TestWithParam<communication_type>
{
public:

Expand Down Expand Up @@ -113,7 +113,7 @@ class PersistenceLargeData : public testing::TestWithParam<communication_type>
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
.resource_limits_max_samples(100)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.make_persistent(db_file_name(), "77.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64")
.make_transient(db_file_name(), "77.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64")
.disable_builtin_transport()
.add_user_transport_to_pparams(testTransport)
.init();
Expand Down Expand Up @@ -160,17 +160,17 @@ class PersistenceLargeData : public testing::TestWithParam<communication_type>

};

TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithFrag)
TEST_P(DDSPersistenceTests, PubSubAsReliablePubTransientWithFrag)
{
fragment_data(true);
}

TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentNoFrag)
TEST_P(DDSPersistenceTests, PubSubAsReliablePubTransientNoFrag)
{
fragment_data(false);
}

TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithLifespanBefore)
TEST_P(DDSPersistenceTests, PubSubAsReliablePubTransientWithLifespanBefore)
{
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
Expand All @@ -179,7 +179,7 @@ TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithLifespanBefore)
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
.resource_limits_max_samples(100)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.make_persistent(db_file_name(), "77.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64")
.make_transient(db_file_name(), "77.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64")
.lifespan_period({1, 0})
.init();

Expand Down Expand Up @@ -221,7 +221,7 @@ TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithLifespanBefore)
ASSERT_EQ(0u, reader.block_for_all(std::chrono::seconds(1)));
}

TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithLifespanSendingBefore)
TEST_P(DDSPersistenceTests, PubSubAsReliablePubTransientWithLifespanSendingBefore)
{
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
Expand All @@ -230,7 +230,7 @@ TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithLifespanSendingBef
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
.resource_limits_max_samples(100)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.make_persistent(db_file_name(), "77.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64")
.make_transient(db_file_name(), "77.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64")
.lifespan_period({0, 100})
.init();

Expand Down Expand Up @@ -280,7 +280,7 @@ TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithLifespanSendingBef
ASSERT_EQ(0u, reader.block_for_all(std::chrono::seconds(1)));
}

TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithLifespanAfter)
TEST_P(DDSPersistenceTests, PubSubAsReliablePubTransientWithLifespanAfter)
{
PubSubWriter<Data1mbPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
Expand All @@ -289,7 +289,7 @@ TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithLifespanAfter)
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
.resource_limits_max_samples(100)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.make_persistent(db_file_name(), "77.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64")
.make_transient(db_file_name(), "77.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64")
.lifespan_period({1, 0})
.init();

Expand Down Expand Up @@ -332,7 +332,7 @@ TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithLifespanAfter)
ASSERT_EQ(0u, reader.block_for_all(std::chrono::seconds(1)));
}

TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithStaticDiscovery)
TEST_P(DDSPersistenceTests, PubSubAsReliablePubTransientWithStaticDiscovery)
{
char* value = nullptr;
std::string TOPIC_RANDOM_NUMBER;
Expand Down Expand Up @@ -398,7 +398,7 @@ TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithStaticDiscovery)
writer
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.make_persistent(db_file_name(), "78.73.69.74.65.72.5f.70.65.72.73.5f|67.75.69.1")
.make_transient(db_file_name(), "78.73.69.74.65.72.5f.70.65.72.73.5f|67.75.69.1")
.static_discovery("file://PubSubWriterPersistence_static_disc.xml")
.unicastLocatorList(WriterUnicastLocators)
.multicastLocatorList(WriterMulticastLocators)
Expand Down Expand Up @@ -426,7 +426,7 @@ TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithStaticDiscovery)
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
.history_depth(10)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.make_persistent(db_file_name(), "78.73.69.74.65.72.5f.70.65.72.73.5f|67.75.69.3")
.make_transient(db_file_name(), "78.73.69.74.65.72.5f.70.65.72.73.5f|67.75.69.3")
.static_discovery("file://PubSubReaderPersistence_static_disc.xml")
.unicastLocatorList(ReaderUnicastLocators)
.multicastLocatorList(ReaderMulticastLocators)
Expand All @@ -453,7 +453,6 @@ TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithStaticDiscovery)
// Wait expecting not receiving data.
ASSERT_EQ(10u, reader.block_for_all(std::chrono::seconds(1)));


// Destroy the DataWriter
writer.destroy();
reader.stopReception();
Expand All @@ -473,16 +472,130 @@ TEST_P(PersistenceLargeData, PubSubAsReliablePubPersistentWithStaticDiscovery)
}


TEST_P(DDSPersistenceTests, PubSubAsReliablePubPersistentBehavesAsTransient)
{
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

writer
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
.resource_limits_max_samples(100)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
// A PERSISTENT writer with a persistence guid must behave as TRANSIENT
.make_persistent(db_file_name(), "77.72.69.74.65.72.5f.70.65.72.73.5f|67.75.69.64")
.init();

ASSERT_TRUE(writer.isInitialized());

auto data = default_helloworld_data_generator();
auto received_data = data;

// Send data
writer.send(data);
// All data should be sent
ASSERT_TRUE(data.empty());
// Destroy the DataWriter
writer.destroy();

reader
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
.history_depth(10)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
// A TRANSIENT reader with no persistence guid should behave as TRANSIENT_LOCAL
.durability_kind(eprosima::fastdds::dds::TRANSIENT_DURABILITY_QOS)
.init();

ASSERT_TRUE(reader.isInitialized());

// Load the transient DataWriter with the changes saved in the database
writer.init();

ASSERT_TRUE(writer.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

reader.startReception(received_data);

// Wait expecting receiving all data.
reader.block_for_all();
}

TEST_P(DDSPersistenceTests, PubSubAsReliablePubTransientWithNoPersistenceGUIDBehavesAsTransientLocal)
{
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

writer
.history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS)
.resource_limits_max_samples(100)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
// A TRANSIENT writer with a persistence guid must behave as TRANSIENT_LOCAL
.durability_kind(eprosima::fastdds::dds::TRANSIENT_DURABILITY_QOS)
.init();

ASSERT_TRUE(writer.isInitialized());

auto data = default_helloworld_data_generator();
auto received_data = data;

// Send data
writer.send(data);

// All data should be sent
ASSERT_TRUE(data.empty());

reader
.history_kind(eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS)
.history_depth(10)
.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
// A TRANSIENT reader with no persistence guid should behave as TRANSIENT_LOCAL
.durability_kind(eprosima::fastdds::dds::TRANSIENT_DURABILITY_QOS)
.init();

ASSERT_TRUE(reader.isInitialized());

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

reader.startReception(received_data);

// Wait expecting receiving all data.
reader.block_for_all();

// Recreate the DataWriter and DataReader
writer.destroy();
reader.destroy();

writer.init();
reader.init();

ASSERT_TRUE(writer.isInitialized());
ASSERT_TRUE(reader.isInitialized());

// Reader should not receive any data
// as the writer is not transient
auto unreceived_data = default_helloworld_data_generator();

// Send data
reader.startReception(unreceived_data);

// Wait expecting not receiving data.
ASSERT_EQ(reader.block_for_all(std::chrono::seconds(2)), 0u);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_CASE_P(x, y, z, w)
#endif // ifdef INSTANTIATE_TEST_SUITE_P

GTEST_INSTANTIATE_TEST_MACRO(PersistenceLargeData,
PersistenceLargeData,
GTEST_INSTANTIATE_TEST_MACRO(DDSPersistenceTests,
DDSPersistenceTests,
testing::Values(TRANSPORT, INTRAPROCESS, DATASHARING),
[](const testing::TestParamInfo<PersistenceLargeData::ParamType>& info)
[](const testing::TestParamInfo<DDSPersistenceTests::ParamType>& info)
{
switch (info.param)
{
Expand Down
Loading

0 comments on commit ed58ca2

Please sign in to comment.