From bfbbadce1cc56e5c04ec6eeda7afc1e9d2d25d62 Mon Sep 17 00:00:00 2001 From: deanlee Date: Wed, 29 May 2024 20:16:31 +0800 Subject: [PATCH] improve blocking receive --- msgq/impl_msgq.cc | 89 +++++++++++++++++------------------------------ 1 file changed, 31 insertions(+), 58 deletions(-) diff --git a/msgq/impl_msgq.cc b/msgq/impl_msgq.cc index b23991351..c98faac53 100644 --- a/msgq/impl_msgq.cc +++ b/msgq/impl_msgq.cc @@ -1,21 +1,12 @@ #include #include #include -#include +#include #include -#include #include "msgq/impl_msgq.h" -volatile sig_atomic_t msgq_do_exit = 0; - -void sig_handler(int signal) { - assert(signal == SIGINT || signal == SIGTERM); - msgq_do_exit = 1; -} - - MSGQContext::MSGQContext() { } @@ -70,61 +61,43 @@ int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string a return 0; } - -Message * MSGQSubSocket::receive(bool non_blocking){ - msgq_do_exit = 0; - - void (*prev_handler_sigint)(int); - void (*prev_handler_sigterm)(int); - if (!non_blocking){ - prev_handler_sigint = std::signal(SIGINT, sig_handler); - prev_handler_sigterm = std::signal(SIGTERM, sig_handler); - } - +Message *MSGQSubSocket::receive(bool non_blocking) { msgq_msg_t msg; - - MSGQMessage *r = NULL; - int rc = msgq_msg_recv(&msg, q); - // Hack to implement blocking read with a poller. Don't use this - while (!non_blocking && rc == 0 && msgq_do_exit == 0){ - msgq_pollitem_t items[1]; - items[0].q = q; - - int t = (timeout != -1) ? timeout : 100; - - int n = msgq_poll(items, 1, t); - rc = msgq_msg_recv(&msg, q); - - // The poll indicated a message was ready, but the receive failed. Try again - if (n == 1 && rc == 0){ - continue; - } - - if (timeout != -1){ - break; + if (rc == 0 && !non_blocking) { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGINT); + sigaddset(&mask, SIGTERM); + sigprocmask(SIG_BLOCK, &mask, nullptr); + + int64_t timieout_ns = ((timeout != -1) ? timeout : 100) * 1000000; + auto start = steady_clock::now(); + + // Continue receiving messages until timeout or interruption by SIGINT or SIGTERM + while (rc == 0 && timieout_ns > 0) { + struct timespec ts {timieout_ns / 1000000000, timieout_ns % 1000000000}; + int ret = sigtimedwait(&mask, nullptr, &ts); + if (ret == SIGINT || ret == SIGTERM) { + raise(ret); // Raise the signal again to ensure it's not missed + break; // Exit the loop + } else if (ret == -1 && errno == EAGAIN && timeout != -1) { + break; // Timed out + } + + rc = msgq_msg_recv(&msg, q); + timieout_ns -= (timeout == -1 ? 0 : duration_cast(steady_clock::now() - start).count()); } + sigprocmask(SIG_UNBLOCK, &mask, nullptr); } - - if (!non_blocking){ - std::signal(SIGINT, prev_handler_sigint); - std::signal(SIGTERM, prev_handler_sigterm); - } - - errno = msgq_do_exit ? EINTR : 0; - - if (rc > 0){ - if (msgq_do_exit){ - msgq_msg_close(&msg); // Free unused message on exit - } else { - r = new MSGQMessage; - r->takeOwnership(msg.data, msg.size); - } + if (rc > 0) { + MSGQMessage *r = new MSGQMessage; + r->takeOwnership(msg.data, msg.size); + return r; } - - return (Message*)r; + return nullptr; } void MSGQSubSocket::setTimeout(int t){