Skip to content

Commit

Permalink
WIP on #19; minor bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dcvan24 committed May 10, 2018
1 parent ab70e1e commit 5199d9e
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 75 deletions.
50 changes: 2 additions & 48 deletions appliance/manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from config import config
from commons import APIManager, Singleton, Loggable, MotorClient, AutonomousMonitor
from appliance.base import Appliance
from container.base import ContainerType, ContainerState
from container.manager import ContainerManager
from scheduler import DefaultApplianceScheduler


class ApplianceManager(Loggable, metaclass=Singleton):
Expand Down Expand Up @@ -48,7 +48,7 @@ async def create_appliance(self, data):
return status, None, err
self.logger.info(msg)
self.logger.info("Start monitoring appliance '%s'"%app)
ApplianceDAGMonitor(app).start()
DefaultApplianceScheduler(app).start()
return 201, app, None

async def delete_appliance(self, app_id):
Expand Down Expand Up @@ -138,49 +138,3 @@ async def callback(self):
elif status != 200:
self.logger.error(err)
self.stop()


class ApplianceDAGMonitor(AutonomousMonitor):

def __init__(self, app):
super(ApplianceDAGMonitor, self).__init__(5000)
self.__app = app
self.__app_mgr = ApplianceManager()
self.__contr_mgr = ContainerManager()

async def callback(self):
app = self.__app
self.logger.info('Containers left: %s'%list(app.dag.parent_map.keys()))
if self.is_running and app.dag.is_empty:
self.logger.info('DAG is empty, stop monitoring')
self.stop()
return
free_contrs = [c.id for c in app.dag.get_free_containers()]
self.logger.info('Free containers: %s'%free_contrs)
for c in app.dag.get_free_containers():
if c.state == ContainerState.SUBMITTED:
self.logger.info('Launch container: %s'%c)
status, _, err = await self.__contr_mgr.provision_container(c)
if status not in (200, 409):
self.logger.info(status)
self.logger.error("Failed to launch container '%s'"%c)
self.logger.error(err)
self.logger.info('Update DAG')
for c in app.dag.get_free_containers():
status, c, err = await self.__contr_mgr.get_container(app.id, c.id)
if status != 200:
self.logger.error(err)
if status == 404:
status, _, _ = await self.__app_mgr.get_appliance(app.id)
if status == 404:
self.logger.info("Appliance '%s' is already deleted, stop monitoring"%app.id)
self.stop()
continue
if (c.type == ContainerType.SERVICE and c.state == ContainerState.RUNNING) \
or (c.type == ContainerType.JOB and c.state == ContainerState.SUCCESS):
app.dag.remove_container(c.id)
else:
app.dag.update_container(c)
status, msg, err = await self.__app_mgr.save_appliance(app, False)
if err:
self.logger.error(err)
45 changes: 33 additions & 12 deletions cluster/manager.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,52 @@
import datetime

from config import config
from cluster.base import Host, HostResources
from commons import Singleton, MotorClient, Loggable, APIManager, AutonomousMonitor


class ClusterManager(Loggable, metaclass=Singleton):

MONITOR_INTERVAL = 30000

def __init__(self):
self.__cluster_db = ClusterDBManager()
self.__cluster_api = ClusterAPIManager()
self.__cluster_monitor = ClusterMonitor(self.__cluster_api, self.__cluster_db)
def __init__(self, monitor_interval=30000):
cluster_api, cluster_db = ClusterAPIManager(), ClusterDBManager()
self.__cluster_api, self.__cluster_db = cluster_api, cluster_db
self.__cluster_monitor = ClusterMonitor(cluster_api, cluster_db, monitor_interval)

async def get_cluster(self):
async def get_cluster(self, ttl=30):
if self._is_cache_expired(ttl):
await self.__cluster_monitor.update()
return await self.__cluster_db.get_cluster()

async def find_hosts(self, **kwargs):
async def find_hosts(self, ttl=30, **kwargs):
if self._is_cache_expired(ttl):
await self.__cluster_monitor.update()
return await self.__cluster_db.find_hosts(**kwargs)

def start_monitor(self):
self.__cluster_monitor.start()

def _is_cache_expired(self, ttl):
if ttl is None:
return False
if not self.__cluster_monitor.last_update:
return True
ttl = datetime.timedelta(seconds=ttl)
return self.__cluster_monitor.last_update - datetime.datetime.now(tz=None) > ttl


class ClusterMonitor(AutonomousMonitor):

def __init__(self, api, db):
super(ClusterMonitor, self).__init__(30000)
def __init__(self, api, db, interval=30000):
super(ClusterMonitor, self).__init__(interval)
self.__api = api
self.__db = db

async def callback(self):
self.__last_update = None

@property
def last_update(self):
return self.__last_update

async def update(self):
status, master, err = await self.__api.get_current_master()
if status != 200:
self.logger.error(err)
Expand All @@ -43,6 +60,10 @@ async def callback(self):
self.logger.info(err)
return
await self.__db.update_hosts(hosts)
self.__last_update = datetime.datetime.now(tz=None)

async def callback(self):
await self.update()


class ClusterAPIManager(APIManager):
Expand Down
20 changes: 17 additions & 3 deletions container/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from enum import Enum

from util import parse_datetime

@swagger.enum
class ContainerType(Enum):
Expand Down Expand Up @@ -416,7 +417,7 @@ def __init__(self, id, appliance, type, image, resources, cmd=None, args=[], env
self.__force_pull_image = force_pull_image
self.__dependencies = dependencies
self.__input_data = input_data
self.__last_update = last_update
self.__last_update = parse_datetime(last_update)

@property
@swagger.property
Expand Down Expand Up @@ -713,6 +714,10 @@ def rack(self, rack):
def host(self, host):
self.__host = host

@last_update.setter
def last_update(self, last_update):
self.__last_update = parse_datetime(last_update)

def add_env(self, **env):
self.__env.update(env)

Expand All @@ -730,7 +735,7 @@ def to_render(self):
state=self.state.value, is_privileged=self.is_privileged,
force_pull_image=self.force_pull_image, dependencies=self.dependencies,
input_data=self.input_data, rack=self.rack, host=self.host,
last_update=self.last_update)
last_update=self.last_update and self.last_update.isoformat())

def to_save(self):
return dict(id=self.id, appliance=self.appliance, type=self.type.value,
Expand All @@ -743,4 +748,13 @@ def to_save(self):
state=self.state.value, is_privileged=self.is_privileged,
force_pull_image=self.force_pull_image, dependencies=self.dependencies,
input_data=self.input_data, rack=self.rack, host=self.host,
last_update=self.last_update)
last_update=self.last_update and self.last_update.isoformat())

def __hash__(self):
return hash((self.id, self.appliance))

def __eq__(self, other):
return self.__class__ == other.__class__ \
and self.id == other.id \
and self.appliance == other.appliance

2 changes: 1 addition & 1 deletion container/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def to_request(self):
r = dict(name=str(self),
schedule='R%d/%s/P%s'%(self.repeats, self.start_time, self.interval),
cpus=self.resources.cpus, mem=self.resources.mem, disk=self.resources.disk,
shell=self.args is None,
shell=not self.args,
command = self.cmd if self.cmd else '',
environmentVariables=[dict(name=k,
value=parse_container_short_id(v, self.appliance))
Expand Down
16 changes: 8 additions & 8 deletions container/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@

class ContainerManager(Loggable, metaclass=Singleton):

CONTAINER_REC_TTL = timedelta(seconds=3)

def __init__(self):
def __init__(self, contr_info_ttl=timedelta(seconds=3)):
self.__service_api = ServiceAPIManager()
self.__job_api = JobAPIManager()
self.__contr_db = ContainerDBManager()
self.__cluster_db = ClusterDBManager()
self.__contr_info_ttl = contr_info_ttl

async def get_container(self, app_id, contr_id):
status, contr, err = await self.__contr_db.get_container(app_id, contr_id)
if status == 404:
return status, contr, err
if not contr.last_update or \
datetime.datetime.now(tz=None) - contr.last_update > self.CONTAINER_REC_TTL:
datetime.datetime.now(tz=None) - contr.last_update > self.__contr_info_ttl:
status, contr, err = await self._get_updated_container(contr)
if status == 404 and contr.state != ContainerState.SUBMITTED:
self.logger.info("Deleted ghost container: %s"%contr)
await self.__contr_db.delete_container(contr)
return 404, None, err
if status == 200:
contr.last_update = datetime.datetime.now(tz=None)
await self.__contr_db.save_container(contr, False)
elif status != 404:
self.logger.error("Failed to update container '%s'"%contr)
Expand All @@ -42,7 +42,7 @@ async def get_containers(self, app_id, **kwargs):
contrs_to_del, contrs_to_update = [], [],
cur_time = datetime.datetime.now(tz=None)
for c in contrs:
if c.last_update and cur_time - c.last_update <= self.CONTAINER_REC_TTL:
if c.last_update and cur_time - c.last_update <= self.__contr_info_ttl:
continue
status, c, err = await self._get_updated_container(c)
if status == 404 and c.state != ContainerState.SUBMITTED:
Expand All @@ -56,8 +56,8 @@ async def get_containers(self, app_id, **kwargs):
self.logger.error(err)
else:
self.logger.info(msg)

for c in contrs_to_update:
c.last_update = datetime.datetime.now(tz=None)
await self.__contr_db.save_container(c, upsert=False)
return 200, contrs, None

Expand Down Expand Up @@ -122,7 +122,7 @@ async def provision_container(self, contr):

async def _get_updated_container(self, contr):
assert isinstance(contr, Container)
self.logger.info('Update container info: %s'%contr)
self.logger.debug('Update container info: %s'%contr)
if contr.type == ContainerType.SERVICE:
status, raw_service, err = await self.__service_api.get_service_update(contr)
if not err:
Expand Down Expand Up @@ -235,7 +235,7 @@ async def get_job_update(self, job):
if status != 200:
self.logger.debug(err)
return status, job, err
return status, body, None
return status, jobs[0], None

async def provision_job(self, job):
api = config.chronos
Expand Down
2 changes: 0 additions & 2 deletions container/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,5 +297,3 @@ def _get_default_env(self):

def __str__(self):
return '/%s/%s'%(self.appliance, self.id)


1 change: 1 addition & 0 deletions requirement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ motor==1.1
pymongo==3.5.1
tornado==5.0.1
PyYAML==3.12
python-dateutil==2.7.2
Loading

0 comments on commit 5199d9e

Please sign in to comment.