From 3ed3a12585d9f3a5cae6e7aa63a8b7e8bb5cec44 Mon Sep 17 00:00:00 2001 From: Nageswara Nandigam Date: Wed, 20 Apr 2022 16:59:37 -0700 Subject: [PATCH 1/3] ext cpu throttling scenarios --- azurelinuxagent/common/cgroup.py | 15 ++- azurelinuxagent/common/cgroupapi.py | 6 +- azurelinuxagent/common/cgroupconfigurator.py | 103 ++++++++++++++---- .../common/utils/extensionprocessutil.py | 30 ++++- tests/common/test_cgroupconfigurator.py | 54 ++++++++- tests/common/test_cgroups.py | 2 +- tests/common/test_cgroupstelemetry.py | 2 +- tests/utils/test_extension_process_util.py | 50 ++++++++- 8 files changed, 223 insertions(+), 39 deletions(-) diff --git a/azurelinuxagent/common/cgroup.py b/azurelinuxagent/common/cgroup.py index 900f4f5b4..bc9c8ea80 100644 --- a/azurelinuxagent/common/cgroup.py +++ b/azurelinuxagent/common/cgroup.py @@ -13,6 +13,7 @@ # limitations under the License. # # Requires Python 2.6+ and Openssl 1.0+ +import threading from collections import namedtuple import errno @@ -138,6 +139,7 @@ def __init__(self, name, cgroup_path): self._current_system_cpu = None self._previous_throttled_time = None self._current_throttled_time = None + self._throttled_time_lock = threading.RLock() # Protect the get_throttled_time which is called from Monitor thread and cgroupconfigurator. def _get_cpu_ticks(self, allow_no_such_file_or_directory_error=False): """ @@ -171,7 +173,8 @@ def _get_cpu_ticks(self, allow_no_such_file_or_directory_error=False): return cpu_ticks - def _get_throttled_time(self): + def get_throttled_time(self): + self._throttled_time_lock.acquire() try: with open(os.path.join(self.path, 'cpu.stat')) as cpu_stat: # @@ -193,6 +196,8 @@ def _get_throttled_time(self): raise CGroupsException("Failed to read cpu.stat: {0}".format(ustr(e))) except Exception as e: raise CGroupsException("Failed to read cpu.stat: {0}".format(ustr(e))) + finally: + self._throttled_time_lock.release() def _cpu_usage_initialized(self): return self._current_cgroup_cpu is not None and self._current_system_cpu is not None @@ -205,7 +210,7 @@ def initialize_cpu_usage(self): raise CGroupsException("initialize_cpu_usage() should be invoked only once") self._current_cgroup_cpu = self._get_cpu_ticks(allow_no_such_file_or_directory_error=True) self._current_system_cpu = self._osutil.get_total_cpu_ticks_since_boot() - self._current_throttled_time = self._get_throttled_time() + self._current_throttled_time = self.get_throttled_time() def get_cpu_usage(self): """ @@ -229,7 +234,7 @@ def get_cpu_usage(self): return round(100.0 * self._osutil.get_processor_cores() * float(cgroup_delta) / float(system_delta), 3) - def get_throttled_time(self): + def get_cpu_throttled_time(self): """ Computes the throttled time (in seconds) since the last call to this function. NOTE: initialize_cpu_usage() must be invoked before calling this function @@ -238,7 +243,7 @@ def get_throttled_time(self): raise CGroupsException("initialize_cpu_usage() must be invoked before the first call to get_throttled_time()") self._previous_throttled_time = self._current_throttled_time - self._current_throttled_time = self._get_throttled_time() + self._current_throttled_time = self.get_throttled_time() return float(self._current_throttled_time - self._previous_throttled_time) / 1E9 @@ -249,7 +254,7 @@ def get_tracked_metrics(self, **kwargs): tracked.append(MetricValue(MetricsCategory.CPU_CATEGORY, MetricsCounter.PROCESSOR_PERCENT_TIME, self.name, cpu_usage)) if 'track_throttled_time' in kwargs and kwargs['track_throttled_time']: - throttled_time = self.get_throttled_time() + throttled_time = self.get_cpu_throttled_time() if cpu_usage >= float(0) and throttled_time >= float(0): tracked.append(MetricValue(MetricsCategory.CPU_CATEGORY, MetricsCounter.THROTTLED_TIME, self.name, throttled_time)) diff --git a/azurelinuxagent/common/cgroupapi.py b/azurelinuxagent/common/cgroupapi.py index f69b1591c..7b7e688a1 100644 --- a/azurelinuxagent/common/cgroupapi.py +++ b/azurelinuxagent/common/cgroupapi.py @@ -280,6 +280,7 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh logger.info("Started extension in unit '{0}'", scope_name) + cpu_cgroup = None try: cgroup_relative_path = os.path.join('azure.slice/azure-vmextensions.slice', extension_slice_name) @@ -289,7 +290,8 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh logger.info("The CPU controller is not mounted; will not track resource usage") else: cpu_cgroup_path = os.path.join(cpu_cgroup_mountpoint, cgroup_relative_path) - CGroupsTelemetry.track_cgroup(CpuCgroup(extension_name, cpu_cgroup_path)) + cpu_cgroup = CpuCgroup(extension_name, cpu_cgroup_path) + CGroupsTelemetry.track_cgroup(cpu_cgroup) except IOError as e: if e.errno == 2: # 'No such file or directory' @@ -301,7 +303,7 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh # Wait for process completion or timeout try: return handle_process_completion(process=process, command=command, timeout=timeout, stdout=stdout, - stderr=stderr, error_code=error_code) + stderr=stderr, error_code=error_code, cpu_cgroup=cpu_cgroup) except ExtensionError as e: # The extension didn't terminate successfully. Determine whether it was due to systemd errors or # extension errors. diff --git a/azurelinuxagent/common/cgroupconfigurator.py b/azurelinuxagent/common/cgroupconfigurator.py index 7dc0a80a9..7bfd4c62c 100644 --- a/azurelinuxagent/common/cgroupconfigurator.py +++ b/azurelinuxagent/common/cgroupconfigurator.py @@ -14,6 +14,8 @@ # limitations under the License. # # Requires Python 2.6+ and Openssl 1.0+ +import glob +import json import os import re import subprocess @@ -358,12 +360,16 @@ def __setup_azure_slice(): CGroupConfigurator._Impl.__cleanup_unit_file(unit_file) return - # reload the systemd configuration; the new slices will be used once the agent's service restarts - try: - logger.info("Executing systemctl daemon-reload...") - shellutil.run_command(["systemctl", "daemon-reload"]) - except Exception as exception: - _log_cgroup_warning("daemon-reload failed (create azure slice): {0}", ustr(exception)) + CGroupConfigurator._Impl.__reload_systemd_config() + + @staticmethod + def __reload_systemd_config(): + # reload the systemd configuration; the new slices will be used once the agent's service restarts + try: + logger.info("Executing systemctl daemon-reload...") + shellutil.run_command(["systemctl", "daemon-reload"]) + except Exception as exception: + _log_cgroup_warning("daemon-reload failed (create azure slice): {0}", ustr(exception)) @staticmethod def __create_unit_file(path, contents): @@ -479,18 +485,22 @@ def enable(self): self.__set_cpu_quota(conf.get_agent_cpu_quota()) def disable(self, reason, disable_cgroups): - # Todo: disable/reset extension when ext quotas introduced if disable_cgroups == DisableCgroups.ALL: # disable all - self._agent_cgroups_enabled = False - self._extensions_cgroups_enabled = False + # Reset quotas self.__reset_agent_cpu_quota() + extension_services = self.get_extension_services_list() + for extension in extension_services: + self.__reset_extension_cpu_quota(extension_name=extension) + self.__reset_extension_services_cpu_quota(extension_services[extension]) + self.__reload_systemd_config() + CGroupsTelemetry.reset() + self._agent_cgroups_enabled = False + self._extensions_cgroups_enabled = False elif disable_cgroups == DisableCgroups.AGENT: # disable agent self._agent_cgroups_enabled = False self.__reset_agent_cpu_quota() CGroupsTelemetry.stop_tracking(CpuCgroup(AGENT_NAME_TELEMETRY, self._agent_cpu_cgroup_path)) - elif disable_cgroups == DisableCgroups.EXTENSIONS: # disable extensions - self._extensions_cgroups_enabled = False message = "[CGW] Disabling resource usage monitoring. Reason: {0}".format(reason) logger.info(message) # log as INFO for now, in the future it should be logged as WARNING @@ -519,7 +529,6 @@ def __reset_agent_cpu_quota(): """ logger.info("Resetting agent's CPUQuota") if CGroupConfigurator._Impl.__try_set_cpu_quota(''): # setting an empty value resets to the default (infinity) - CGroupsTelemetry.set_track_throttled_time(False) _log_cgroup_info('CPUQuota: {0}', systemd.get_unit_property(systemd.get_agent_unit_name(), "CPUQuotaPerSecUSec")) @staticmethod @@ -763,6 +772,16 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh process = subprocess.Popen(command, shell=shell, cwd=cwd, env=env, stdout=stdout, stderr=stderr, preexec_fn=os.setsid) # pylint: disable=W1509 return handle_process_completion(process=process, command=command, timeout=timeout, stdout=stdout, stderr=stderr, error_code=error_code) + def __reset_extension_cpu_quota(self, extension_name): + """ + Removes any CPUQuota on the extension + + NOTE: This resets the quota on the extension's slice; any local overrides on the VM will take precedence + over this setting. + """ + if self.enabled(): + self.setup_extension_slice(extension_name, cpu_quota=None) + def setup_extension_slice(self, extension_name, cpu_quota): """ Each extension runs under its own slice (Ex "Microsoft.CPlat.Extension.slice"). All the slices for @@ -777,7 +796,7 @@ def setup_extension_slice(self, extension_name, cpu_quota): extension_slice_path = os.path.join(unit_file_install_path, SystemdCgroupsApi.get_extension_slice_name(extension_name)) try: - cpu_quota = str(cpu_quota) + "%" if cpu_quota is not None else "" + cpu_quota = str(cpu_quota) + "%" if cpu_quota is not None else "" # setting an empty value resets to the default (infinity) slice_contents = _EXTENSION_SLICE_CONTENTS.format(extension_name=extension_name, cpu_quota=cpu_quota) CGroupConfigurator._Impl.__create_unit_file(extension_slice_path, slice_contents) except Exception as exception: @@ -823,13 +842,34 @@ def set_extension_services_cpu_memory_quota(self, services_list): files_to_create.append((drop_in_file_cpu_quota, cpu_quota_contents)) self.__create_all_files(files_to_create) + self.__reload_systemd_config() - # reload the systemd configuration; the new unit will be used once the service restarts - try: - logger.info("Executing systemctl daemon-reload...") - shellutil.run_command(["systemctl", "daemon-reload"]) - except Exception as exception: - _log_cgroup_warning("daemon-reload failed (create service unit files): {0}", ustr(exception)) + def __reset_extension_services_cpu_quota(self, services_list): + """ + Removes any CPUQuota on the extension service + + NOTE: This resets the quota on the extension service's default dropin file; any local overrides on the VM will take precedence + over this setting. + """ + try: + if self.enabled() and services_list is not None: + for service in services_list: + service_name = service.get('name', None) + unit_file_path = systemd.get_unit_file_install_path() + if service_name is not None and unit_file_path is not None: + files_to_create = [] + drop_in_path = os.path.join(unit_file_path, "{0}.d".format(service_name)) + cpu_quota = "" # setting an empty value resets to the default (infinity) + drop_in_file_cpu_quota = os.path.join(drop_in_path, _DROP_IN_FILE_CPU_QUOTA) + cpu_quota_contents = _DROP_IN_FILE_CPU_QUOTA_CONTENTS_FORMAT.format(cpu_quota) + if os.path.exists(drop_in_file_cpu_quota): + with open(drop_in_file_cpu_quota, "r") as file_: + if file_.read() == cpu_quota_contents: + return + files_to_create.append((drop_in_file_cpu_quota, cpu_quota_contents)) + self.__create_all_files(files_to_create) + except Exception as exception: + _log_cgroup_warning('Failed to set CPUQuota: {0}', ustr(exception)) def remove_extension_services_drop_in_files(self, services_list): """ @@ -873,6 +913,31 @@ def start_tracking_extension_services_cgroups(self, services_list): if service_name is not None: self.start_tracking_unit_cgroups(service_name) + @staticmethod + def get_extension_services_list(): + """ + ResourceLimits for extensions are coming from /HandlerManifest.json file. + Use this pattern to determine all the installed extension HandlerManifest files and + read the extension services if ResourceLimits are present. + """ + extensions_services = {} + for manifest_path in glob.iglob(os.path.join(conf.get_lib_dir(), "*/HandlerManifest.json")): + match = re.search("(?P[\\w+\\.-]+).HandlerManifest\\.json", manifest_path) + if match is not None: + extensions_name = match.group('extname') + if not extensions_name.startswith('WALinuxAgent'): + try: + data = json.loads(fileutil.read_file(manifest_path)) + resource_limits = data[0].get('resourceLimits', None) + services = resource_limits.get('services') if resource_limits else None + extensions_services[extensions_name] = services + except (IOError, OSError) as e: + _log_cgroup_warning( + 'Failed to load manifest file ({0}): {1}'.format(manifest_path, e.strerror)) + except ValueError: + _log_cgroup_warning('Malformed manifest file ({0}).'.format(manifest_path)) + return extensions_services + # unique instance for the singleton _instance = None diff --git a/azurelinuxagent/common/utils/extensionprocessutil.py b/azurelinuxagent/common/utils/extensionprocessutil.py index dda83fda2..33f759da7 100644 --- a/azurelinuxagent/common/utils/extensionprocessutil.py +++ b/azurelinuxagent/common/utils/extensionprocessutil.py @@ -21,13 +21,14 @@ import signal import time +from azurelinuxagent.common import logger from azurelinuxagent.common.exception import ExtensionErrorCodes, ExtensionOperationError, ExtensionError from azurelinuxagent.common.future import ustr TELEMETRY_MESSAGE_MAX_LEN = 3200 -def wait_for_process_completion_or_timeout(process, timeout): +def wait_for_process_completion_or_timeout(process, timeout, cpu_cgroup): """ Utility function that waits for the process to complete within the given time frame. This function will terminate the process if when the given time frame elapses. @@ -40,18 +41,20 @@ def wait_for_process_completion_or_timeout(process, timeout): timeout -= 1 return_code = None + throttled_time = 0 if timeout == 0: + throttled_time = get_cpu_throttled_time(cpu_cgroup) os.killpg(os.getpgid(process.pid), signal.SIGKILL) else: # process completed or forked; sleep 1 sec to give the child process (if any) a chance to start time.sleep(1) return_code = process.wait() - return timeout == 0, return_code + return timeout == 0, return_code, throttled_time -def handle_process_completion(process, command, timeout, stdout, stderr, error_code): +def handle_process_completion(process, command, timeout, stdout, stderr, error_code, cpu_cgroup=None): """ Utility function that waits for process completion and retrieves its output (stdout and stderr) if it completed before the timeout period. Otherwise, the process will get killed and an ExtensionError will be raised. @@ -62,13 +65,18 @@ def handle_process_completion(process, command, timeout, stdout, stderr, error_c :param stdout: Must be a file since we seek on it when parsing the subprocess output :param stderr: Must be a file since we seek on it when parsing the subprocess outputs :param error_code: The error code to set if we raise an ExtensionError + :param cpu_cgroup: Reference the cpu cgroup name and path :return: """ # Wait for process completion or timeout - timed_out, return_code = wait_for_process_completion_or_timeout(process, timeout) + timed_out, return_code, throttled_time = wait_for_process_completion_or_timeout(process, timeout, cpu_cgroup) process_output = read_output(stdout, stderr) if timed_out: + if cpu_cgroup is not None:# Report CPUThrottledTime when timeout happens + raise ExtensionError("Timeout({0});CPUThrottledTime({1}secs): {2}\n{3}".format(timeout, throttled_time, command, process_output), + code=ExtensionErrorCodes.PluginHandlerScriptTimedout) + raise ExtensionError("Timeout({0}): {1}\n{2}".format(timeout, command, process_output), code=ExtensionErrorCodes.PluginHandlerScriptTimedout) @@ -141,3 +149,17 @@ def to_s(captured_stdout, stdout_offset, captured_stderr, stderr_offset): return to_s(stdout, -1*stdout_len, stderr, 0) else: return to_s(stdout, -1*max_len_each, stderr, -1*max_len_each) + + +def get_cpu_throttled_time(cpu_cgroup): + """ + return the throttled time for the given cgroup. + """ + throttled_time = 0 + if cpu_cgroup is not None: + try: + throttled_time = float(cpu_cgroup.get_throttled_time() / 1E9) + except Exception as e: + logger.warn("Failed to get cpu throttled time for the extension: {0}", ustr(e)) + + return throttled_time diff --git a/tests/common/test_cgroupconfigurator.py b/tests/common/test_cgroupconfigurator.py index 6a945c5e1..e2f06a67a 100644 --- a/tests/common/test_cgroupconfigurator.py +++ b/tests/common/test_cgroupconfigurator.py @@ -254,7 +254,7 @@ def test_enable_should_not_track_throttled_time_when_setting_the_cpu_quota_fails self.assertFalse(CGroupsTelemetry.get_track_throttled_time(), "Throttle time should not be tracked") - def test_disable_should_reset_cpu_quota_and_tracked_cgroups(self): + def test_disable_should_reset_cpu_quota(self): with self._get_cgroup_configurator() as configurator: if len(CGroupsTelemetry._tracked) == 0: raise Exception("Test setup should have started tracking at least 1 cgroup (the agent's)") @@ -269,7 +269,55 @@ def test_disable_should_reset_cpu_quota_and_tracked_cgroups(self): fileutil.findre_in_file(agent_drop_in_file_cpu_quota, "^CPUQuota=$"), "CPUQuota was not set correctly. Expected an empty value. Got:\n{0}".format(fileutil.read_file(agent_drop_in_file_cpu_quota))) self.assertEqual(len(CGroupsTelemetry._tracked), 0, "No cgroups should be tracked after disable. Tracking: {0}".format(CGroupsTelemetry._tracked)) - self.assertFalse(CGroupsTelemetry._track_throttled_time, "Throttle Time should not be tracked after disable") + + def test_disable_should_reset_cpu_quota_for_all_cgroups(self): + service_list = [ + { + "name": "extension.service", + "cpuQuotaPercentage": 5 + } + ] + extension_name = "Microsoft.CPlat.Extension" + extension_services = {extension_name: service_list} + with self._get_cgroup_configurator() as configurator: + with patch.object(configurator, "get_extension_services_list", return_value=extension_services): + # get the paths to the mocked files + agent_drop_in_file_cpu_quota = configurator.mocks.get_mapped_path(UnitFilePaths.cpu_quota) + extension_slice_unit_file = configurator.mocks.get_mapped_path(UnitFilePaths.extensionslice) + extension_service_cpu_quota = configurator.mocks.get_mapped_path(UnitFilePaths.extension_service_cpu_quota) + + configurator.setup_extension_slice(extension_name=extension_name, cpu_quota=5) + configurator.set_extension_services_cpu_memory_quota(service_list) + CGroupsTelemetry._tracked['/sys/fs/cgroup/cpu,cpuacct/system.slice/extension.service'] = \ + CpuCgroup('extension.service', '/sys/fs/cgroup/cpu,cpuacct/system.slice/extension.service') + CGroupsTelemetry._tracked['/sys/fs/cgroup/cpu,cpuacct/azure.slice/azure-vmextensions.slice/' \ + 'azure-vmextensions-Microsoft.CPlat.Extension.slice'] = \ + CpuCgroup('Microsoft.CPlat.Extension', + '/sys/fs/cgroup/cpu,cpuacct/azure.slice/azure-vmextensions.slice/azure-vmextensions-Microsoft.CPlat.Extension.slice') + + configurator.disable("UNIT TEST", DisableCgroups.ALL) + + self.assertTrue(os.path.exists(agent_drop_in_file_cpu_quota), + "{0} was not created".format(agent_drop_in_file_cpu_quota)) + self.assertTrue( + fileutil.findre_in_file(agent_drop_in_file_cpu_quota, "^CPUQuota=$"), + "CPUQuota was not set correctly. Expected an empty value. Got:\n{0}".format( + fileutil.read_file(agent_drop_in_file_cpu_quota))) + self.assertTrue(os.path.exists(extension_slice_unit_file), + "{0} was not created".format(extension_slice_unit_file)) + self.assertTrue( + fileutil.findre_in_file(extension_slice_unit_file, "^CPUQuota=$"), + "CPUQuota was not set correctly. Expected an empty value. Got:\n{0}".format( + fileutil.read_file(extension_slice_unit_file))) + self.assertTrue(os.path.exists(extension_service_cpu_quota), + "{0} was not created".format(extension_service_cpu_quota)) + self.assertTrue( + fileutil.findre_in_file(extension_service_cpu_quota, "^CPUQuota=$"), + "CPUQuota was not set correctly. Expected an empty value. Got:\n{0}".format( + fileutil.read_file(extension_service_cpu_quota))) + self.assertEqual(len(CGroupsTelemetry._tracked), 0, + "No cgroups should be tracked after disable. Tracking: {0}".format( + CGroupsTelemetry._tracked)) @patch('time.sleep', side_effect=lambda _: mock_sleep()) def test_start_extension_command_should_not_use_systemd_when_cgroups_are_not_enabled(self, _): @@ -529,7 +577,7 @@ def test_start_extension_command_should_not_use_fallback_option_if_extension_tim with tempfile.TemporaryFile(dir=self.tmp_dir, mode="w+b") as stdout: with tempfile.TemporaryFile(dir=self.tmp_dir, mode="w+b") as stderr: with patch("azurelinuxagent.common.utils.extensionprocessutil.wait_for_process_completion_or_timeout", - return_value=[True, None]): + return_value=[True, None, 0]): with patch("azurelinuxagent.common.cgroupapi.SystemdCgroupsApi._is_systemd_failure", return_value=False): with self.assertRaises(ExtensionError) as context_manager: diff --git a/tests/common/test_cgroups.py b/tests/common/test_cgroups.py index 1f994b859..61c317854 100644 --- a/tests/common/test_cgroups.py +++ b/tests/common/test_cgroups.py @@ -183,7 +183,7 @@ def test_get_throttled_time_should_return_the_value_since_its_last_invocation(se cgroup.initialize_cpu_usage() shutil.copyfile(os.path.join(data_dir, "cgroups", "cpu.stat_t1"), test_file) # throttled_time = 2075541442327 - throttled_time = cgroup.get_throttled_time() + throttled_time = cgroup.get_cpu_throttled_time() self.assertEqual(throttled_time, float(2075541442327 - 50) / 1E9, "The value of throttled_time is incorrect") diff --git a/tests/common/test_cgroupstelemetry.py b/tests/common/test_cgroupstelemetry.py index 9d96a1d82..dc612244d 100644 --- a/tests/common/test_cgroupstelemetry.py +++ b/tests/common/test_cgroupstelemetry.py @@ -375,7 +375,7 @@ def test_extension_telemetry_not_sent_for_empty_perf_metrics(self, *args): # py self.assertEqual(0, len(metrics)) @patch("azurelinuxagent.common.cgroup.CpuCgroup.get_cpu_usage") - @patch("azurelinuxagent.common.cgroup.CpuCgroup.get_throttled_time") + @patch("azurelinuxagent.common.cgroup.CpuCgroup.get_cpu_throttled_time") @patch("azurelinuxagent.common.cgroup.CGroup.is_active") def test_cgroup_telemetry_should_not_report_cpu_negative_value(self, patch_is_active, path_get_throttled_time, patch_get_cpu_usage): diff --git a/tests/utils/test_extension_process_util.py b/tests/utils/test_extension_process_util.py index e950338fa..a74c4ff73 100644 --- a/tests/utils/test_extension_process_util.py +++ b/tests/utils/test_extension_process_util.py @@ -15,14 +15,16 @@ # Requires Python 2.6+ and Openssl 1.0+ # import os +import shutil import subprocess import tempfile +from azurelinuxagent.common.cgroup import CpuCgroup from azurelinuxagent.common.exception import ExtensionError, ExtensionErrorCodes from azurelinuxagent.common.future import ustr from azurelinuxagent.common.utils.extensionprocessutil import format_stdout_stderr, read_output, \ wait_for_process_completion_or_timeout, handle_process_completion -from tests.tools import AgentTestCase, patch +from tests.tools import AgentTestCase, patch, data_dir class TestProcessUtils(AgentTestCase): @@ -50,7 +52,7 @@ def test_wait_for_process_completion_or_timeout_should_terminate_cleanly(self): stdout=subprocess.PIPE, stderr=subprocess.PIPE) - timed_out, ret = wait_for_process_completion_or_timeout(process=process, timeout=5) + timed_out, ret, _ = wait_for_process_completion_or_timeout(process=process, timeout=5, cpu_cgroup=None) self.assertEqual(timed_out, False) self.assertEqual(ret, 0) @@ -68,7 +70,7 @@ def test_wait_for_process_completion_or_timeout_should_kill_process_on_timeout(s # We don't actually mock the kill, just wrap it so we can assert its call count with patch('azurelinuxagent.common.utils.extensionprocessutil.os.killpg', wraps=os.killpg) as patch_kill: with patch('time.sleep') as mock_sleep: - timed_out, ret = wait_for_process_completion_or_timeout(process=process, timeout=timeout) + timed_out, ret, _ = wait_for_process_completion_or_timeout(process=process, timeout=timeout, cpu_cgroup=None) # We're mocking sleep to avoid prolonging the test execution time, but we still want to make sure # we're "waiting" the correct amount of time before killing the process @@ -87,7 +89,7 @@ def test_handle_process_completion_should_return_nonzero_when_process_fails(self stdout=subprocess.PIPE, stderr=subprocess.PIPE) - timed_out, ret = wait_for_process_completion_or_timeout(process=process, timeout=5) + timed_out, ret, _ = wait_for_process_completion_or_timeout(process=process, timeout=5, cpu_cgroup=None) self.assertEqual(timed_out, False) self.assertEqual(ret, 2) @@ -143,6 +145,46 @@ def test_handle_process_completion_should_raise_on_timeout(self): self.assertEqual(context_manager.exception.code, ExtensionErrorCodes.PluginHandlerScriptTimedout) self.assertIn("Timeout({0})".format(timeout), ustr(context_manager.exception)) + self.assertNotIn("CPUThrottledTime({0}secs)".format(timeout), ustr(context_manager.exception)) #Extension not started in cpuCgroup + + + def test_handle_process_completion_should_log_throttled_time_on_timeout(self): + command = "sleep 1m" + timeout = 20 + with tempfile.TemporaryFile(dir=self.tmp_dir, mode="w+b") as stdout: + with tempfile.TemporaryFile(dir=self.tmp_dir, mode="w+b") as stderr: + with patch('time.sleep') as mock_sleep: + with self.assertRaises(ExtensionError) as context_manager: + test_file = os.path.join(self.tmp_dir, "cpu.stat") + shutil.copyfile(os.path.join(data_dir, "cgroups", "cpu.stat_t0"), + test_file) # throttled_time = 50 + cgroup = CpuCgroup("test", self.tmp_dir) + process = subprocess.Popen(command, # pylint: disable=subprocess-popen-preexec-fn + shell=True, + cwd=self.tmp_dir, + env={}, + stdout=stdout, + stderr=stderr, + preexec_fn=os.setsid) + + handle_process_completion(process=process, + command=command, + timeout=timeout, + stdout=stdout, + stderr=stderr, + error_code=42, + cpu_cgroup=cgroup) + + # We're mocking sleep to avoid prolonging the test execution time, but we still want to make sure + # we're "waiting" the correct amount of time before killing the process and raising an exception + # Due to an extra call to sleep at some point in the call stack which only happens sometimes, + # we are relaxing this assertion to allow +/- 2 sleep calls. + self.assertTrue(abs(mock_sleep.call_count - timeout) <= 2) + + self.assertEqual(context_manager.exception.code, ExtensionErrorCodes.PluginHandlerScriptTimedout) + self.assertIn("Timeout({0})".format(timeout), ustr(context_manager.exception)) + throttled_time = float(50 / 1E9) + self.assertIn("CPUThrottledTime({0}secs)".format(throttled_time), ustr(context_manager.exception)) def test_handle_process_completion_should_raise_on_nonzero_exit_code(self): command = "ls folder_does_not_exist" From 8e201357b4b535e3729596b3dd4e5a15f47e513e Mon Sep 17 00:00:00 2001 From: Nageswara Nandigam Date: Thu, 12 May 2022 18:13:51 -0700 Subject: [PATCH 2/3] address comments --- azurelinuxagent/common/cgroup.py | 10 +++++----- azurelinuxagent/common/cgroupconfigurator.py | 12 +++++++----- azurelinuxagent/common/utils/extensionprocessutil.py | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/azurelinuxagent/common/cgroup.py b/azurelinuxagent/common/cgroup.py index bc9c8ea80..ff2957a35 100644 --- a/azurelinuxagent/common/cgroup.py +++ b/azurelinuxagent/common/cgroup.py @@ -139,7 +139,6 @@ def __init__(self, name, cgroup_path): self._current_system_cpu = None self._previous_throttled_time = None self._current_throttled_time = None - self._throttled_time_lock = threading.RLock() # Protect the get_throttled_time which is called from Monitor thread and cgroupconfigurator. def _get_cpu_ticks(self, allow_no_such_file_or_directory_error=False): """ @@ -174,7 +173,6 @@ def _get_cpu_ticks(self, allow_no_such_file_or_directory_error=False): return cpu_ticks def get_throttled_time(self): - self._throttled_time_lock.acquire() try: with open(os.path.join(self.path, 'cpu.stat')) as cpu_stat: # @@ -196,8 +194,6 @@ def get_throttled_time(self): raise CGroupsException("Failed to read cpu.stat: {0}".format(ustr(e))) except Exception as e: raise CGroupsException("Failed to read cpu.stat: {0}".format(ustr(e))) - finally: - self._throttled_time_lock.release() def _cpu_usage_initialized(self): return self._current_cgroup_cpu is not None and self._current_system_cpu is not None @@ -234,11 +230,15 @@ def get_cpu_usage(self): return round(100.0 * self._osutil.get_processor_cores() * float(cgroup_delta) / float(system_delta), 3) - def get_cpu_throttled_time(self): + def get_cpu_throttled_time(self, read_previous_throttled_time=True): """ Computes the throttled time (in seconds) since the last call to this function. NOTE: initialize_cpu_usage() must be invoked before calling this function + Compute only current throttled time if read_previous_throttled_time set to False """ + if not read_previous_throttled_time: + return float(self.get_throttled_time() / 1E9) + if not self._cpu_usage_initialized(): raise CGroupsException("initialize_cpu_usage() must be invoked before the first call to get_throttled_time()") diff --git a/azurelinuxagent/common/cgroupconfigurator.py b/azurelinuxagent/common/cgroupconfigurator.py index 7bfd4c62c..308838a73 100644 --- a/azurelinuxagent/common/cgroupconfigurator.py +++ b/azurelinuxagent/common/cgroupconfigurator.py @@ -490,6 +490,7 @@ def disable(self, reason, disable_cgroups): self.__reset_agent_cpu_quota() extension_services = self.get_extension_services_list() for extension in extension_services: + logger.info("Resetting extension : {0} and it's services: {1} CPUQuota".format(extension, extension_services[extension])) self.__reset_extension_cpu_quota(extension_name=extension) self.__reset_extension_services_cpu_quota(extension_services[extension]) self.__reload_systemd_config() @@ -800,7 +801,7 @@ def setup_extension_slice(self, extension_name, cpu_quota): slice_contents = _EXTENSION_SLICE_CONTENTS.format(extension_name=extension_name, cpu_quota=cpu_quota) CGroupConfigurator._Impl.__create_unit_file(extension_slice_path, slice_contents) except Exception as exception: - _log_cgroup_warning("Failed to create unit files for the extension slice: {0}", ustr(exception)) + _log_cgroup_warning("Failed to set the extension {0} slice and quotas: {1}", extension_name, ustr(exception)) CGroupConfigurator._Impl.__cleanup_unit_file(extension_slice_path) def remove_extension_slice(self, extension_name): @@ -851,8 +852,9 @@ def __reset_extension_services_cpu_quota(self, services_list): NOTE: This resets the quota on the extension service's default dropin file; any local overrides on the VM will take precedence over this setting. """ - try: - if self.enabled() and services_list is not None: + if self.enabled() and services_list is not None: + try: + service_name = None for service in services_list: service_name = service.get('name', None) unit_file_path = systemd.get_unit_file_install_path() @@ -868,8 +870,8 @@ def __reset_extension_services_cpu_quota(self, services_list): return files_to_create.append((drop_in_file_cpu_quota, cpu_quota_contents)) self.__create_all_files(files_to_create) - except Exception as exception: - _log_cgroup_warning('Failed to set CPUQuota: {0}', ustr(exception)) + except Exception as exception: + _log_cgroup_warning('Failed to reset CPUQuota for {0} : {1}', service_name, ustr(exception)) def remove_extension_services_drop_in_files(self, services_list): """ diff --git a/azurelinuxagent/common/utils/extensionprocessutil.py b/azurelinuxagent/common/utils/extensionprocessutil.py index 33f759da7..9038f6145 100644 --- a/azurelinuxagent/common/utils/extensionprocessutil.py +++ b/azurelinuxagent/common/utils/extensionprocessutil.py @@ -158,7 +158,7 @@ def get_cpu_throttled_time(cpu_cgroup): throttled_time = 0 if cpu_cgroup is not None: try: - throttled_time = float(cpu_cgroup.get_throttled_time() / 1E9) + throttled_time = cpu_cgroup.get_cpu_throttled_time(read_previous_throttled_time=False) except Exception as e: logger.warn("Failed to get cpu throttled time for the extension: {0}", ustr(e)) From ee6949985b66583ce228dd68dab6ed1d754481f6 Mon Sep 17 00:00:00 2001 From: Nageswara Nandigam Date: Thu, 12 May 2022 18:37:50 -0700 Subject: [PATCH 3/3] remove unused import --- azurelinuxagent/common/cgroup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/azurelinuxagent/common/cgroup.py b/azurelinuxagent/common/cgroup.py index ff2957a35..5fadf6bfb 100644 --- a/azurelinuxagent/common/cgroup.py +++ b/azurelinuxagent/common/cgroup.py @@ -13,7 +13,6 @@ # limitations under the License. # # Requires Python 2.6+ and Openssl 1.0+ -import threading from collections import namedtuple import errno