Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[22033] Release participant_stateless secure builtin writer history change when authentication has finished (backport #5386) #5391

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions src/cpp/rtps/security/SecurityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1074,11 +1074,11 @@ void SecurityManager::delete_participant_stateless_message_entities()
void SecurityManager::create_participant_stateless_message_pool()
{
participant_stateless_message_writer_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 20, 100 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE, 20, 100};
participant_stateless_message_reader_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 10, 5000 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE, 10, 5000};

BasicPoolConfig cfg{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize() };
BasicPoolConfig cfg{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE};
participant_stateless_message_pool_ = TopicPayloadPoolRegistry::get("DCPSParticipantStatelessMessage", cfg);

PoolConfig writer_cfg = PoolConfig::from_history_attributes(participant_stateless_message_writer_hattr_);
Expand Down Expand Up @@ -1232,7 +1232,7 @@ void SecurityManager::delete_participant_volatile_message_secure_entities()
void SecurityManager::create_participant_volatile_message_secure_pool()
{
participant_volatile_message_secure_hattr_ =
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, participant_->getMaxMessageSize(), 10, 0 };
{ PREALLOCATED_WITH_REALLOC_MEMORY_MODE, PARTICIPANT_VOLATILE_MESSAGE_PAYLOAD_DEFAULT_SIZE, 10, 0 };

PoolConfig pool_cfg = PoolConfig::from_history_attributes(participant_volatile_message_secure_hattr_);
participant_volatile_message_secure_pool_ =
Expand Down Expand Up @@ -1731,6 +1731,7 @@ void SecurityManager::process_participant_volatile_message_secure(
const GUID_t remote_participant_key(message.message_identity().source_guid().guidPrefix,
c_EntityId_RTPSParticipant);
std::shared_ptr<ParticipantCryptoHandle> remote_participant_crypto;
DiscoveredParticipantInfo::AuthUniquePtr remote_participant_info;

// Search remote participant crypto handle.
{
Expand All @@ -1746,6 +1747,7 @@ void SecurityManager::process_participant_volatile_message_secure(
}

remote_participant_crypto = dp_it->second->get_participant_crypto();
remote_participant_info = dp_it->second->get_auth();
}
else
{
Expand All @@ -1767,12 +1769,30 @@ void SecurityManager::process_participant_volatile_message_secure(
EPROSIMA_LOG_ERROR(SECURITY, "Cannot set remote participant crypto tokens ("
<< remote_participant_key << ") - (" << exception.what() << ")");
}
else
{
// Release the change from the participant_stateless_message_writer_pool_
// As both participants have already authorized each other

if (remote_participant_info &&
remote_participant_info->change_sequence_number_ != SequenceNumber_t::unknown())
{
participant_stateless_message_writer_history_->remove_change(
remote_participant_info->change_sequence_number_);
remote_participant_info->change_sequence_number_ = SequenceNumber_t::unknown();
}
}
}
else
{
std::lock_guard<shared_mutex> _(mutex_);
remote_participant_pending_messages_.emplace(remote_participant_key, std::move(message.message_data()));
}

if (remote_participant_info)
{
restore_discovered_participant_info(remote_participant_key, remote_participant_info);
}
}
else if (message.message_class_id().compare(GMCLASSID_SECURITY_READER_CRYPTO_TOKENS) == 0)
{
Expand Down Expand Up @@ -1928,7 +1948,7 @@ void SecurityManager::process_participant_volatile_message_secure(
}
else
{
EPROSIMA_LOG_INFO(SECURITY, "Discarted ParticipantGenericMessage with class id " << message.message_class_id());
EPROSIMA_LOG_INFO(SECURITY, "Discarded ParticipantGenericMessage with class id " << message.message_class_id());
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/cpp/rtps/security/SecurityManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ struct EndpointSecurityAttributes;
*/
class SecurityManager : private WriterListener
{
static constexpr std::size_t PARTICIPANT_STATELESS_MESSAGE_PAYLOAD_DEFAULT_SIZE = 8192;
static constexpr std::size_t PARTICIPANT_VOLATILE_MESSAGE_PAYLOAD_DEFAULT_SIZE = 1024;

public:

/**
Expand Down Expand Up @@ -403,14 +406,19 @@ class SecurityManager : private WriterListener
}

AuthenticationInfo(
AuthenticationInfo&& auth)
AuthenticationInfo&& auth) noexcept
: identity_handle_(std::move(auth.identity_handle_))
, handshake_handle_(std::move(auth.handshake_handle_))
, auth_status_(auth.auth_status_)
, expected_sequence_number_(auth.expected_sequence_number_)
, change_sequence_number_(std::move(auth.change_sequence_number_))
, event_(std::move(auth.event_))
{
auth.identity_handle_ = nullptr;
auth.handshake_handle_ = nullptr;
auth.auth_status_ = AUTHENTICATION_NOT_AVAILABLE;
auth.expected_sequence_number_ = 0;
auth.change_sequence_number_ = SequenceNumber_t::unknown();
}

int32_t handshake_requests_sent_;
Expand Down
31 changes: 26 additions & 5 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -858,16 +858,28 @@ class PubSubReader
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Reader is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Reader authorization finished..." << std::endl;
}
Expand Down Expand Up @@ -1173,6 +1185,15 @@ class PubSubReader
return *this;
}

PubSubReader& participants_allocation_properties(
size_t initial,
size_t maximum)
{
participant_qos_.allocation().participants.initial = initial;
participant_qos_.allocation().participants.maximum = maximum;
return *this;
}

PubSubReader& expect_no_allocs()
{
// TODO(Mcc): Add no allocations check code when feature is completely ready
Expand Down
31 changes: 26 additions & 5 deletions test/blackbox/api/dds-pim/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -743,16 +743,28 @@ class PubSubWriter
}

#if HAVE_SECURITY
void waitAuthorized()
void waitAuthorized(
std::chrono::seconds timeout = std::chrono::seconds::zero(),
unsigned int expected = 1)
{
std::unique_lock<std::mutex> lock(mutexAuthentication_);

std::cout << "Writer is waiting authorization..." << std::endl;

cvAuthentication_.wait(lock, [&]() -> bool
{
return authorized_ > 0;
});
if (timeout == std::chrono::seconds::zero())
{
cvAuthentication_.wait(lock, [&]()
{
return authorized_ >= expected;
});
}
else
{
cvAuthentication_.wait_for(lock, timeout, [&]()
{
return authorized_ >= expected;
});
}

std::cout << "Writer authorization finished..." << std::endl;
}
Expand Down Expand Up @@ -1137,6 +1149,15 @@ class PubSubWriter
return *this;
}

PubSubWriter& participants_allocation_properties(
size_t initial,
size_t maximum)
{
participant_qos_.allocation().participants.initial = initial;
participant_qos_.allocation().participants.maximum = maximum;
return *this;
}

PubSubWriter& expect_no_allocs()
{
// TODO(Mcc): Add no allocations check code when feature is completely ready
Expand Down
110 changes: 106 additions & 4 deletions test/blackbox/common/BlackboxTestsSecurity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3530,11 +3530,11 @@ TEST_F(SecurityPkcs, BuiltinAuthenticationAndAccessAndCryptoPlugin_pkcs11_key)

static void CommonPermissionsConfigure(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubWriter<HelloWorldPubSubType>& writer,
const std::string& governance_file,
const std::string& permissions_file)
const std::string& permissions_file,
const PropertyPolicy& extra_properties)
{
PropertyPolicy sub_property_policy;
PropertyPolicy sub_property_policy(extra_properties);
sub_property_policy.properties().emplace_back(Property("dds.sec.auth.plugin",
"builtin.PKI-DH"));
sub_property_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.identity_ca",
Expand All @@ -3553,9 +3553,18 @@ static void CommonPermissionsConfigure(
"file://" + std::string(certs_path) + "/" + governance_file));
sub_property_policy.properties().emplace_back(Property("dds.sec.access.builtin.Access-Permissions.permissions",
"file://" + std::string(certs_path) + "/" + permissions_file));

reader.property_policy(sub_property_policy);
}

static void CommonPermissionsConfigure(
PubSubWriter<HelloWorldPubSubType>& writer,
const std::string& governance_file,
const std::string& permissions_file,
const PropertyPolicy& extra_properties)
{
PropertyPolicy pub_property_policy(extra_properties);

PropertyPolicy pub_property_policy;
pub_property_policy.properties().emplace_back(Property("dds.sec.auth.plugin",
"builtin.PKI-DH"));
pub_property_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.identity_ca",
Expand All @@ -3574,9 +3583,21 @@ static void CommonPermissionsConfigure(
"file://" + std::string(certs_path) + "/" + governance_file));
pub_property_policy.properties().emplace_back(Property("dds.sec.access.builtin.Access-Permissions.permissions",
"file://" + std::string(certs_path) + "/" + permissions_file));

writer.property_policy(pub_property_policy);
}

static void CommonPermissionsConfigure(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubWriter<HelloWorldPubSubType>& writer,
const std::string& governance_file,
const std::string& permissions_file,
const PropertyPolicy& extra_properties = PropertyPolicy())
{
CommonPermissionsConfigure(reader, governance_file, permissions_file, extra_properties);
CommonPermissionsConfigure(writer, governance_file, permissions_file, extra_properties);
}

static void BuiltinAuthenticationAndAccessAndCryptoPlugin_Permissions_validation_ok_common(
PubSubReader<HelloWorldPubSubType>& reader,
PubSubWriter<HelloWorldPubSubType>& writer,
Expand Down Expand Up @@ -5088,6 +5109,87 @@ TEST(Security, security_with_initial_peers_over_tcpv4_correctly_behaves)
tcp_server.block_for_all(std::chrono::seconds(10));
}

// Regression test for Redmine issue #22033
// Authorized participants shall remove the changes from the
// participants secure stateless msgs pool
TEST(Security, participant_stateless_secure_writer_pool_change_is_removed_upon_participant_authentication)
{
struct TestConsumer : public eprosima::fastdds::dds::LogConsumer
{
TestConsumer(
std::atomic_size_t& n_logs_ref)
: n_logs_(n_logs_ref)
{
}

void Consume(
const eprosima::fastdds::dds::Log::Entry&) override
{
++n_logs_;
}

private:

std::atomic_size_t& n_logs_;
};

// Counter for log entries
std::atomic<size_t>n_logs{};

// Prepare Log module to check that no SECURITY errors are produced
eprosima::fastdds::dds::Log::SetCategoryFilter(std::regex("SECURITY"));
eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Kind::Error);
eprosima::fastdds::dds::Log::RegisterConsumer(std::unique_ptr<eprosima::fastdds::dds::LogConsumer>(new TestConsumer(
n_logs)));

const size_t n_participants = 20;

// Create 21 secure participants
std::vector<std::shared_ptr<PubSubReader<HelloWorldPubSubType>>> participants;
participants.reserve(n_participants + 1);

for (size_t i = 0; i < n_participants + 1; ++i)
{
participants.emplace_back(std::make_shared<PubSubReader<HelloWorldPubSubType>>("HelloWorldTopic"));
// Configure security
const std::string governance_file("governance_helloworld_all_enable.smime");
const std::string permissions_file("permissions_helloworld.smime");

PropertyPolicy handshake_prop_policy;

handshake_prop_policy.properties().emplace_back(Property("dds.sec.auth.builtin.PKI-DH.max_handshake_requests",
"10000000"));
handshake_prop_policy.properties().emplace_back(Property(
"dds.sec.auth.builtin.PKI-DH.initial_handshake_resend_period",
"250"));
handshake_prop_policy.properties().emplace_back(Property(
"dds.sec.auth.builtin.PKI-DH.handshake_resend_period_gain",
"1.0"));

CommonPermissionsConfigure(*participants.back(), governance_file, permissions_file, handshake_prop_policy);

// Init all except the latest one
if (i != n_participants)
{
participants.back()->init();
ASSERT_TRUE(participants.back()->isInitialized());
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}

// Wait for the first participant to authenticate the rest
participants.front()->waitAuthorized(std::chrono::seconds::zero(), n_participants - 1);

// Init the last one
participants.back()->init();
ASSERT_TRUE(participants.back()->isInitialized());

participants.front()->waitAuthorized(std::chrono::seconds::zero(), n_participants);

// No SECURITY error logs should have been produced
eprosima::fastdds::dds::Log::Flush();
EXPECT_EQ(0u, n_logs);
}

void blackbox_security_init()
{
Expand Down
Loading