diff --git a/orchagent/orch.cpp b/orchagent/orch.cpp index d1cbdb89c8..708a86280a 100644 --- a/orchagent/orch.cpp +++ b/orchagent/orch.cpp @@ -242,14 +242,12 @@ void Consumer::execute() // ConsumerBase::execute_impl(); SWSS_LOG_ENTER(); - size_t update_size = 0; auto table = static_cast(getSelectable()); - do - { - std::deque entries; - table->pops(entries); - update_size = addToSync(entries); - } while (update_size != 0); + std::deque entries; + table->pops(entries); + + // add to sync + addToSync(entries); drain(); } diff --git a/tests/mock_tests/consumer_ut.cpp b/tests/mock_tests/consumer_ut.cpp index 500bf45879..f0008a964b 100644 --- a/tests/mock_tests/consumer_ut.cpp +++ b/tests/mock_tests/consumer_ut.cpp @@ -10,6 +10,25 @@ namespace consumer_test { using namespace std; + class TestOrch : public Orch + { + public: + TestOrch(swss::DBConnector *db, string tableName) + :Orch(db, tableName), + m_notification_count(0) + { + } + + void doTask(Consumer& consumer) + { + std::cout << "TestOrch::doTask " << consumer.m_toSync.size() << std::endl; + m_notification_count += consumer.m_toSync.size(); + consumer.m_toSync.clear(); + } + + long m_notification_count; + }; + struct ConsumerTest : public ::testing::Test { shared_ptr m_app_db; @@ -322,4 +341,31 @@ namespace consumer_test validate_syncmap(consumer->m_toSync, 1, key, exp_kofv); } + + TEST_F(ConsumerTest, ConsumerPops_notification_count) + { + int consumer_pops_batch_size = 10; + TestOrch test_orch(m_config_db.get(), "CFG_TEST_TABLE"); + Consumer test_consumer( + new swss::ConsumerStateTable(m_config_db.get(), "CFG_TEST_TABLE", consumer_pops_batch_size, 1), &test_orch, "CFG_TEST_TABLE"); + swss::ProducerStateTable producer_table(m_config_db.get(), "CFG_TEST_TABLE"); + + m_config_db->flushdb(); + for (int notification_count = 0; notification_count< consumer_pops_batch_size*2; notification_count++) + { + std::vector fields; + FieldValueTuple t("test_field", "test_value"); + fields.push_back(t); + producer_table.set(std::to_string(notification_count), fields); + + cout << "ConsumerPops_notification_count:: add key: " << notification_count << endl; + } + + // consumer should pops consumer_pops_batch_size notifications + test_consumer.execute(); + ASSERT_EQ(test_orch.m_notification_count, consumer_pops_batch_size); + + test_consumer.execute(); + ASSERT_EQ(test_orch.m_notification_count, consumer_pops_batch_size*2); + } } diff --git a/tests/mock_tests/mock_consumerstatetable.cpp b/tests/mock_tests/mock_consumerstatetable.cpp index 822727929a..2764bb10f2 100644 --- a/tests/mock_tests/mock_consumerstatetable.cpp +++ b/tests/mock_tests/mock_consumerstatetable.cpp @@ -7,4 +7,34 @@ namespace swss TableName_KeySet(tableName) { } + + void ConsumerStateTable::pops(std::deque &vkco, const std::string& /*prefix*/) + { + int count = 0; + swss::Table table(getDbConnector(), getTableName()); + std::vector keys; + table.getKeys(keys); + for (const auto &key: keys) + { + // pop with batch size + if (count < POP_BATCH_SIZE) + { + count++; + } + else + { + break; + } + + KeyOpFieldsValuesTuple kco; + kfvKey(kco) = key; + kfvOp(kco) = SET_COMMAND; + if (!table.get(key, kfvFieldsValues(kco))) + { + continue; + } + table.del(key); + vkco.push_back(kco); + } + } }