From da25a21a43357193112b71c85a688233ebfe5580 Mon Sep 17 00:00:00 2001 From: Adam Faris Date: Thu, 6 Jun 2024 14:09:52 -0700 Subject: [PATCH] Updated exception handling --- .../host/LinuxCgroupStatisticsGetter.java | 23 ++++++++++++++----- .../host/TestLinuxCgroupStatistics.java | 23 +++++++++++++++++-- .../host/TestLinuxCgroupStatisticsGetter.java | 21 +++++++++++++++++ 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/container/host/LinuxCgroupStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/LinuxCgroupStatisticsGetter.java index d5f3ac589f..f38d356bb9 100644 --- a/samza-core/src/main/java/org/apache/samza/container/host/LinuxCgroupStatisticsGetter.java +++ b/samza-core/src/main/java/org/apache/samza/container/host/LinuxCgroupStatisticsGetter.java @@ -19,6 +19,7 @@ package org.apache.samza.container.host; import java.io.IOException; +import java.net.URISyntaxException; import java.util.Properties; import java.io.FileReader; import java.nio.file.Files; @@ -31,7 +32,7 @@ import java.util.Optional; import java.nio.file.Path; import java.nio.file.Paths; - +import java.io.File; public class LinuxCgroupStatisticsGetter implements SystemStatisticsGetter { private static final Logger LOG = LoggerFactory.getLogger(LinuxCgroupStatisticsGetter.class.getName()); @@ -76,6 +77,10 @@ public LinuxCgroupStatistics getProcessCgroupStatistics() { } private double getCPUStat() { + if (this.containerID.equals("NOT_DETECTED")) { + // return a sentinel value to signal this is not running on Hadoop + return -2.0; + } String[] controllers = {"cpu", "cpuacct", "cpu,cpuacct" }; double cpuThrottledRatio = -1.0; String cpuStatPath; @@ -93,7 +98,9 @@ private double getCPUStat() { cpuThrottledRatio = (double) nrThrottled / nrPeriod; break; } catch (IOException | RuntimeException e) { - throw new RuntimeException("Caught exception reading cpu.stat file: ", e); + LOG.debug("Caught exception reading cpu.stat file: ", e.getMessage()); + // return a sentinel value to signal an exception occurred. + return -1.0; } } } @@ -108,11 +115,15 @@ private boolean cpuStatExists(String cpuStatPath) { private Configuration getHadoopConf(String hConfDir) { Configuration hConf = new Configuration(); try { - String yarnSiteURI = "file://" + hConfDir + "/yarn-site.xml"; - LOG.debug("yarn-site.xml URI: " + yarnSiteURI); - URL yarnSiteUrl = URI.create(yarnSiteURI).toURL(); + URI yarnSiteURI = new URI("file://" + hConfDir + "/yarn-site.xml"); + LOG.debug("yarn-site.xml URI: " + yarnSiteURI.toString()); + File yarnSiteXml = new File(yarnSiteURI); + if (!yarnSiteXml.isFile() || !yarnSiteXml.canRead()) { + throw new RuntimeException("Unable to access yarn-site.xml: " + yarnSiteXml.toString()); + } + URL yarnSiteUrl = yarnSiteURI.toURL(); hConf.addResource(yarnSiteUrl); - } catch (MalformedURLException | IllegalArgumentException e) { + } catch (MalformedURLException | URISyntaxException | RuntimeException e) { LOG.error("Unable to construct URL to yarn-site.xml: " + e.getMessage()); } return hConf; diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatistics.java b/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatistics.java index 58aea9ab79..93f819b7ef 100644 --- a/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatistics.java +++ b/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatistics.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.container.host; import org.junit.Test; @@ -7,7 +26,7 @@ public class TestLinuxCgroupStatistics { @Test public void testGetCgroupCPUThrottleRatio() { - LinuxCgroupStatistics linuxCgroupStatistics = new LinuxCgroupStatistics(10.0); - assertEquals(linuxCgroupStatistics.getCgroupCpuThrottleRatio(), 10.0, 0.05); + LinuxCgroupStatistics linuxCgroupStatistics = new LinuxCgroupStatistics(-1.0); + assertEquals(linuxCgroupStatistics.getCgroupCpuThrottleRatio(), -1.0, 0.05); } } diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatisticsGetter.java b/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatisticsGetter.java index 7cfbbbeac0..c9dd2a6193 100644 --- a/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatisticsGetter.java +++ b/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatisticsGetter.java @@ -93,6 +93,27 @@ public void testGetThrottleValue() { } + @Test + public void testRunTimeIsNotHadoop() { + // Validate that standalone applications return the expected sentinel of -2.0. + environmentVariables.clear("CONTAINER_ID"); + LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter = new LinuxCgroupStatisticsGetter(); + LinuxCgroupStatistics cpuStat = linuxCgroupStatisticsGetter.getProcessCgroupStatistics(); + double throttleRatio = cpuStat.getCgroupCpuThrottleRatio(); + assertEquals(throttleRatio, -2.0, 0.05); + } + + @Test + public void testExceptionReturnsNegativeOne() { + // Validate that exceptions return a sentinel of -1.0. + environmentVariables.set("CONTAINER_ID", "container_abc_123"); + environmentVariables.set("HADOOP_CONF_DIR", "/fake/path/to/non_existent/cgroup/directory"); + LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter = new LinuxCgroupStatisticsGetter(); + LinuxCgroupStatistics cpuStat = linuxCgroupStatisticsGetter.getProcessCgroupStatistics(); + double throttleRatio = cpuStat.getCgroupCpuThrottleRatio(); + assertEquals(throttleRatio, -1.0, 0.05); + } + @Test(expected = UnsupportedOperationException.class) public void testGetSystemMemoryStatistics() { LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter = new LinuxCgroupStatisticsGetter();