Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Switch to logging #416

Closed
wants to merge 35 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a8e6123
feature: add ZMQ logger
giulioungaretti Dec 16, 2016
e225d57
feat: Add broker
giulioungaretti Dec 18, 2016
1f82cbe
fix: Warn instead of trying to start broker
giulioungaretti Dec 19, 2016
d51e35d
feat: Make broker standalone app
giulioungaretti Dec 19, 2016
5b17fd9
feat: ship logger as script
giulioungaretti Jan 9, 2017
d2e6447
docs: Correct docstring
giulioungaretti Jan 8, 2017
92c42e6
feat: send json
giulioungaretti Jan 10, 2017
967ea7f
fix: Use nicer sytax to exectute
giulioungaretti May 29, 2017
7c65f38
Send multipart json
giulioungaretti May 29, 2017
d2e36af
feature: Make broker a class, force safe exit.
giulioungaretti May 30, 2017
2e02343
feature: Throttle logger and add general pbulishers
giulioungaretti May 31, 2017
6ccb994
rename zmq stuff
giulioungaretti May 31, 2017
87ff721
Polish borker
giulioungaretti May 31, 2017
74da6f9
rename broker
giulioungaretti May 31, 2017
4a460f4
remove legacy setup for exe broker
giulioungaretti May 31, 2017
946ca47
fix: Rename all things
giulioungaretti May 31, 2017
068cc71
fix: Broker path
giulioungaretti May 31, 2017
66bf176
fix: Cast to int
giulioungaretti May 31, 2017
5d537bd
fix: Rename Publishers
giulioungaretti May 31, 2017
1a90008
feat: Turn on debug from env var.
giulioungaretti May 31, 2017
545169a
fix: Use correct class name + pep8
giulioungaretti May 31, 2017
d4331a7
remove empty file
giulioungaretti Jun 12, 2017
c596c21
fix: add zmq to requirements
giulioungaretti Jun 12, 2017
77169b0
docs: Add note on message dropping
giulioungaretti Jun 12, 2017
a5aef87
feature: Throttle send + use defaults
giulioungaretti Jun 12, 2017
7c7ce4e
feat:Expose publisher
giulioungaretti Jun 12, 2017
5438c2d
fix: Remove unused import
giulioungaretti Jun 12, 2017
026091b
Merge branch 'master' into feature/logging
jenshnielsen Dec 11, 2017
5232918
remove duplicates
jenshnielsen Dec 11, 2017
7a1e81d
one more duplicate
jenshnielsen Dec 11, 2017
a94eef3
Merge branch 'master' into feature/logging
jenshnielsen Dec 13, 2017
aa64f26
dont set loglevel automatically
jenshnielsen Dec 13, 2017
e004bf7
use a named logger
jenshnielsen Dec 13, 2017
3a0788e
Merge branch 'master' into feature/logging
jenshnielsen Dec 13, 2017
394fc4e
Merge branch 'master' into feature/logging
astafan8 Sep 14, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions bin/broker
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#!/usr/bin/env python

import zmq


class Broker():

def __init__(self, frontend_addres="tcp://*:5559",
backend_address="tcp://*:5560",
monitor=False, monitor_address="tcp://*:5561"):
"""
XPUB/XSUB broker with optional monitoring
Listens for messages on the frontend and transparently pushes them to a
backend.
This allows to have a centralized broker sending from multiple
processes and to forwarding multiple consumers.
Messages sent but never forwarded (f.e.x if there aren't subscribers)
are quietly cached on the sender-side.
This means that it's the sender's responsabilty to avoid memory leaks.

Args:
frontend_addres (str): Interface to which the frontend is bound
backend_address (str): Interface to which the backend is bound
monitor_address (str): Interface to which the monitor is bound


"""
self.context = zmq.Context()
# Socket facing clients
self.frontend = self.context.socket(zmq.XSUB)
try:
self.frontend.bind(frontend_addres)
logging.info("XSUB listening at {}".format(frontend_addres))
except zmq.error.ZMQError:
logging.Info("Exiting. Another broker is already running")
logging.debug("ZMQ erorr: %s", e)
return

# Socket facing services
self.backend = self.context.socket(zmq.XPUB)
try:
self.backend.bind(backend_address)
logging.info("XPUB publishing at {}".format(backend_address))
except zmq.error.ZMQError as e:
logging.Info("Exiting. Another broker is already running")
logging.debug("ZMQ erorr: %s", e)
return

if monitor:
self.monitor = self.context.socket(zmq.PUB)
try:
self.monitor.bind(monitor_address)
logging.info("PUB monitor at {}".format(monitor_address))
except zmq.error.ZMQError as e:
logging.Info("Exiting. Another broker is already running")
logging.debug("ZMQ erorr: %s", e)
else:
self.monitor = None

def close(self):
"""
Close cleanly this broker
"""
self.frontend.close()
self.backend.close()
if self.monitor:
self.monitor.close()
self.context.term()
self.context.destroy()

def serve_forever(self):
"""
Start forwarding forever.
Interrput and close by raising KeyboardInterrupt.
"""
try:
if self.monitor:
zmq.proxy(self.frontend, self.backend, self.monitor)
else:
zmq.proxy(self.frontend, self.backend)
except KeyboardInterrupt:
self.close()
logging.debug("Exiting. Broker got <C-c>")
return


if __name__ == "__main__":
import logging
import atexit
from os import environ
debug_mode = environ.get("debug", False)
if debug_mode:
logging.basicConfig(level=logging.DEBUG)
broker = Broker()
atexit.register(broker.close)
broker.serve_forever()
Empty file modified docs/make.bat
100644 → 100755
Empty file.
20 changes: 17 additions & 3 deletions qcodes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
"""Set up the main qcodes namespace."""

# flake8: noqa (we don't need the "<...> imported but unused" error)

# config
# config and logging
import logging

from qcodes.config import Config
from qcodes.utils.helpers import add_to_spyder_UMR_excludelist
Expand All @@ -13,6 +12,21 @@
add_to_spyder_UMR_excludelist('qcodes')
config: Config = Config()

from qcodes.utils.zmq_helpers import check_broker
haz_broker = check_broker()

log = logging.getLogger(__name__)
if haz_broker:
from qcodes.utils.zmq_helpers import QPUBHandler
import logging.config
import pkg_resources as pkgr
logger_config = pkgr.resource_filename(__name__, "./config/logging.conf")
logging.config.fileConfig(logger_config)
else:
log.warning("Can't publish logs, did you star the server?")


# name space
from qcodes.version import __version__

plotlib = config.gui.plotlib
Expand Down
22 changes: 22 additions & 0 deletions qcodes/config/logging.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[loggers]
keys=root

[handlers]
keys=ZMQ

[formatters]
keys=simpleFormatter

[logger_root]
level=DEBUG
handlers=ZMQ

[handler_ZMQ]
class=qcodes.QPUBHandler
level=DEBUG
formatter=simpleFormatter
args=("tcp://localhost:5559",)

[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt="%H:%M:%S"
132 changes: 128 additions & 4 deletions qcodes/utils/zmq_helpers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,133 @@
import time
import re
import logging
import json
import time

import zmq

_LINGER = 1000 # milliseconds
#logging.basicConfig(level="DEBUG")
DEBUGF = "%(levelname)s:%(name)s:%(lineno)d:%(asctime)s%(message)s"
INFOF = "%(levelname)s:%(name)s:%(message)s\n"
WARNF = "%(levelname)s:%(filename)s:%(name)s:%(lineno)d - %(message)s\n"
ERRORF = "%(levelname)s:%(filename)s:%(name)s:%(lineno)d - %(message)s - %(exc_info)s\n"
CRITICALF = "%(levelname)s:%(filename)s:%(name)s:%(lineno)d - %(message)s - %(exc_info)s\n"
DATEFMT = "%H:%M:%S"

_ALMOST_RANDOM_GUESS_LOG_MSG_SIZE = 120 # B per log message
_ZMQ_HWM = int(5e8 / 120) # 500MB max memory for the logger
_LINGER = 1000 # milliseconds


class QPUBHandler(logging.Handler):
"""A basic logging handler that emits log messages through a PUB socket.

Takes an interface to connect to.

handler = PUBHandler('inproc://loc')

Log messages handled by this handler are broadcast with ZMQ topic
``log.name`` (which is the name of the module, when logging is done right).

This handler also has sane defaults when it comes to caching.

By default it sets:
- no more than 500MB (ish) message cache
- messages have at best a second of lifetime

This means:
- if more than 500MB are cached, NEW messages are dropped
- if nobody reads the messages after a second the socket will close
wihtout waiting

NOTE that this offers no guarantees on message delivery.
If there is no reciever the message is LOST.
"""
# note that if we want zmq topcis the format MUST include name
formatters = {
logging.DEBUG: logging.Formatter(DEBUGF, datefmt=DATEFMT),
logging.INFO: logging.Formatter(INFOF, datefmt=DATEFMT),
logging.WARN: logging.Formatter(
WARNF, datefmt=DATEFMT),
logging.ERROR: logging.Formatter(
ERRORF, datefmt=DATEFMT),
logging.CRITICAL: logging.Formatter(
CRITICALF, datefmt=DATEFMT),
}

def __init__(self, interface_or_socket, context=None):
logging.Handler.__init__(self)
self.ctx = context or zmq.Context()
self.socket = self.ctx.socket(zmq.PUB)
self.socket.connect(interface_or_socket)
# set-up sane defaults
self.socket.setsockopt(zmq.LINGER, _LINGER)
self.socket.set_hwm(_ZMQ_HWM)

def format(self, record):
"""Format a record."""
self.formatters[record.levelno].format(record)
values = parse(self.formatters[record.levelno]._fmt)
json_out = {}
for key in values:
json_out[key] = getattr(record, key)
return json_out

def emit(self, record):
"""Emit a record message
Args:
record (logging.record): record to shovel on the socket
"""
msg = self.format(record)
_logger_name = msg.get("name", "")
if _logger_name:
topic = "logger" + "." + _logger_name
else:
topic = "logger"
self.socket.send_multipart([topic.encode(), json.dumps(msg).encode()])


def parse(string):
"""Parses format string looking for substitutions"""
standard_formatters = re.compile(r'\((.+?)\)', re.IGNORECASE)
return standard_formatters.findall(string)


def check_broker(frontend_address="tcp://*:5559", backend_address="tcp://*:5560"):
"""
Simple and dumb check to see if a XPUB/XSUB broker exists.

Args:
frontend_address (str): Interface to which the frontend is bound
backend_address (str): Interface to which the backend is bound

Returns:
bool: Broker server exists

"""
context = zmq.Context()
# Socket facing clients
frontend = context.socket(zmq.XSUB)
f = True
b = True
try:
frontend.bind(frontend_address)
f = False
except zmq.error.ZMQError:
pass

# Socket facing services
backend = context.socket(zmq.XPUB)
try:
backend.bind(backend_address)
b = False
except zmq.error.ZMQError:
pass

frontend.close()
backend.close()
context.term()
return f and b


class UnboundedPublisher:
"""
Expand All @@ -16,7 +140,7 @@ class UnboundedPublisher:
def __init__(self,
topic: str,
interface_or_socket: str="tcp://localhost:5559",
context: zmq.Context = None) -> None:
context: zmq.Context = None):
"""

Args:
Expand Down Expand Up @@ -50,7 +174,7 @@ class Publisher(UnboundedPublisher):
def __init__(self, topic: str,
interface_or_socket: str="tcp://localhost:5559",
timeout: int = _LINGER*10,
hwm: int = _ZMQ_HWM*5, context: zmq.Context = None) -> None:
hwm: int = _ZMQ_HWM*5, context: zmq.Context = None):
"""

Args:
Expand Down
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from setuptools import setup, find_packages
from distutils.version import StrictVersion
from importlib import import_module
import sys

import versioneer

Expand Down Expand Up @@ -65,7 +64,9 @@ def readme():
'tests/drivers/auxiliary_files/*',
'py.typed']},
install_requires=install_requires,

# scripts to include
# broker
scripts=['bin/broker'],
test_suite='qcodes.tests',
extras_require=extras_require,
# zip_safe=False is required for mypy
Expand Down