Skip to content

Commit

Permalink
Issue/async bootloader (#873)
Browse files Browse the repository at this point in the history
* Make bootloader async
  • Loading branch information
arnaudsjs authored and wouterdb committed Jan 9, 2019
1 parent b2f1acd commit 621a660
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 125 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
v 2018.4 (2018-12-xx)
Changes in this release:
- Various bugfixes and performance enhancements
- Various bugfixes and performance enhancements (#873)
- Dependency updates
- Removal of snapshot and restore functionality from the server (#789)
- Replace virtualenv by python standard venv (#783)
Expand Down
50 changes: 35 additions & 15 deletions src/inmanta/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@
import os
import pwd
import socket
import signal

import colorlog
from inmanta.command import command, Commander, CLIException
from inmanta.compiler import do_compile
from inmanta.config import Config
from tornado.ioloop import IOLoop
from tornado.util import TimeoutError
from tornado import gen
from inmanta import protocol, module, moduletool, const
from inmanta.export import cfg_env, ModelExporter
import yaml
Expand All @@ -55,31 +58,48 @@

@command("server", help_msg="Start the inmanta server")
def start_server(options):
io_loop = IOLoop.current()

ibl = InmantaBootloader()
ibl.start()

try:
io_loop.start()
except KeyboardInterrupt:
IOLoop.current().stop()
ibl.stop()
setup_signal_handlers(ibl.stop)
IOLoop.current().add_callback(ibl.start)
IOLoop.current().start()


@command("agent", help_msg="Start the inmanta agent")
def start_agent(options):
from inmanta import agent
io_loop = IOLoop.current()

a = agent.Agent()
a.start()
setup_signal_handlers(a.stop)
IOLoop.current().add_callback(a.start)
IOLoop.current().start()


def setup_signal_handlers(shutdown_function):
"""
Make sure that shutdown_function is called when a SIGTERM or a SIGINT interrupt occurs.
:param shutdown_function: The function that contains the shutdown logic.
"""
def handle_signal(signum, frame):
IOLoop.current().add_callback_from_signal(safe_shutdown_wrapper, shutdown_function)

signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)


@gen.coroutine
def safe_shutdown_wrapper(shutdown_function):
"""
Wait 10 seconds to gracefully shutdown the instance.
Afterwards stop the IOLoop
"""
future = shutdown_function()
try:
io_loop.start()
except KeyboardInterrupt:
timeout = IOLoop.current().time() + 10
yield gen.with_timeout(timeout, future)
except TimeoutError:
pass
finally:
IOLoop.current().stop()
a.stop()


def compiler_config(parser):
Expand Down
2 changes: 2 additions & 0 deletions src/inmanta/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ def set_environment(self, environment_id: uuid.UUID):
else:
self._env_id = environment_id

@gen.coroutine
def start(self):
"""
Connect to the server and use a heartbeat and long-poll for two-way communication
Expand All @@ -807,6 +808,7 @@ def start(self):
self._client = AgentClient(self.name, self.sessionid, transport=self._transport, timeout=self.server_timeout)
IOLoop.current().add_callback(self.perform_heartbeat)

@gen.coroutine
def stop(self):
self.running = False

Expand Down
8 changes: 6 additions & 2 deletions src/inmanta/server/agentmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ def __init__(self, restserver, closesessionsonstart=True, fact_back_off=None):

self.closesessionsonstart = closesessionsonstart

@gen.coroutine
def prestart(self, server):
ServerSlice.prestart(self, server)
yield ServerSlice.prestart(self, server)
self._server = server.get_endpoint("server")
self._server_storage = self._server._server_storage
server.get_endpoint("session").add_listener(self)
Expand All @@ -135,13 +136,15 @@ def get_agent_client(self, tid: uuid.UUID, endpoint):
return self.tid_endpoint_to_session[(tid, endpoint)].get_client()
return None

@gen.coroutine
def start(self):
self.add_future(self.start_agents())
if self.closesessionsonstart:
self.add_future(self.clean_db())

@gen.coroutine
def stop(self):
self.terminate_agents()
yield self.terminate_agents()

# Agent Management
@gen.coroutine
Expand Down Expand Up @@ -484,6 +487,7 @@ def __do_start_agent(self, agents, env):
LOGGER.debug("Started new agent with PID %s", proc.pid)
return True

@gen.coroutine
def terminate_agents(self):
for proc in self._agent_procs.values():
proc.terminate()
Expand Down
7 changes: 5 additions & 2 deletions src/inmanta/server/bootloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from inmanta.server import server
from inmanta.server.protocol import RESTServer
from inmanta.server.agentmanager import AgentManager
from tornado import gen


class InmantaBootloader(object):
Expand All @@ -35,10 +36,12 @@ def get_agent_manager_slice(self):
def get_server_slices(self):
return [self.get_server_slice(), self.get_agent_manager_slice()]

@gen.coroutine
def start(self):
for mypart in self.get_server_slices():
self.restserver.add_endpoint(mypart)
self.restserver.start()
yield self.restserver.start()

@gen.coroutine
def stop(self):
self.restserver.stop()
yield self.restserver.stop()
29 changes: 22 additions & 7 deletions src/inmanta/server/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,22 @@ def create_op_mapping(self):
url_map[url][properties["operation"]] = (properties, call, method.__wrapped__)
return url_map

@gen.coroutine
def start(self):
"""
Start the transport
Start the transport.
The order in which the different endpoints are prestarted/started, is determined by the
order in which they are added to the RESTserver via the add_endpoint(endpoint) method.
This order is hardcoded in the get_server_slices() method in server/bootloader.py
"""
LOGGER.debug("Starting Server Rest Endpoint")

for endpoint in self.__end_points:
endpoint.prestart(self)
yield endpoint.prestart(self)

for endpoint in self.__end_points:
endpoint.start()
yield endpoint.start()
self._handlers.extend(endpoint.get_handlers())

url_map = self.create_op_mapping()
Expand Down Expand Up @@ -130,16 +135,23 @@ def start(self):
LOGGER.debug("Created REST transport with SSL")
else:
self.http_server = HTTPServer(application, decompress_request=True)

self.http_server.listen(port)

LOGGER.debug("Start REST transport")

@gen.coroutine
def stop(self):
LOGGER.debug("Stoppin Server Rest Endpoint")
"""
Stop the transport.
The order in which the endpoint are stopped, is reverse compared to the starting order.
This prevents database connection from being closed too early. This order in which the endpoint
are started, is hardcoded in the get_server_slices() method in server/bootloader.py
"""
LOGGER.debug("Stopping Server Rest Endpoint")
self.http_server.stop()
for endpoint in self.__end_points:
endpoint.stop()
for endpoint in reversed(self.__end_points):
yield endpoint.stop()

def return_error_msg(self, status=500, msg="", headers={}):
body = {"message": msg}
Expand All @@ -161,13 +173,16 @@ def __init__(self, name):
self._handlers = []
self._sched = Scheduler()

@gen.coroutine
def prestart(self, server: RESTServer):
"""Called by the RestServer host prior to start, can be used to collect references to other server slices"""
pass

@gen.coroutine
def start(self):
pass

@gen.coroutine
def stop(self):
pass

Expand Down
45 changes: 24 additions & 21 deletions src/inmanta/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,39 +65,42 @@ def __init__(self, database_host=None, database_port=None, agent_no_log=False):
self._server_storage = self.check_storage()
self._agent_no_log = agent_no_log

self._db = None
if database_host is None:
database_host = opt.db_host.get()
self._recompiles = defaultdict(lambda: None)

if database_port is None:
database_port = opt.db_port.get()
self.setup_dashboard()
self.dryrun_lock = locks.Lock()
self._fact_expire = opt.server_fact_expire.get()
self._fact_renew = opt.server_fact_renew.get()
self._database_host = database_host
self._database_port = database_port

data.connect(database_host, database_port, opt.db_name.get())
LOGGER.info("Connected to mongodb database %s on %s:%d", opt.db_name.get(), database_host, database_port)
@gen.coroutine
def prestart(self, server):
self.agentmanager = server.get_endpoint("agentmanager")

ioloop.IOLoop.current().add_callback(data.create_indexes)
@gen.coroutine
def start(self):
if self._database_host is None:
self._database_host = opt.db_host.get()

self._fact_expire = opt.server_fact_expire.get()
self._fact_renew = opt.server_fact_renew.get()
if self._database_port is None:
self._database_port = opt.db_port.get()

data.connect(self._database_host, self._database_port, opt.db_name.get())
LOGGER.info("Connected to mongodb database %s on %s:%d", opt.db_name.get(), self._database_host, self._database_port)

ioloop.IOLoop.current().add_callback(data.create_indexes)

self.schedule(self.renew_expired_facts, self._fact_renew)
self.schedule(self._purge_versions, opt.server_purge_version_interval.get())

ioloop.IOLoop.current().add_callback(self._purge_versions)

self._recompiles = defaultdict(lambda: None)

self.setup_dashboard()
self.dryrun_lock = locks.Lock()

def prestart(self, server):
self.agentmanager = server.get_endpoint("agentmanager")

def start(self):
super().start()
yield super().start()

@gen.coroutine
def stop(self):
super().stop()
yield super().stop()

def get_agent_client(self, tid: UUID, endpoint):
return self.agentmanager.get_agent_client(tid, endpoint)
Expand Down
8 changes: 4 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ async def server(inmanta_config, mongo_db, mongo_client, motor):
await data.create_indexes()

ibl = InmantaBootloader()
ibl.start()
await ibl.start()

yield ibl.restserver

ibl.stop()
await ibl.stop()
shutil.rmtree(state_dir)


Expand Down Expand Up @@ -256,11 +256,11 @@ async def server_multi(inmanta_config, mongo_db, mongo_client, request, motor):
await data.create_indexes()

ibl = InmantaBootloader()
ibl.start()
await ibl.start()

yield ibl.restserver

ibl.stop()
await ibl.stop()

shutil.rmtree(state_dir)

Expand Down
Loading

0 comments on commit 621a660

Please sign in to comment.