diff --git a/tests/subscriber.cpp b/tests/subscriber.cpp index 09be191..3d4ed3c 100644 --- a/tests/subscriber.cpp +++ b/tests/subscriber.cpp @@ -27,7 +27,7 @@ BOOST_AUTO_TEST_CASE(construction) BOOST_AUTO_TEST_CASE(invalid_construction) { - BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::NULL_SESSION), + BOOST_CHECK_THROW(zeroeq::Subscriber{zeroeq::NULL_SESSION}, std::runtime_error); BOOST_CHECK_THROW(zeroeq::Subscriber(""), std::runtime_error); BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::URI("localhost")), @@ -37,8 +37,7 @@ BOOST_AUTO_TEST_CASE(invalid_construction) std::runtime_error); zeroeq::Subscriber shared; - BOOST_CHECK_THROW(zeroeq::Subscriber subscriber(zeroeq::NULL_SESSION, - shared), + BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::NULL_SESSION, shared), std::runtime_error); BOOST_CHECK_THROW(zeroeq::Subscriber("", shared), std::runtime_error); BOOST_CHECK_THROW(zeroeq::Subscriber(zeroeq::URI("localhost"), shared), diff --git a/zeroeq/CMakeLists.txt b/zeroeq/CMakeLists.txt index 09221c0..fac13b6 100644 --- a/zeroeq/CMakeLists.txt +++ b/zeroeq/CMakeLists.txt @@ -18,11 +18,11 @@ set(ZEROEQ_PUBLIC_HEADERS uri.h) set(ZEROEQ_HEADERS - detail/browser.h detail/common.h detail/constants.h detail/context.h detail/port.h + detail/receiver.h detail/sender.h detail/socket.h) diff --git a/zeroeq/client.cpp b/zeroeq/client.cpp index d4065a9..4f936c2 100644 --- a/zeroeq/client.cpp +++ b/zeroeq/client.cpp @@ -5,38 +5,36 @@ #include "client.h" -#include "detail/browser.h" #include "detail/common.h" +#include "detail/receiver.h" #include #include namespace zeroeq { -class Client::Impl : public detail::Browser +class Client::Impl : public detail::Receiver { public: explicit Impl(const std::string& session) - : Browser(SERVER_SERVICE, - session == DEFAULT_SESSION ? getDefaultRepSession() : session) + : detail::Receiver(SERVER_SERVICE, session == DEFAULT_SESSION + ? getDefaultRepSession() + : session) , _servers(zmq_socket(getContext(), ZMQ_DEALER), [](void* s) { ::zmq_close(s); }) { } explicit Impl(const URIs& uris) - : Browser(SERVER_SERVICE) + : detail::Receiver(SERVER_SERVICE) , _servers(zmq_socket(getContext(), ZMQ_DEALER), [](void* s) { ::zmq_close(s); }) { for (const auto& uri : uris) { - if (uri.getScheme() == DEFAULT_SCHEMA && - (uri.getHost().empty() || uri.getPort() == 0)) - { + if (!uri.isFullyQualified()) ZEROEQTHROW(std::runtime_error( std::string("Non-fully qualified URI used for server"))); - } const auto& zmqURI = buildZmqURI(uri); if (!addConnection(zmqURI)) diff --git a/zeroeq/detail/browser.h b/zeroeq/detail/receiver.h similarity index 89% rename from zeroeq/detail/browser.h rename to zeroeq/detail/receiver.h index 179dd1b..2bc116e 100644 --- a/zeroeq/detail/browser.h +++ b/zeroeq/detail/receiver.h @@ -19,10 +19,10 @@ namespace zeroeq namespace detail { /** Manages and updates a set of connections with a zeroconf browser. */ -class Browser +class Receiver { public: - Browser(const std::string& service, const std::string session) + Receiver(const std::string& service, const std::string session) : _servus(service) , _session(session) , _context(detail::getContext()) @@ -39,14 +39,14 @@ class Browser update(); } - Browser(const std::string& service) + Receiver(const std::string& service) : _servus(service) , _session(zeroeq::NULL_SESSION) , _context(detail::getContext()) { } - virtual ~Browser() + virtual ~Receiver() { if (_servus.isBrowsing()) _servus.endBrowsing(); @@ -77,7 +77,7 @@ class Browser const uint128_t identifier(_servus.get(instance, KEY_INSTANCE)); zmq::SocketPtr socket = createSocket(identifier); if (socket) - _addConnection(zmqURI, socket); + _connect(zmqURI, socket); } } } @@ -86,16 +86,10 @@ class Browser { zmq::SocketPtr socket = createSocket(uint128_t()); if (socket) - return _addConnection(zmqURI, socket); + return _connect(zmqURI, socket); return true; } - /** - * Create the socket for zmqURI, return nullptr if connection is to be - * ignored. - */ - virtual zmq::SocketPtr createSocket(const uint128_t& instance) = 0; - void addSockets(std::vector& entries) { entries.insert(entries.end(), _entries.begin(), _entries.end()); @@ -105,9 +99,16 @@ class Browser using SocketMap = std::map; void* getContext() { return _context.get(); } + + /** + * Create the socket for the given instance, return nullptr if connection is + * to be ignored. + */ + virtual zmq::SocketPtr createSocket(const uint128_t& instance) = 0; + const SocketMap& getSockets() { return _sockets; } - bool _addConnection(const std::string& zmqURI, zmq::SocketPtr socket) + bool _connect(const std::string& zmqURI, zmq::SocketPtr socket) { if (zmq_connect(socket.get(), zmqURI.c_str()) == -1) { diff --git a/zeroeq/detail/sender.cpp b/zeroeq/detail/sender.cpp index 6065b0a..686bab4 100644 --- a/zeroeq/detail/sender.cpp +++ b/zeroeq/detail/sender.cpp @@ -25,6 +25,11 @@ namespace zeroeq { namespace detail { +Sender::Sender(const URI& uri_, const int type) + : Sender(uri_, type, {}, {}) +{ +} + Sender::Sender(const URI& uri_, const int type, const std::string service, const std::string& session) : _context(getContext()) @@ -37,11 +42,6 @@ Sender::Sender(const URI& uri_, const int type, const std::string service, zmq_setsockopt(socket.get(), ZMQ_SNDHWM, &hwm, sizeof(hwm)); } -Sender::Sender(const URI& uri_, const int type) - : Sender(uri_, type, {}, {}) -{ -} - Sender::~Sender() { socket.reset(); @@ -62,23 +62,23 @@ void Sender::initURI() host.clear(); uint16_t port = uri.getPort(); - if (host.empty() || port == 0) - { - std::string hostStr, portStr; - _getEndPoint(hostStr, portStr); - - if (port == 0) - { - // No overflow is possible unless ZMQ reports bad port number. - port = std::stoi(portStr); - uri.setPort(port); - } + if (!host.empty() && port != 0) + return; - if (host.empty()) - uri.setHost(hostStr); + std::string hostStr, portStr; + _getEndPoint(hostStr, portStr); - ZEROEQINFO << "Bound to " << uri << std::endl; + if (port == 0) + { + // No overflow is possible unless ZMQ reports bad port number. + port = std::stoi(portStr); + uri.setPort(port); } + + if (host.empty()) + uri.setHost(hostStr); + + ZEROEQINFO << "Bound to " << uri << std::endl; } void Sender::announce() diff --git a/zeroeq/detail/sender.h b/zeroeq/detail/sender.h index 118940b..b8f4631 100644 --- a/zeroeq/detail/sender.h +++ b/zeroeq/detail/sender.h @@ -20,9 +20,9 @@ class Sender zmq::ContextPtr _context; // must be private before socket public: + Sender(const URI& uri_, const int type); Sender(const URI& uri_, const int type, const std::string service, const std::string& session); - Sender(const URI& uri_, const int type); ~Sender(); std::string getAddress() const; diff --git a/zeroeq/receiver.cpp b/zeroeq/receiver.cpp index e0f2740..4663b1a 100644 --- a/zeroeq/receiver.cpp +++ b/zeroeq/receiver.cpp @@ -103,10 +103,9 @@ class Receiver::Impl intervals.push_back(sockets.size() - before); } - const uint32_t remaining = - duration_cast(high_resolution_clock::now() - - startTime) - .count(); + const auto remaining = duration_cast( + high_resolution_clock::now() - startTime) + .count(); switch (zmq_poll(sockets.data(), int(sockets.size()), remaining)) { @@ -152,10 +151,9 @@ class Receiver::Impl } } } - } while (haveData && - duration_cast(high_resolution_clock::now() - - startTime) - .count() < timeout); + } while (haveData && duration_cast( + high_resolution_clock::now() - startTime) + .count() < timeout); return hadData; } }; diff --git a/zeroeq/receiver.h b/zeroeq/receiver.h index 89b690f..0d6ce20 100644 --- a/zeroeq/receiver.h +++ b/zeroeq/receiver.h @@ -87,6 +87,7 @@ class Receiver * their list of sockets. */ virtual void update() {} + /** * Add the given connection to the list of receiving sockets. * diff --git a/zeroeq/server.cpp b/zeroeq/server.cpp index 46f0805..bc9ee89 100644 --- a/zeroeq/server.cpp +++ b/zeroeq/server.cpp @@ -5,9 +5,11 @@ #include "server.h" -#include "detail/browser.h" +#include "detail/receiver.h" #include "detail/sender.h" +#include + #include #include diff --git a/zeroeq/subscriber.cpp b/zeroeq/subscriber.cpp index a0f1c32..62cd865 100644 --- a/zeroeq/subscriber.cpp +++ b/zeroeq/subscriber.cpp @@ -6,10 +6,10 @@ #include "subscriber.h" -#include "detail/browser.h" #include "detail/byteswap.h" #include "detail/common.h" #include "detail/constants.h" +#include "detail/receiver.h" #include "detail/sender.h" #include "detail/socket.h" #include "log.h" @@ -24,26 +24,23 @@ namespace zeroeq { -class Subscriber::Impl : public detail::Browser +class Subscriber::Impl : public detail::Receiver { public: Impl(const std::string& session) - : Browser(PUBLISHER_SERVICE, + : detail::Receiver(PUBLISHER_SERVICE, session == DEFAULT_SESSION ? getDefaultPubSession() : session) , _selfInstance(detail::Sender::getUUID()) { } Impl(const URI& uri) - : Browser(PUBLISHER_SERVICE) + : detail::Receiver(PUBLISHER_SERVICE) , _selfInstance(detail::Sender::getUUID()) { - if (uri.getScheme() == DEFAULT_SCHEMA && - (uri.getHost().empty() || uri.getPort() == 0)) - { + if (!uri.isFullyQualified()) ZEROEQTHROW(std::runtime_error( std::string("Non-fully qualified URI used for subscriber"))); - } const std::string& zmqURI = buildZmqURI(uri); if (!addConnection(zmqURI)) diff --git a/zeroeq/uri.cpp b/zeroeq/uri.cpp index 5f03032..eaad54c 100644 --- a/zeroeq/uri.cpp +++ b/zeroeq/uri.cpp @@ -96,4 +96,10 @@ bool URI::operator!=(const servus::URI& rhs) const { return servus::URI::operator!=(rhs); } + +bool URI::isFullyQualified() const +{ + return getScheme() != DEFAULT_SCHEMA || + (!getHost().empty() && getPort() != 0); +} } diff --git a/zeroeq/uri.h b/zeroeq/uri.h index 08db88c..885837e 100644 --- a/zeroeq/uri.h +++ b/zeroeq/uri.h @@ -50,6 +50,10 @@ class URI : private servus::URI /** Convert this URI to a servus::URI */ const servus::URI& toServusURI() const { return *this; } + + /** @return true if the host and port are given for a tcp URI. */ + ZEROEQ_API bool isFullyQualified() const; + /** @name servus::URI API */ //@{ using servus::URI::getScheme;