Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorrect state of Liveliness of Readers who accept data from a single Writer #4610

Closed
1 task done
i-and opened this issue Mar 24, 2024 · 4 comments · Fixed by #4822
Closed
1 task done

Incorrect state of Liveliness of Readers who accept data from a single Writer #4610

i-and opened this issue Mar 24, 2024 · 4 comments · Fixed by #4822
Assignees
Labels
in progress Issue or PR which is being reviewed

Comments

@i-and
Copy link

i-and commented Mar 24, 2024

Is there an already existing issue for this?

  • I have searched the existing issues

Expected behavior

The correctness of the Liveliness of Readers should not depend on their number.

Current behavior

Incorrect state of Liveliness of Readers who accept data from a single Writer

Steps to reproduce

Developed a test based on the HelloWorldExample code in accordance with the attached patch. In the HelloWorldExample on the subscriber's side, another Reader of the existing HelloWorldTopic was simply added.
To repeat this situation, you need to open two terminals and perform the following sequence of actions.

  1. In terminal 1, execute ./DDSHelloWorldExample publisher -s 0 -i 2000
  2. In terminal 2, execute ./DDSHelloWorldExample subscriber
  3. As a result, two Readers switch to the Alive Liveliness state and accept the Topic from the Writer (see the timestamp 10031.383 on the log below)
  4. Pressing Enter ends the Publisher's work in the first terminal
  5. At time 10036.566, incorrect behavior is observed - the callback for the Reader 0x5c3278c143c0 was not called. At the same time, the Reader 0x5c3278c88670 behaves correctly: alive_count=0 not_alive_count=0 alive_count_change=-1 not_alive_count_change=0
  6. In terminal 1, execute ./DDSHelloWorldExample publisher -s 0 -i 2000
  7. At time 10044.605 the Reader 0x5c3278c143c0 has an incorrect state alive_count=2 not_alive_count=0 alive_count_change=1 not_alive_count_change=0 instead alive_count=1 not_alive_count=0 alive_count_change=1 not_alive_count_change=0
  8. Pressing <Ctrl-C> ends the Publisher's work in the first terminal
  9. At time 10051.306 the Reader 0x5c3278c143c0 has an incorrect state alive_count=1 not_alive_count=1 alive_count_change=-1 not_alive_count_change=1 instead alive_count=0 not_alive_count=1 alive_count_change=-1 not_alive_count_change=1
  10. After 20 seconds (timestamp 10069.707), the Participant timeout is triggered - the Reader's 0x5c3278c143c0 callback is not called. This is incorrect.
  11. In terminal 1, execute ./DDSHelloWorldExample publisher -s 0 -i 2000
  12. At time 10077.248 the Reader 0x5c3278c143c0 has an incorrect state alive_count=2 not_alive_count=1 alive_count_change=1 not_alive_count_change=0 instead alive_count=1 not_alive_count=0 alive_count_change=1 not_alive_count_change=0

Log from terminal 2:

Starting 
Reader 0x5c3278c143c0 for topic HelloWorldTopic has been created.
Reader 0x5c3278c88670 for topic HelloWorldTopic has been created.
Subscriber running. Please press enter to stop the Subscriber
10031.374 s: Reader 0x5c3278c143c0 matched.
10031.374 s: Reader 0x5c3278c88670 matched.
10031.383 s: on_liveliness_changed() called for Reader 0x5c3278c143c0 alive_count=1 not_alive_count=0 alive_count_change=1 not_alive_count_change=0 last_publication_handle=1.f.f7.f3.2a.57.5d.18.0.0.0.0.0.0.1.3
10031.383 s: on_liveliness_changed() called for Reader 0x5c3278c88670 alive_count=1 not_alive_count=0 alive_count_change=1 not_alive_count_change=0 last_publication_handle=1.f.f7.f3.2a.57.5d.18.0.0.0.0.0.0.1.3
10032.565 s: Reader 0x5c3278c88670 receive message: HelloWorld 3 RECEIVED
10032.566 s: Reader 0x5c3278c143c0 receive message: HelloWorld 3 RECEIVED
10034.565 s: Reader 0x5c3278c88670 receive message: HelloWorld 4 RECEIVED
10034.565 s: Reader 0x5c3278c143c0 receive message: HelloWorld 4 RECEIVED
10036.566 s: Reader 0x5c3278c143c0 unmatched.
10036.566 s: on_liveliness_changed() called for Reader 0x5c3278c88670 alive_count=0 not_alive_count=0 alive_count_change=-1 not_alive_count_change=0 last_publication_handle=1.f.f7.f3.2a.57.5d.18.0.0.0.0.0.0.1.3
10036.566 s: Reader 0x5c3278c88670 unmatched.
10043.705 s: Reader 0x5c3278c143c0 matched.
10043.705 s: Reader 0x5c3278c88670 matched.
10044.605 s: on_liveliness_changed() called for Reader 0x5c3278c143c0 alive_count=2 not_alive_count=0 alive_count_change=1 not_alive_count_change=0 last_publication_handle=1.f.f7.f3.3d.57.f0.2a.0.0.0.0.0.0.1.3
10044.605 s: on_liveliness_changed() called for Reader 0x5c3278c88670 alive_count=1 not_alive_count=0 alive_count_change=1 not_alive_count_change=0 last_publication_handle=1.f.f7.f3.3d.57.f0.2a.0.0.0.0.0.0.1.3
10045.706 s: Reader 0x5c3278c88670 receive message: HelloWorld 2 RECEIVED
10045.706 s: Reader 0x5c3278c143c0 receive message: HelloWorld 2 RECEIVED
10047.706 s: Reader 0x5c3278c88670 receive message: HelloWorld 3 RECEIVED
10047.706 s: Reader 0x5c3278c143c0 receive message: HelloWorld 3 RECEIVED
10049.706 s: Reader 0x5c3278c88670 receive message: HelloWorld 4 RECEIVED
10049.706 s: Reader 0x5c3278c143c0 receive message: HelloWorld 4 RECEIVED
10051.306 s: on_liveliness_changed() called for Reader 0x5c3278c143c0 alive_count=1 not_alive_count=1 alive_count_change=-1 not_alive_count_change=1 last_publication_handle=1.f.f7.f3.3d.57.f0.2a.0.0.0.0.0.0.1.3
10051.306 s: on_liveliness_changed() called for Reader 0x5c3278c88670 alive_count=0 not_alive_count=1 alive_count_change=-1 not_alive_count_change=1 last_publication_handle=1.f.f7.f3.3d.57.f0.2a.0.0.0.0.0.0.1.3
10069.707 s: Reader 0x5c3278c143c0 unmatched.
10069.707 s: on_liveliness_changed() called for Reader 0x5c3278c88670 alive_count=0 not_alive_count=0 alive_count_change=0 not_alive_count_change=-1 last_publication_handle=1.f.f7.f3.3d.57.f0.2a.0.0.0.0.0.0.1.3
10069.707 s: Reader 0x5c3278c88670 unmatched.
10076.348 s: Reader 0x5c3278c143c0 matched.
10076.348 s: Reader 0x5c3278c88670 matched.
10077.248 s: on_liveliness_changed() called for Reader 0x5c3278c143c0 alive_count=2 not_alive_count=1 alive_count_change=1 not_alive_count_change=0 last_publication_handle=1.f.f7.f3.48.57.9c.81.0.0.0.0.0.0.1.3
10077.248 s: on_liveliness_changed() called for Reader 0x5c3278c88670 alive_count=1 not_alive_count=0 alive_count_change=1 not_alive_count_change=0 last_publication_handle=1.f.f7.f3.48.57.9c.81.0.0.0.0.0.0.1.3
10078.348 s: Reader 0x5c3278c88670 receive message: HelloWorld 2 RECEIVED
10078.348 s: Reader 0x5c3278c143c0 receive message: HelloWorld 2 RECEIVED
10080.349 s: Reader 0x5c3278c88670 receive message: HelloWorld 3 RECEIVED
10080.349 s: Reader 0x5c3278c143c0 receive message: HelloWorld 3 RECEIVED

HelloWorld example code patch:

diff --git a/examples/cpp/dds/HelloWorldExample/HelloWorldPublisher.cpp b/examples/cpp/dds/HelloWorldExample/HelloWorldPublisher.cpp
index 86f3e0009..28a8aca15 100644
--- a/examples/cpp/dds/HelloWorldExample/HelloWorldPublisher.cpp
+++ b/examples/cpp/dds/HelloWorldExample/HelloWorldPublisher.cpp
@@ -107,6 +107,10 @@ bool HelloWorldPublisher::init(
         publisher_->get_default_datawriter_qos(wqos);
     }
 
+    wqos.liveliness().kind = AUTOMATIC_LIVELINESS_QOS;
+    wqos.liveliness().lease_duration = eprosima::fastrtps::Time_t(1, 000000000);
+    wqos.liveliness().announcement_period = eprosima::fastrtps::Time_t(0, 900000000);
+
     writer_ = publisher_->create_datawriter(
         topic_,
         wqos,
diff --git a/examples/cpp/dds/HelloWorldExample/HelloWorldSubscriber.cpp b/examples/cpp/dds/HelloWorldExample/HelloWorldSubscriber.cpp
index 6823a32f4..29c96a872 100644
--- a/examples/cpp/dds/HelloWorldExample/HelloWorldSubscriber.cpp
+++ b/examples/cpp/dds/HelloWorldExample/HelloWorldSubscriber.cpp
@@ -32,11 +32,23 @@
 
 using namespace eprosima::fastdds::dds;
 
+std::string get_time()
+{
+    using namespace std::chrono;
+
+    auto t = (duration_cast<duration<double>>(steady_clock::now().time_since_epoch())).count();
+    
+    char buf[256];
+    std::snprintf(buf, sizeof(buf), "%.3f s: ", t);
+    return buf;
+}
+
 HelloWorldSubscriber::HelloWorldSubscriber()
     : participant_(nullptr)
     , subscriber_(nullptr)
     , topic_(nullptr)
     , reader_(nullptr)
+    , reader_2_(nullptr)
     , type_(new HelloWorldPubSubType())
 {
 }
@@ -106,12 +118,24 @@ bool HelloWorldSubscriber::init(
         subscriber_->get_default_datareader_qos(rqos);
     }
 
+    rqos.liveliness().lease_duration = eprosima::fastrtps::Time_t(1, 600000000);
     reader_ = subscriber_->create_datareader(topic_, rqos, &listener_);
 
     if (reader_ == nullptr)
     {
         return false;
     }
+    std::cout << "Reader " << reader_ << " for topic " << 
+            reader_->get_topicdescription()->get_name() << " has been created." << std::endl;
+
+    reader_2_ = subscriber_->create_datareader(topic_, rqos, &listener_);
+
+    if (reader_2_ == nullptr)
+    {
+        return false;
+    }
+    std::cout << "Reader " << reader_2_ << " for topic " << 
+            reader_2_->get_topicdescription()->get_name() << " has been created." << std::endl;
 
     return true;
 }
@@ -122,6 +146,10 @@ HelloWorldSubscriber::~HelloWorldSubscriber()
     {
         subscriber_->delete_datareader(reader_);
     }
+    if (reader_2_ != nullptr)
+    {
+        subscriber_->delete_datareader(reader_2_);
+    }
     if (topic_ != nullptr)
     {
         participant_->delete_topic(topic_);
@@ -134,18 +162,18 @@ HelloWorldSubscriber::~HelloWorldSubscriber()
 }
 
 void HelloWorldSubscriber::SubListener::on_subscription_matched(
-        DataReader*,
+        DataReader* reader,
         const SubscriptionMatchedStatus& info)
 {
     if (info.current_count_change == 1)
     {
         matched_ = info.total_count;
-        std::cout << "Subscriber matched." << std::endl;
+        std::cout << get_time() << "Reader " << reader << " matched." << std::endl;
     }
     else if (info.current_count_change == -1)
     {
         matched_ = info.total_count;
-        std::cout << "Subscriber unmatched." << std::endl;
+        std::cout << get_time() << "Reader " << reader << " unmatched." << std::endl;
     }
     else
     {
@@ -164,11 +192,25 @@ void HelloWorldSubscriber::SubListener::on_data_available(
         {
             samples_++;
             // Print your structure data here.
-            std::cout << "Message " << hello_.message() << " " << hello_.index() << " RECEIVED" << std::endl;
+            std::cout << get_time() << "Reader " << reader << " receive message: " << 
+                    hello_.message() << " " << hello_.index() << " RECEIVED" << std::endl;
         }
     }
 }
 
+void HelloWorldSubscriber::SubListener::on_liveliness_changed(
+        eprosima::fastdds::dds::DataReader* reader,
+        const eprosima::fastdds::dds::LivelinessChangedStatus& status)
+{
+    std::cout << get_time() << "on_liveliness_changed() called" << 
+        " for Reader " << reader <<
+        " alive_count=" << status.alive_count <<
+        " not_alive_count=" << status.not_alive_count << 
+        " alive_count_change=" << status.alive_count_change << 
+        " not_alive_count_change=" << status.not_alive_count_change << 
+        " last_publication_handle=" << status.last_publication_handle << "\n";
+}
+
 void HelloWorldSubscriber::run()
 {
     std::cout << "Subscriber running. Please press enter to stop the Subscriber" << std::endl;
diff --git a/examples/cpp/dds/HelloWorldExample/HelloWorldSubscriber.h b/examples/cpp/dds/HelloWorldExample/HelloWorldSubscriber.h
index ecd897bd5..c6ab96615 100644
--- a/examples/cpp/dds/HelloWorldExample/HelloWorldSubscriber.h
+++ b/examples/cpp/dds/HelloWorldExample/HelloWorldSubscriber.h
@@ -55,6 +55,7 @@ private:
     eprosima::fastdds::dds::Topic* topic_;
 
     eprosima::fastdds::dds::DataReader* reader_;
+    eprosima::fastdds::dds::DataReader* reader_2_;
 
     eprosima::fastdds::dds::TypeSupport type_;
 
@@ -79,6 +80,10 @@ private:
                 eprosima::fastdds::dds::DataReader* reader,
                 const eprosima::fastdds::dds::SubscriptionMatchedStatus& info) override;
 
+        void on_liveliness_changed(
+            eprosima::fastdds::dds::DataReader* reader,
+            const eprosima::fastdds::dds::LivelinessChangedStatus& status) override;
+
         HelloWorld hello_;
 
         int matched_;

Fast DDS version/commit

2.13.0

Platform/Architecture

Ubuntu Focal 20.04 amd64

Transport layer

Default configuration, UDPv4 & SHM

Additional context

No response

XML configuration file

No response

Relevant log output

No response

Network traffic capture

No response

@i-and i-and added the triage Issue pending classification label Mar 24, 2024
@i-and
Copy link
Author

i-and commented Apr 6, 2024

The issue arises from the use of the count field in the LivelinessData, since this does not allow the LivelinessManager to accurately track which readers want to control the timeouts of the corresponding writers. The reader_guids vector is introduced for the solution.
Here is the corresponding patch:

diff --git a/include/fastdds/rtps/builtin/liveliness/WLP.h b/include/fastdds/rtps/builtin/liveliness/WLP.h
index a143bec57..217422551 100644
--- a/include/fastdds/rtps/builtin/liveliness/WLP.h
+++ b/include/fastdds/rtps/builtin/liveliness/WLP.h
@@ -246,7 +246,8 @@ private:
             const LivelinessQosPolicyKind& kind,
             const Duration_t& lease_duration,
             int32_t alive_change,
-            int32_t not_alive_change);
+            int32_t not_alive_change,
+            const GUID_t* reader_guid);
 
     /**
      * @brief A method to update the liveliness changed status of a given reader
diff --git a/include/fastdds/rtps/writer/LivelinessData.h b/include/fastdds/rtps/writer/LivelinessData.h
index f33b7bbcf..a1aeaaab0 100644
--- a/include/fastdds/rtps/writer/LivelinessData.h
+++ b/include/fastdds/rtps/writer/LivelinessData.h
@@ -52,12 +52,17 @@ struct LivelinessData
     LivelinessData(
             GUID_t guid_in,
             LivelinessQosPolicyKind kind_in,
-            Duration_t lease_duration_in)
+            Duration_t lease_duration_in,
+            const GUID_t* reader_guid_in = nullptr)
         : guid(guid_in)
         , kind(kind_in)
         , lease_duration(lease_duration_in)
         , status(WriterStatus::NOT_ASSERTED)
-    {}
+    {
+        if (reader_guid_in) {
+            reader_guids.emplace_back(*reader_guid_in);
+        }
+    }
 
     LivelinessData()
         : guid()
@@ -96,6 +101,9 @@ struct LivelinessData
     //! GUID of the writer
     GUID_t guid;
 
+    //! GUID of the readers
+    std::vector<GUID_t> reader_guids;
+
     //! Writer liveliness kind
     LivelinessQosPolicyKind kind;
 
@@ -103,6 +111,7 @@ struct LivelinessData
     Duration_t lease_duration;
 
     //! The number of times the writer is being counted
+    //! NOTE: Probably this field can be excluded because an array reader_guids has been added
     unsigned int count = 1;
 
     //! The writer status
diff --git a/include/fastdds/rtps/writer/LivelinessManager.h b/include/fastdds/rtps/writer/LivelinessManager.h
index d99db0f6e..f527258e1 100644
--- a/include/fastdds/rtps/writer/LivelinessManager.h
+++ b/include/fastdds/rtps/writer/LivelinessManager.h
@@ -34,7 +34,8 @@ using LivelinessCallback = std::function<void (
                     const LivelinessQosPolicyKind&,
                     const Duration_t&,
                     int32_t alive_change,
-                    int32_t not_alive_change)>;
+                    int32_t not_alive_change,
+                    const GUID_t*)>;
 
 /**
  * @brief A class managing the liveliness of a set of writers. Writers are represented by their LivelinessData
@@ -78,7 +79,8 @@ public:
     bool add_writer(
             GUID_t guid,
             LivelinessQosPolicyKind kind,
-            Duration_t lease_duration);
+            Duration_t lease_duration,
+            const GUID_t* reader_guid = nullptr);
 
     /**
      * @brief Removes a writer
@@ -90,7 +92,8 @@ public:
     bool remove_writer(
             GUID_t guid,
             LivelinessQosPolicyKind kind,
-            Duration_t lease_duration);
+            Duration_t lease_duration,
+            const GUID_t* reader_guid = nullptr);
 
     /**
      * @brief Asserts liveliness of a writer in the set
diff --git a/src/cpp/rtps/builtin/liveliness/WLP.cpp b/src/cpp/rtps/builtin/liveliness/WLP.cpp
index 9824f6d70..a89c3d9d2 100644
--- a/src/cpp/rtps/builtin/liveliness/WLP.cpp
+++ b/src/cpp/rtps/builtin/liveliness/WLP.cpp
@@ -193,8 +193,10 @@ bool WLP::initWL(
         const LivelinessQosPolicyKind& kind,
         const Duration_t& lease_duration,
         int alive_count,
-        int not_alive_count) -> void
+        int not_alive_count,
+        const GUID_t* reader_guid) -> void
         {
+            (void)reader_guid;
             pub_liveliness_changed(
                 guid,
                 kind,
@@ -210,14 +212,16 @@ bool WLP::initWL(
         const LivelinessQosPolicyKind& kind,
         const Duration_t& lease_duration,
         int alive_count,
-        int not_alive_count) -> void
+        int not_alive_count,
+        const GUID_t* reader_guid) -> void
         {
             sub_liveliness_changed(
                 guid,
                 kind,
                 lease_duration,
                 alive_count,
-                not_alive_count);
+                not_alive_count,
+                reader_guid);
         },
         mp_participant->getEventResource());
 
@@ -1062,7 +1066,8 @@ void WLP::sub_liveliness_changed(
         const LivelinessQosPolicyKind& kind,
         const Duration_t& lease_duration,
         int32_t alive_change,
-        int32_t not_alive_change)
+        int32_t not_alive_change,
+        const GUID_t* reader_guid)
 {
     // Writer with given guid lost liveliness, check which readers were matched and inform them
 
@@ -1073,6 +1078,10 @@ void WLP::sub_liveliness_changed(
         {
             if (reader->matched_writer_is_matched(writer))
             {
+                if (reader_guid != nullptr && *reader_guid != reader->getGuid()) 
+                {
+                    continue;
+                }
                 update_liveliness_changed_status(
                     writer,
                     reader,
diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp
index 9d631d51b..4ae7e326a 100644
--- a/src/cpp/rtps/reader/StatefulReader.cpp
+++ b/src/cpp/rtps/reader/StatefulReader.cpp
@@ -340,7 +340,8 @@ bool StatefulReader::matched_writer_add(
             wlp->sub_liveliness_manager_->add_writer(
                 wdata.guid(),
                 liveliness_kind_,
-                liveliness_lease_duration_);
+                liveliness_lease_duration_,
+                &getGuid());
         }
         else
         {
@@ -379,7 +380,8 @@ bool StatefulReader::matched_writer_remove(
             wlp->sub_liveliness_manager_->remove_writer(
                 writer_guid,
                 liveliness_kind_,
-                liveliness_lease_duration_);
+                liveliness_lease_duration_,
+                &getGuid());
         }
         else
         {
diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp
index d1d7742e8..0e055f331 100644
--- a/src/cpp/rtps/reader/StatelessReader.cpp
+++ b/src/cpp/rtps/reader/StatelessReader.cpp
@@ -192,7 +192,8 @@ bool StatelessReader::matched_writer_add(
             wlp->sub_liveliness_manager_->add_writer(
                 wdata.guid(),
                 liveliness_kind_,
-                liveliness_lease_duration_);
+                liveliness_lease_duration_,
+                &getGuid());
         }
         else
         {
@@ -229,7 +230,8 @@ bool StatelessReader::matched_writer_remove(
             wlp->sub_liveliness_manager_->remove_writer(
                 writer_guid,
                 liveliness_kind_,
-                liveliness_lease_duration_);
+                liveliness_lease_duration_,
+                &getGuid());
         }
         else
         {
diff --git a/src/cpp/rtps/writer/LivelinessManager.cpp b/src/cpp/rtps/writer/LivelinessManager.cpp
index a5e395366..a4974c021 100644
--- a/src/cpp/rtps/writer/LivelinessManager.cpp
+++ b/src/cpp/rtps/writer/LivelinessManager.cpp
@@ -41,7 +41,8 @@ LivelinessManager::~LivelinessManager()
 bool LivelinessManager::add_writer(
         GUID_t guid,
         LivelinessQosPolicyKind kind,
-        Duration_t lease_duration)
+        Duration_t lease_duration,
+        const GUID_t* reader_guid)
 {
     if (!manage_automatic_ && kind == LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS)
     {
@@ -55,17 +56,37 @@ bool LivelinessManager::add_writer(
         // writers_ elements guard
         std::lock_guard<std::mutex> __(mutex_);
 
-        for (LivelinessData& writer : writers_)
+
+        if (reader_guid == nullptr)
         {
-            if (writer.guid == guid &&
-                    writer.kind == kind &&
-                    writer.lease_duration == lease_duration)
+            for (LivelinessData& writer : writers_)
+            {
+                if (writer.guid == guid &&
+                        writer.kind == kind &&
+                        writer.lease_duration == lease_duration)
+                {
+                    writer.count++;
+                    return true;
+                }
+            }
+        } 
+        else 
+        {
+            for (LivelinessData& writer : writers_)
             {
-                writer.count++;
-                return true;
+                if (writer.guid == guid &&
+                        writer.kind == kind &&
+                        writer.lease_duration == lease_duration)
+                {
+                    if (std::find(writer.reader_guids.begin(), writer.reader_guids.end(), *reader_guid) == writer.reader_guids.end()) 
+                    {
+                        writer.reader_guids.push_back(*reader_guid);
+                    }
+                    return true;
+                }
             }
         }
-        writers_.emplace_back(guid, kind, lease_duration);
+        writers_.emplace_back(guid, kind, lease_duration, reader_guid);
     }
 
     if (!calculate_next())
@@ -92,10 +113,12 @@ bool LivelinessManager::add_writer(
 bool LivelinessManager::remove_writer(
         GUID_t guid,
         LivelinessQosPolicyKind kind,
-        Duration_t lease_duration)
+        Duration_t lease_duration,
+        const GUID_t* reader_guid)
 {
     bool removed = false;
     LivelinessData::WriterStatus status;
+    bool found = false;
 
     {
         // collection guard
@@ -103,17 +126,33 @@ bool LivelinessManager::remove_writer(
         // writers_ elements guard
         std::lock_guard<std::mutex> __(mutex_);
 
-        removed = writers_.remove_if([guid, kind, lease_duration, &status](LivelinessData& writer)
+        removed = writers_.remove_if([guid, kind, lease_duration, &status, &found, reader_guid](LivelinessData& writer)
                         {
                             status = writer.status;
-                            return writer.guid == guid &&
-                            writer.kind == kind &&
-                            writer.lease_duration == lease_duration &&
-                            --writer.count == 0;
+                            found = writer.guid == guid &&
+                                writer.kind == kind &&
+                                writer.lease_duration == lease_duration;
+                            if (!found)
+                            {
+                                return false;
+                            }
+                            
+                            if (reader_guid == nullptr) {
+                                return --writer.count == 0;
+                            }
+
+                            auto it = std::find(writer.reader_guids.begin(), writer.reader_guids.end(), *reader_guid);
+                            if (it == writer.reader_guids.end()) 
+                            {
+                                found = false;
+                                return false;
+                            }
+                            writer.reader_guids.erase(it);
+                            return writer.reader_guids.size() == 0;
                         });
     }
 
-    if (!removed)
+    if (!found)
     {
         return false;
     }
@@ -122,14 +161,19 @@ bool LivelinessManager::remove_writer(
     {
         if (status == LivelinessData::WriterStatus::ALIVE)
         {
-            callback_(guid, kind, lease_duration, -1, 0);
+            callback_(guid, kind, lease_duration, -1, 0, reader_guid);
         }
         else if (status == LivelinessData::WriterStatus::NOT_ALIVE)
         {
-            callback_(guid, kind, lease_duration, 0, -1);
+            callback_(guid, kind, lease_duration, 0, -1, reader_guid);
         }
     }
 
+    if (!removed) 
+    {
+        return true;
+    }
+
     std::unique_lock<std::mutex> lock(mutex_);
 
     if (timer_owner_ != nullptr)
@@ -335,7 +379,7 @@ bool LivelinessManager::timer_expired()
 
     if (callback_ != nullptr)
     {
-        callback_(guid, kind, lease_duration, -1, 1);
+        callback_(guid, kind, lease_duration, -1, 1, nullptr);
     }
 
     if (calculate_next())
@@ -394,11 +438,11 @@ void LivelinessManager::assert_writer_liveliness(
     {
         if (status == LivelinessData::WriterStatus::NOT_ASSERTED)
         {
-            callback_(guid, kind, lease_duration, 1, 0);
+            callback_(guid, kind, lease_duration, 1, 0, nullptr);
         }
         else if (status == LivelinessData::WriterStatus::NOT_ALIVE)
         {
-            callback_(guid, kind, lease_duration, 1, -1);
+            callback_(guid, kind, lease_duration, 1, -1, nullptr);
         }
     }
 }

@Mario-DL
Copy link
Member

Mario-DL commented Apr 8, 2024

Hi @i-and,
Thanks for the report and the provided patch. We are taking a look at it and will come back to you.

@Mario-DL Mario-DL removed the triage Issue pending classification label Apr 8, 2024
@Mario-DL Mario-DL self-assigned this Apr 8, 2024
@Mario-DL Mario-DL added the in progress Issue or PR which is being reviewed label Apr 8, 2024
@Mario-DL
Copy link
Member

Hi @i-and and sorry for the late response.
Please, could you confirm that the provided fix above solves the issue ?
Thanks in advance

@i-and
Copy link
Author

i-and commented Jun 4, 2024

Hi @Mario-DL
I was able to check your concise patch - it works. :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
in progress Issue or PR which is being reviewed
Projects
None yet
2 participants