Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext cpu throttling scenarios #2581

Merged
merged 7 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions azurelinuxagent/common/cgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _get_cpu_ticks(self, allow_no_such_file_or_directory_error=False):

return cpu_ticks

def _get_throttled_time(self):
def get_throttled_time(self):
try:
with open(os.path.join(self.path, 'cpu.stat')) as cpu_stat:
#
Expand Down Expand Up @@ -205,7 +205,7 @@ def initialize_cpu_usage(self):
raise CGroupsException("initialize_cpu_usage() should be invoked only once")
self._current_cgroup_cpu = self._get_cpu_ticks(allow_no_such_file_or_directory_error=True)
self._current_system_cpu = self._osutil.get_total_cpu_ticks_since_boot()
self._current_throttled_time = self._get_throttled_time()
self._current_throttled_time = self.get_throttled_time()

def get_cpu_usage(self):
"""
Expand All @@ -229,16 +229,20 @@ def get_cpu_usage(self):

return round(100.0 * self._osutil.get_processor_cores() * float(cgroup_delta) / float(system_delta), 3)

def get_throttled_time(self):
def get_cpu_throttled_time(self, read_previous_throttled_time=True):
"""
Computes the throttled time (in seconds) since the last call to this function.
NOTE: initialize_cpu_usage() must be invoked before calling this function
Compute only current throttled time if read_previous_throttled_time set to False
"""
if not read_previous_throttled_time:
return float(self.get_throttled_time() / 1E9)

if not self._cpu_usage_initialized():
raise CGroupsException("initialize_cpu_usage() must be invoked before the first call to get_throttled_time()")

self._previous_throttled_time = self._current_throttled_time
self._current_throttled_time = self._get_throttled_time()
self._current_throttled_time = self.get_throttled_time()

return float(self._current_throttled_time - self._previous_throttled_time) / 1E9

Expand All @@ -249,7 +253,7 @@ def get_tracked_metrics(self, **kwargs):
tracked.append(MetricValue(MetricsCategory.CPU_CATEGORY, MetricsCounter.PROCESSOR_PERCENT_TIME, self.name, cpu_usage))

if 'track_throttled_time' in kwargs and kwargs['track_throttled_time']:
throttled_time = self.get_throttled_time()
throttled_time = self.get_cpu_throttled_time()
if cpu_usage >= float(0) and throttled_time >= float(0):
tracked.append(MetricValue(MetricsCategory.CPU_CATEGORY, MetricsCounter.THROTTLED_TIME, self.name, throttled_time))

Expand Down
6 changes: 4 additions & 2 deletions azurelinuxagent/common/cgroupapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh

logger.info("Started extension in unit '{0}'", scope_name)

cpu_cgroup = None
try:
cgroup_relative_path = os.path.join('azure.slice/azure-vmextensions.slice', extension_slice_name)

Expand All @@ -289,7 +290,8 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh
logger.info("The CPU controller is not mounted; will not track resource usage")
else:
cpu_cgroup_path = os.path.join(cpu_cgroup_mountpoint, cgroup_relative_path)
CGroupsTelemetry.track_cgroup(CpuCgroup(extension_name, cpu_cgroup_path))
cpu_cgroup = CpuCgroup(extension_name, cpu_cgroup_path)
CGroupsTelemetry.track_cgroup(cpu_cgroup)

except IOError as e:
if e.errno == 2: # 'No such file or directory'
Expand All @@ -301,7 +303,7 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh
# Wait for process completion or timeout
try:
return handle_process_completion(process=process, command=command, timeout=timeout, stdout=stdout,
stderr=stderr, error_code=error_code)
stderr=stderr, error_code=error_code, cpu_cgroup=cpu_cgroup)
except ExtensionError as e:
# The extension didn't terminate successfully. Determine whether it was due to systemd errors or
# extension errors.
Expand Down
103 changes: 85 additions & 18 deletions azurelinuxagent/common/cgroupconfigurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
import glob
import json
import os
import re
import subprocess
Expand Down Expand Up @@ -358,12 +360,16 @@ def __setup_azure_slice():
CGroupConfigurator._Impl.__cleanup_unit_file(unit_file)
return

# reload the systemd configuration; the new slices will be used once the agent's service restarts
try:
logger.info("Executing systemctl daemon-reload...")
shellutil.run_command(["systemctl", "daemon-reload"])
except Exception as exception:
_log_cgroup_warning("daemon-reload failed (create azure slice): {0}", ustr(exception))
CGroupConfigurator._Impl.__reload_systemd_config()

@staticmethod
def __reload_systemd_config():
    """
    Run "systemctl daemon-reload" so that systemd re-reads its configuration.

    Any exception raised while logging or running the command is reported
    as a cgroup warning and is not propagated to the caller.
    """
    # the new slices will be used once the agent's service restarts
    reload_command = ["systemctl", "daemon-reload"]
    try:
        logger.info("Executing systemctl daemon-reload...")
        shellutil.run_command(reload_command)
    except Exception as error:
        _log_cgroup_warning("daemon-reload failed (create azure slice): {0}", ustr(error))

@staticmethod
def __create_unit_file(path, contents):
Expand Down Expand Up @@ -479,18 +485,23 @@ def enable(self):
self.__set_cpu_quota(conf.get_agent_cpu_quota())

def disable(self, reason, disable_cgroups):
# Todo: disable/reset extension when ext quotas introduced
if disable_cgroups == DisableCgroups.ALL: # disable all
self._agent_cgroups_enabled = False
self._extensions_cgroups_enabled = False
# Reset quotas
self.__reset_agent_cpu_quota()
extension_services = self.get_extension_services_list()
for extension in extension_services:
narrieta marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Resetting extension : {0} and it's services: {1} CPUQuota".format(extension, extension_services[extension]))
self.__reset_extension_cpu_quota(extension_name=extension)
self.__reset_extension_services_cpu_quota(extension_services[extension])
self.__reload_systemd_config()
narrieta marked this conversation as resolved.
Show resolved Hide resolved

CGroupsTelemetry.reset()
self._agent_cgroups_enabled = False
self._extensions_cgroups_enabled = False
elif disable_cgroups == DisableCgroups.AGENT: # disable agent
self._agent_cgroups_enabled = False
self.__reset_agent_cpu_quota()
CGroupsTelemetry.stop_tracking(CpuCgroup(AGENT_NAME_TELEMETRY, self._agent_cpu_cgroup_path))
elif disable_cgroups == DisableCgroups.EXTENSIONS: # disable extensions
self._extensions_cgroups_enabled = False
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed, as we don't disable a particular extension.


message = "[CGW] Disabling resource usage monitoring. Reason: {0}".format(reason)
logger.info(message) # log as INFO for now, in the future it should be logged as WARNING
Expand Down Expand Up @@ -519,7 +530,6 @@ def __reset_agent_cpu_quota():
"""
logger.info("Resetting agent's CPUQuota")
if CGroupConfigurator._Impl.__try_set_cpu_quota(''): # setting an empty value resets to the default (infinity)
CGroupsTelemetry.set_track_throttled_time(False)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed, as we already stop tracking the cgroup; this would also set it to False for extensions, which we don't want in the agent-throttling case.

_log_cgroup_info('CPUQuota: {0}', systemd.get_unit_property(systemd.get_agent_unit_name(), "CPUQuotaPerSecUSec"))

@staticmethod
Expand Down Expand Up @@ -763,6 +773,16 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh
process = subprocess.Popen(command, shell=shell, cwd=cwd, env=env, stdout=stdout, stderr=stderr, preexec_fn=os.setsid) # pylint: disable=W1509
return handle_process_completion(process=process, command=command, timeout=timeout, stdout=stdout, stderr=stderr, error_code=error_code)

def __reset_extension_cpu_quota(self, extension_name):
    """
    Removes any CPUQuota on the extension by setting up its slice again with
    no quota (cpu_quota=None). Does nothing when self.enabled() is false.

    NOTE: This resets the quota on the extension's slice; any local overrides on the VM will take precedence
    over this setting.

    :param extension_name: name of the extension whose slice quota is reset
    """
    if not self.enabled():
        return

    self.setup_extension_slice(extension_name, cpu_quota=None)

def setup_extension_slice(self, extension_name, cpu_quota):
"""
Each extension runs under its own slice (Ex "Microsoft.CPlat.Extension.slice"). All the slices for
Expand All @@ -777,11 +797,11 @@ def setup_extension_slice(self, extension_name, cpu_quota):
extension_slice_path = os.path.join(unit_file_install_path,
SystemdCgroupsApi.get_extension_slice_name(extension_name))
try:
cpu_quota = str(cpu_quota) + "%" if cpu_quota is not None else ""
cpu_quota = str(cpu_quota) + "%" if cpu_quota is not None else "" # setting an empty value resets to the default (infinity)
slice_contents = _EXTENSION_SLICE_CONTENTS.format(extension_name=extension_name, cpu_quota=cpu_quota)
CGroupConfigurator._Impl.__create_unit_file(extension_slice_path, slice_contents)
except Exception as exception:
_log_cgroup_warning("Failed to create unit files for the extension slice: {0}", ustr(exception))
_log_cgroup_warning("Failed to set the extension {0} slice and quotas: {1}", extension_name, ustr(exception))
CGroupConfigurator._Impl.__cleanup_unit_file(extension_slice_path)

def remove_extension_slice(self, extension_name):
Expand Down Expand Up @@ -823,13 +843,35 @@ def set_extension_services_cpu_memory_quota(self, services_list):
files_to_create.append((drop_in_file_cpu_quota, cpu_quota_contents))

self.__create_all_files(files_to_create)
self.__reload_systemd_config()

def __reset_extension_services_cpu_quota(self, services_list):
"""
Removes any CPUQuota on the extension service

# reload the systemd configuration; the new unit will be used once the service restarts
NOTE: This resets the quota on the extension service's default dropin file; any local overrides on the VM will take precedence
over this setting.
"""
if self.enabled() and services_list is not None:
try:
logger.info("Executing systemctl daemon-reload...")
shellutil.run_command(["systemctl", "daemon-reload"])
service_name = None
for service in services_list:
service_name = service.get('name', None)
unit_file_path = systemd.get_unit_file_install_path()
if service_name is not None and unit_file_path is not None:
files_to_create = []
drop_in_path = os.path.join(unit_file_path, "{0}.d".format(service_name))
cpu_quota = "" # setting an empty value resets to the default (infinity)
drop_in_file_cpu_quota = os.path.join(drop_in_path, _DROP_IN_FILE_CPU_QUOTA)
cpu_quota_contents = _DROP_IN_FILE_CPU_QUOTA_CONTENTS_FORMAT.format(cpu_quota)
if os.path.exists(drop_in_file_cpu_quota):
with open(drop_in_file_cpu_quota, "r") as file_:
if file_.read() == cpu_quota_contents:
return
files_to_create.append((drop_in_file_cpu_quota, cpu_quota_contents))
self.__create_all_files(files_to_create)
except Exception as exception:
_log_cgroup_warning("daemon-reload failed (create service unit files): {0}", ustr(exception))
_log_cgroup_warning('Failed to reset CPUQuota for {0} : {1}', service_name, ustr(exception))

def remove_extension_services_drop_in_files(self, services_list):
"""
Expand Down Expand Up @@ -873,6 +915,31 @@ def start_tracking_extension_services_cgroups(self, services_list):
if service_name is not None:
self.start_tracking_unit_cgroups(service_name)

@staticmethod
def get_extension_services_list():
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retrieve the manifest files of installed extensions so that their quotas can be reset when cgroups are disabled for all.

"""
ResourceLimits for extensions are coming from <extName>/HandlerManifest.json file.
Use this pattern to determine all the installed extension HandlerManifest files and
read the extension services if ResourceLimits are present.
"""
extensions_services = {}
for manifest_path in glob.iglob(os.path.join(conf.get_lib_dir(), "*/HandlerManifest.json")):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we abstract the extension traversal in ExtensionsHandlerHandler? that class already has some code to do this kind of stuff, and takes into account, for example, extensions that failed to install (you probably want to skip those)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The challenge was getting around the cyclic dependency that I would introduce by moving this to extensionhandler.

match = re.search("(?P<extname>[\\w+\\.-]+).HandlerManifest\\.json", manifest_path)
if match is not None:
extensions_name = match.group('extname')
if not extensions_name.startswith('WALinuxAgent'):
try:
data = json.loads(fileutil.read_file(manifest_path))
resource_limits = data[0].get('resourceLimits', None)
services = resource_limits.get('services') if resource_limits else None
extensions_services[extensions_name] = services
except (IOError, OSError) as e:
_log_cgroup_warning(
'Failed to load manifest file ({0}): {1}'.format(manifest_path, e.strerror))
except ValueError:
_log_cgroup_warning('Malformed manifest file ({0}).'.format(manifest_path))
return extensions_services

# unique instance for the singleton
_instance = None

Expand Down
30 changes: 26 additions & 4 deletions azurelinuxagent/common/utils/extensionprocessutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import signal
import time

from azurelinuxagent.common import logger
from azurelinuxagent.common.exception import ExtensionErrorCodes, ExtensionOperationError, ExtensionError
from azurelinuxagent.common.future import ustr

TELEMETRY_MESSAGE_MAX_LEN = 3200


def wait_for_process_completion_or_timeout(process, timeout):
def wait_for_process_completion_or_timeout(process, timeout, cpu_cgroup):
"""
Utility function that waits for the process to complete within the given time frame. This function will terminate
the process if when the given time frame elapses.
Expand All @@ -40,18 +41,20 @@ def wait_for_process_completion_or_timeout(process, timeout):
timeout -= 1

return_code = None
throttled_time = 0

if timeout == 0:
throttled_time = get_cpu_throttled_time(cpu_cgroup)
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
else:
# process completed or forked; sleep 1 sec to give the child process (if any) a chance to start
time.sleep(1)
return_code = process.wait()

return timeout == 0, return_code
return timeout == 0, return_code, throttled_time


def handle_process_completion(process, command, timeout, stdout, stderr, error_code):
def handle_process_completion(process, command, timeout, stdout, stderr, error_code, cpu_cgroup=None):
"""
Utility function that waits for process completion and retrieves its output (stdout and stderr) if it completed
before the timeout period. Otherwise, the process will get killed and an ExtensionError will be raised.
Expand All @@ -62,13 +65,18 @@ def handle_process_completion(process, command, timeout, stdout, stderr, error_c
:param stdout: Must be a file since we seek on it when parsing the subprocess output
:param stderr: Must be a file since we seek on it when parsing the subprocess outputs
:param error_code: The error code to set if we raise an ExtensionError
:param cpu_cgroup: Reference the cpu cgroup name and path
:return:
"""
# Wait for process completion or timeout
timed_out, return_code = wait_for_process_completion_or_timeout(process, timeout)
timed_out, return_code, throttled_time = wait_for_process_completion_or_timeout(process, timeout, cpu_cgroup)
process_output = read_output(stdout, stderr)

if timed_out:
if cpu_cgroup is not None:# Report CPUThrottledTime when timeout happens
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Report the throttled time in the exception message in the timeout case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of passing the cpu_cgroup as parameter down the call stack, can we handle the exception in the cgroup configurator and report the throttle time there? (you can create a new exception ExtensionTimeout derived from ExtensionError)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I would like to do it that way, but when a timeout happens we kill the process before raising the ExtensionTimeout exception. That would remove the cgroups, so I had to pass cpu_cgroup down the call stack to compute the throttled time before the kill.

raise ExtensionError("Timeout({0});CPUThrottledTime({1}secs): {2}\n{3}".format(timeout, throttled_time, command, process_output),
code=ExtensionErrorCodes.PluginHandlerScriptTimedout)

raise ExtensionError("Timeout({0}): {1}\n{2}".format(timeout, command, process_output),
code=ExtensionErrorCodes.PluginHandlerScriptTimedout)

Expand Down Expand Up @@ -141,3 +149,17 @@ def to_s(captured_stdout, stdout_offset, captured_stderr, stderr_offset):
return to_s(stdout, -1*stdout_len, stderr, 0)
else:
return to_s(stdout, -1*max_len_each, stderr, -1*max_len_each)


def get_cpu_throttled_time(cpu_cgroup):
    """
    Return the throttled time for the given cgroup.

    :param cpu_cgroup: object exposing get_cpu_throttled_time(read_previous_throttled_time=...), or None
    :return: the value returned by cpu_cgroup.get_cpu_throttled_time(read_previous_throttled_time=False);
             0 when cpu_cgroup is None or when that call raises
    """
    if cpu_cgroup is None:
        return 0

    try:
        return cpu_cgroup.get_cpu_throttled_time(read_previous_throttled_time=False)
    except Exception as error:
        # Best effort: a failed read is logged and reported as 0 instead of being propagated
        logger.warn("Failed to get cpu throttled time for the extension: {0}", ustr(error))
        return 0
Loading