diff --git a/build.gradle b/build.gradle
index d40deeed55..e4c8828465 100644
--- a/build.gradle
+++ b/build.gradle
@@ -240,6 +240,12 @@ project(":samza-core_$scalaSuffix") {
compile "com.github.oshi:oshi-core:$oshiVersion"
compile "net.java.dev.jna:jna:$jnaVersion"
compile "net.java.dev.jna:jna-platform:$jnaVersion"
+ compile("org.apache.hadoop:hadoop-common:$yarnVersion") {
+ exclude module: 'servlet-api'
+ // Exclude because YARN's 3.4.5 ZK version is incompatbile with Kafka's 3.3.4.
+ exclude module: 'zookeeper'
+ }
+ testCompile "com.github.stefanbirkner:system-rules:$systemRulesVersion"
testCompile project(":samza-api").sourceSets.test.output
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index 682947392a..058f891ff1 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -367,6 +367,11 @@
Samza Metrics Reference
system-cpu-usage |
Current CPU usage of the all processes in the whole system as a percentage from 0 to 100. The percentage represents the proportion of executed ticks by all processes to the total ticks across all CPUs. A negative number indicates the value was not available from the operating system. For more detail, see the JavaDoc for com.sun.management.OperatingSystemMXBean. |
+
+ cpu-throttle-ratio |
+ This reports if Linux Control Groups are throttling the Samza container. Returned values are a ratio that range from 0 to 1, where 0 represents not being throttled. Negative results indicate the control group values were not available from the operating system. |
+
+
open-file-descriptor-count |
Current number of open file descriptors |
diff --git a/docs/learn/documentation/versioned/operations/monitoring.md b/docs/learn/documentation/versioned/operations/monitoring.md
index c7ffaf6009..999bbc344d 100644
--- a/docs/learn/documentation/versioned/operations/monitoring.md
+++ b/docs/learn/documentation/versioned/operations/monitoring.md
@@ -380,24 +380,25 @@ All \, \, \, \, \, are popula
| | block-ns | Average time the run loop is blocked because all task instances are busy processing input; could indicate lag accumulating. |
| | container-startup-time | Time spent in starting the container. This includes time to start the JMX server, starting metrics reporters, starting system producers, consumers, system admins, offset manager, locality manager, disk space manager, security manager, statistics manager, and initializing all task instances. |
-| **Group** | **Metric name** | **Meaning** |
-| --- | --- | --- |
-| **SamzaContainerMetrics (Counters and Gauges)** | commit-calls | Number of commits. Each commit includes input checkpointing, flushing producers, checkpointing KV stores, flushing side input stores, etc. |
-| | window-calls | In case of WindowableTask, this measures the number of window invocations. |
-| | timer-calls | Number of timer callbacks. |
-| | process-calls | Number of process method invocations. |
-| | process-envelopers | Number of input message envelopes processed. |
-| | process-null-envelopes | Number of times no input message envelopes was available for the run loop to process. |
-| | event-loop-utilization | The duty-cycle of the event loop. That is, the fraction of time of each event loop iteration that is spent in process(), window(), and commit. |
-| | disk-usage-bytes | Total disk space size used by key-value stores (in bytes). |
-| | disk-quota-bytes | Disk memory usage quota for key-value stores (in bytes). |
-| | executor-work-factor | The work factor of the run loop. A work factor of 1 indicates full throughput, while a work factor of less than 1 will introduce delays into the execution to approximate the requested work factor. The work factor is set by the disk space monitor in accordance with the disk quota policy. Given the latest percentage of available disk quota, this policy returns the work factor that should be applied. |
-| | total-process-cpu-usage | The process cpu usage percentage (in the [0, 100] interval) used by the Samza container process and all its child processes. |
-| | physical-memory-mb | The physical memory used by the Samza container process (native + on heap) (in MBs). |
-| | physical-memory-utilization | The ratio between the physical memory used by the Samza container process (native + on heap) and the total physical memory of the Samza container. |
-| | container-thread-pool-size | The current size of a Samza container's thread pool. It may or may not be the same as job.container.thread.pool.size, depending on the implementation. |
-| | container-active-threads | The approximate actively used threads in a Samza container's thread pool. |
-| | --restore-time | Time taken to restore task stores (per task store). |
+| **Group** | **Metric name** | **Meaning** |
+| --- |---------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **SamzaContainerMetrics (Counters and Gauges)** | commit-calls | Number of commits. Each commit includes input checkpointing, flushing producers, checkpointing KV stores, flushing side input stores, etc. |
+| | window-calls | In case of WindowableTask, this measures the number of window invocations. |
+| | timer-calls | Number of timer callbacks. |
+| | process-calls | Number of process method invocations. |
+| | process-envelopers | Number of input message envelopes processed. |
+| | process-null-envelopes | Number of times no input message envelopes was available for the run loop to process. |
+| | event-loop-utilization | The duty-cycle of the event loop. That is, the fraction of time of each event loop iteration that is spent in process(), window(), and commit. |
+| | disk-usage-bytes | Total disk space size used by key-value stores (in bytes). |
+| | disk-quota-bytes | Disk memory usage quota for key-value stores (in bytes). |
+| | executor-work-factor | The work factor of the run loop. A work factor of 1 indicates full throughput, while a work factor of less than 1 will introduce delays into the execution to approximate the requested work factor. The work factor is set by the disk space monitor in accordance with the disk quota policy. Given the latest percentage of available disk quota, this policy returns the work factor that should be applied. |
+| | total-process-cpu-usage | The process cpu usage percentage (in the [0, 100] interval) used by the Samza container process and all its child processes. |
+| | cpu-throttle-ratio | This reports if Linux Control Groups are throttling the Samza container. Returned values are a ratio that range from 0 to 1, where 0 represents not being throttled. |
+| | physical-memory-mb | The physical memory used by the Samza container process (native + on heap) (in MBs). |
+| | physical-memory-utilization | The ratio between the physical memory used by the Samza container process (native + on heap) and the total physical memory of the Samza container. |
+| | container-thread-pool-size | The current size of a Samza container's thread pool. It may or may not be the same as job.container.thread.pool.size, depending on the implementation. |
+| | container-active-threads | The approximate actively used threads in a Samza container's thread pool. |
+| | --restore-time | Time taken to restore task stores (per task store). |
| **Group** | **Metric name** | **Meaning** |
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 2a1bfbd711..adab575462 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -47,7 +47,8 @@
scalaTestVersion = "3.0.1"
snappyVersion = "1.1.8.4"
slf4jVersion = "1.7.7"
- yarnVersion = "2.10.1"
+ yarnVersion = "2.10.2"
+ systemRulesVersion = "1.19.0"
zkClientVersion = "0.11"
zookeeperVersion = "3.6.3"
failsafeVersion = "2.4.0"
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java
index cfc4aea995..abc11b3c94 100644
--- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerMonitorListener.java
@@ -23,6 +23,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.container.host.LinuxCgroupStatistics;
import org.apache.samza.container.host.ProcessCPUStatistics;
import org.apache.samza.container.host.SystemMemoryStatistics;
import org.apache.samza.container.host.SystemStatistics;
@@ -72,5 +73,14 @@ public void onUpdate(SystemStatistics sample) {
LOGGER.debug("Container active threads count: " + containerActiveThreads);
containerMetrics.containerActiveThreads().set(containerActiveThreads);
}
+
+ // Update CGroup related metrics
+ LinuxCgroupStatistics cpuThrottle = sample.getCgroupStatistics();
+ if (Objects.nonNull(cpuThrottle)) {
+ double cpuThrottleRatio = cpuThrottle.getCgroupCpuThrottleRatio();
+ LOGGER.debug("Container CGROUP Throttle Ratio: " + cpuThrottleRatio);
+ containerMetrics.cpuThrottleRatio().set(cpuThrottleRatio);
+ }
+
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/DefaultSystemStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/DefaultSystemStatisticsGetter.java
index ccce5830f3..e3dafa4d42 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/DefaultSystemStatisticsGetter.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/DefaultSystemStatisticsGetter.java
@@ -22,22 +22,28 @@
/**
- * An default implementation of {@link SystemStatisticsGetter} that relies on {@link PosixCommandBasedStatisticsGetter}
- * and {@link OshiBasedStatisticsGetter} implementations
+ * An default implementation of {@link SystemStatisticsGetter} that relies on {@link PosixCommandBasedStatisticsGetter},
+ * {@link OshiBasedStatisticsGetter}, and {@link LinuxCgroupStatisticsGetter}, implementations.
*/
public class DefaultSystemStatisticsGetter implements SystemStatisticsGetter {
private final OshiBasedStatisticsGetter oshiBasedStatisticsGetter;
private final PosixCommandBasedStatisticsGetter posixCommandBasedStatisticsGetter;
+ private final LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter;
+
public DefaultSystemStatisticsGetter() {
- this(new OshiBasedStatisticsGetter(), new PosixCommandBasedStatisticsGetter());
+ this(new OshiBasedStatisticsGetter(), new PosixCommandBasedStatisticsGetter(), new LinuxCgroupStatisticsGetter());
}
@VisibleForTesting
- DefaultSystemStatisticsGetter(OshiBasedStatisticsGetter oshiBasedStatisticsGetter,
- PosixCommandBasedStatisticsGetter posixCommandBasedStatisticsGetter) {
+ DefaultSystemStatisticsGetter(
+ OshiBasedStatisticsGetter oshiBasedStatisticsGetter,
+ PosixCommandBasedStatisticsGetter posixCommandBasedStatisticsGetter,
+ LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter
+ ) {
this.oshiBasedStatisticsGetter = oshiBasedStatisticsGetter;
this.posixCommandBasedStatisticsGetter = posixCommandBasedStatisticsGetter;
+ this.linuxCgroupStatisticsGetter = linuxCgroupStatisticsGetter;
}
@Override
@@ -49,4 +55,9 @@ public SystemMemoryStatistics getSystemMemoryStatistics() {
public ProcessCPUStatistics getProcessCPUStatistics() {
return oshiBasedStatisticsGetter.getProcessCPUStatistics();
}
+
+ @Override
+ public LinuxCgroupStatistics getProcessCgroupStatistics() {
+ return linuxCgroupStatisticsGetter.getProcessCgroupStatistics();
+ }
}
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/LinuxCgroupStatistics.java b/samza-core/src/main/java/org/apache/samza/container/host/LinuxCgroupStatistics.java
new file mode 100644
index 0000000000..962ee01305
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/LinuxCgroupStatistics.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.container.host;
+
+import java.util.Objects;
+
+/**
+ * A {@link org.apache.samza.container.host.LinuxCgroupStatistics} object represents recent Cgroup CPU values
+ */
+public class LinuxCgroupStatistics {
+
+ /**
+ * The Cgroup CPU throttle Ratio for the Yarn container's control group, defined as nr_throttled over nr_periods.
+ */
+ private final double cgroupCpuThrottleRatio;
+
+ LinuxCgroupStatistics(double cgroupCpuThrottleRatio) {
+ this.cgroupCpuThrottleRatio = cgroupCpuThrottleRatio;
+ }
+
+ public double getCgroupCpuThrottleRatio() {
+ return cgroupCpuThrottleRatio;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ org.apache.samza.container.host.LinuxCgroupStatistics that = (org.apache.samza.container.host.LinuxCgroupStatistics) o;
+ return Double.compare(that.cgroupCpuThrottleRatio, cgroupCpuThrottleRatio) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cgroupCpuThrottleRatio);
+ }
+
+ @Override
+ public String toString() {
+ return "LinuxCgroupStatistics{" + "cpuThrottleRatio=" + cgroupCpuThrottleRatio + '}';
+ }
+}
+
+
+
+
+
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/LinuxCgroupStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/LinuxCgroupStatisticsGetter.java
new file mode 100644
index 0000000000..f38d356bb9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/host/LinuxCgroupStatisticsGetter.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.host;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import java.io.FileReader;
+import java.nio.file.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import java.net.URL;
+import java.net.URI;
+import java.net.MalformedURLException;
+import java.util.Optional;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.io.File;
+
+public class LinuxCgroupStatisticsGetter implements SystemStatisticsGetter {
+ private static final Logger LOG = LoggerFactory.getLogger(LinuxCgroupStatisticsGetter.class.getName());
+ private String cgroupMountPath;
+ private String cgroupHierarchy;
+ private String containerID;
+
+ @Override
+ public SystemMemoryStatistics getSystemMemoryStatistics() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public ProcessCPUStatistics getProcessCPUStatistics() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ LinuxCgroupStatisticsGetter() {
+ this.containerID = Optional.ofNullable(System.getenv("CONTAINER_ID")).orElse("NOT_DETECTED");
+ String hadoopConfDir = Optional.ofNullable(System.getenv("HADOOP_CONF_DIR")).orElse("NOT_DETECTED");
+ Configuration yarnSite;
+ if (!hadoopConfDir.equals("NOT_DETECTED")) {
+ yarnSite = getHadoopConf(hadoopConfDir);
+ this.cgroupHierarchy = yarnSite.get("yarn.nodemanager.linux-container-executor.cgroups.hierarchy", "NOT_DETECTED");
+ this.cgroupMountPath = yarnSite.get("yarn.nodemanager.linux-container-executor.cgroups.mount-path", "NOT_DETECTED");
+ }
+ LOG.debug("CONTAINER ID: " + this.containerID);
+ LOG.debug("HADOOP_CONF_DIR: " + hadoopConfDir);
+ LOG.debug("CGROUP_MOUNT_PATH: " + this.cgroupMountPath);
+ LOG.debug("CGROUP_MOUNT_HIERARCHY: " + this.cgroupHierarchy);
+ }
+
+ @Override
+ public LinuxCgroupStatistics getProcessCgroupStatistics() {
+ try {
+ double ratio = getCPUStat();
+ return new LinuxCgroupStatistics(ratio);
+ } catch (Exception e) {
+ LOG.debug("Error reading cgroups information: ", e);
+ return null;
+ }
+ }
+
+ private double getCPUStat() {
+ if (this.containerID.equals("NOT_DETECTED")) {
+ // return a sentinel value to signal this is not running on Hadoop
+ return -2.0;
+ }
+ String[] controllers = {"cpu", "cpuacct", "cpu,cpuacct" };
+ double cpuThrottledRatio = -1.0;
+ String cpuStatPath;
+ for (String controller : controllers) {
+ cpuStatPath = this.cgroupMountPath + "/" + controller + "/" + this.cgroupHierarchy + "/" + this.containerID + "/cpu.stat";
+ if (cpuStatExists(cpuStatPath)) {
+ LOG.debug("Found cpu.stat file: " + cpuStatPath);
+ try {
+ Properties cpuStatValues = new Properties(); // Treat cpu.stat as a properties file as content is space delimited key value data.
+ cpuStatValues.load(new FileReader(cpuStatPath));
+ long nrPeriod = Long.parseLong(cpuStatValues.getProperty("nr_periods", "-1.0"));
+ long nrThrottled = Long.parseLong(cpuStatValues.getProperty("nr_throttled", "-1.0"));
+ LOG.debug("cpu.stat nr_period value: " + nrPeriod);
+ LOG.debug("cpu.stat nr_throttled value: " + nrThrottled);
+ cpuThrottledRatio = (double) nrThrottled / nrPeriod;
+ break;
+ } catch (IOException | RuntimeException e) {
+ LOG.debug("Caught exception reading cpu.stat file: ", e.getMessage());
+ // return a sentinel value to signal an exception occurred.
+ return -1.0;
+ }
+ }
+ }
+ return cpuThrottledRatio;
+ }
+
+ private boolean cpuStatExists(String cpuStatPath) {
+ Path cpuStat = Paths.get(cpuStatPath);
+ return Files.exists(cpuStat);
+ }
+
+ private Configuration getHadoopConf(String hConfDir) {
+ Configuration hConf = new Configuration();
+ try {
+ URI yarnSiteURI = new URI("file://" + hConfDir + "/yarn-site.xml");
+ LOG.debug("yarn-site.xml URI: " + yarnSiteURI.toString());
+ File yarnSiteXml = new File(yarnSiteURI);
+ if (!yarnSiteXml.isFile() || !yarnSiteXml.canRead()) {
+ throw new RuntimeException("Unable to access yarn-site.xml: " + yarnSiteXml.toString());
+ }
+ URL yarnSiteUrl = yarnSiteURI.toURL();
+ hConf.addResource(yarnSiteUrl);
+ } catch (MalformedURLException | URISyntaxException | RuntimeException e) {
+ LOG.error("Unable to construct URL to yarn-site.xml: " + e.getMessage());
+ }
+ return hConf;
+ }
+}
+
+
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/OshiBasedStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/OshiBasedStatisticsGetter.java
index 22091b8a02..0520d5d9af 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/OshiBasedStatisticsGetter.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/OshiBasedStatisticsGetter.java
@@ -67,6 +67,11 @@ public SystemMemoryStatistics getSystemMemoryStatistics() {
throw new UnsupportedOperationException("Not implemented");
}
+ @Override
+ public LinuxCgroupStatistics getProcessCgroupStatistics() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
@Override
public ProcessCPUStatistics getProcessCPUStatistics() {
try {
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
index ec16791e7b..4019d1f3e4 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java
@@ -90,4 +90,8 @@ public ProcessCPUStatistics getProcessCPUStatistics() {
throw new UnsupportedOperationException(
"No appropriate Posix command available for getting recent CPU usage information. For example, the CPU information exposed by ps command 'ps -o %cpu= -p ' represents the percentage of time spent running during the entire lifetime of a process not for the recent CPU usage");
}
+ @Override
+ public LinuxCgroupStatistics getProcessCgroupStatistics() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
}
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
index 1f5297ed41..69b0f16a98 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/StatisticsMonitorImpl.java
@@ -120,13 +120,15 @@ public void run() {
private void sampleStatistics() {
SystemMemoryStatistics memoryStatistics = null;
ProcessCPUStatistics cpuStatistics = null;
+ LinuxCgroupStatistics cgroupStatistics = null;
try {
memoryStatistics = statisticsGetter.getSystemMemoryStatistics();
cpuStatistics = statisticsGetter.getProcessCPUStatistics();
+ cgroupStatistics = statisticsGetter.getProcessCgroupStatistics();
} catch (Throwable e) {
LOG.error("Error during obtaining statistics: ", e);
}
- SystemStatistics systemStatistics = new SystemStatistics(cpuStatistics, memoryStatistics);
+ SystemStatistics systemStatistics = new SystemStatistics(cpuStatistics, memoryStatistics, cgroupStatistics);
for (Listener listener : listenerSet.keySet()) {
try {
// catch all exceptions to shield one listener from exceptions thrown by others.
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatistics.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatistics.java
index 5b0d45b741..383169693d 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatistics.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatistics.java
@@ -28,10 +28,12 @@ public class SystemStatistics {
private final ProcessCPUStatistics cpuStatistics;
private final SystemMemoryStatistics memoryStatistics;
+ private final LinuxCgroupStatistics cgroupStatistics;
- public SystemStatistics(ProcessCPUStatistics cpuStatistics, SystemMemoryStatistics memoryStatistics) {
+ public SystemStatistics(ProcessCPUStatistics cpuStatistics, SystemMemoryStatistics memoryStatistics, LinuxCgroupStatistics cgroupStatistics) {
this.cpuStatistics = cpuStatistics;
this.memoryStatistics = memoryStatistics;
+ this.cgroupStatistics = cgroupStatistics;
}
public ProcessCPUStatistics getCpuStatistics() {
@@ -42,6 +44,10 @@ public SystemMemoryStatistics getMemoryStatistics() {
return memoryStatistics;
}
+ public LinuxCgroupStatistics getCgroupStatistics() {
+ return cgroupStatistics;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
index f340896c6a..4963999638 100644
--- a/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
+++ b/samza-core/src/main/java/org/apache/samza/container/host/SystemStatisticsGetter.java
@@ -38,4 +38,12 @@ public interface SystemStatisticsGetter {
* @return {@link ProcessCPUStatistics} for the Samza container process
*/
ProcessCPUStatistics getProcessCPUStatistics();
+
+ /**
+ * Returns the {@link LinuxCgroupStatistics} for the current Samza container process(includes its child processes). A
+ * 'null' value is returned if no statistics are available.
+ *
+ * @return {@link LinuxCgroupStatistics} for the Samza container process
+ */
+ LinuxCgroupStatistics getProcessCgroupStatistics();
}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 55bfa924a0..d773725f6c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -53,6 +53,7 @@ class SamzaContainerMetrics(
val containerThreadPoolSize = newGauge("container-thread-pool-size", 0L)
val containerActiveThreads = newGauge("container-active-threads", 0L)
val containerRunning = newGauge("container-running", 0L)
+ val cpuThrottleRatio = newGauge("cpu-throttle-ratio", 0.0F)
val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java
index 5adb114f0e..ff1725126f 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestSamzaContainerMonitorListener.java
@@ -22,6 +22,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.host.LinuxCgroupStatistics;
import org.apache.samza.container.host.ProcessCPUStatistics;
import org.apache.samza.container.host.SystemMemoryStatistics;
import org.apache.samza.container.host.SystemStatistics;
@@ -43,13 +44,15 @@ public class TestSamzaContainerMonitorListener {
private SystemMemoryStatistics memorySample;
@Mock
private ThreadPoolExecutor taskThreadPool;
+ @Mock
+ private LinuxCgroupStatistics cgroupSample;
private SystemStatistics sample;
-
private final double cpuUsage = 30.0;
private final int containerMemoryMb = 2048;
private final long physicalMemoryBytes = 1024000L;
private final int activeThreadCount = 2;
+ private final double throttleRatio = 0.53;
private final Config config =
new MapConfig(Collections.singletonMap("cluster-manager.container.memory.mb", String.valueOf(containerMemoryMb)));
@@ -64,8 +67,8 @@ public void setup() {
when(cpuSample.getProcessCPUUsagePercentage()).thenReturn(cpuUsage);
when(memorySample.getPhysicalMemoryBytes()).thenReturn(physicalMemoryBytes);
when(taskThreadPool.getActiveCount()).thenReturn(activeThreadCount);
-
- sample = new SystemStatistics(cpuSample, memorySample);
+ when(cgroupSample.getCgroupCpuThrottleRatio()).thenReturn(throttleRatio);
+ sample = new SystemStatistics(cpuSample, memorySample, cgroupSample);
samzaContainerMonitorListener = new SamzaContainerMonitorListener(config, containerMetrics, taskThreadPool);
}
@@ -77,5 +80,6 @@ public void testOnUpdate() {
assertEquals(physicalMemoryMb, containerMetrics.physicalMemoryMb().getValue());
assertEquals(physicalMemoryMb / containerMemoryMb, containerMetrics.physicalMemoryUtilization().getValue());
assertEquals(activeThreadCount, containerMetrics.containerActiveThreads().getValue());
+ assertEquals(throttleRatio, containerMetrics.cpuThrottleRatio().getValue());
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestDefaultSystemStatisticsGetter.java b/samza-core/src/test/java/org/apache/samza/container/host/TestDefaultSystemStatisticsGetter.java
index 04f9669d0a..6c9cc0891c 100644
--- a/samza-core/src/test/java/org/apache/samza/container/host/TestDefaultSystemStatisticsGetter.java
+++ b/samza-core/src/test/java/org/apache/samza/container/host/TestDefaultSystemStatisticsGetter.java
@@ -33,13 +33,16 @@ public class TestDefaultSystemStatisticsGetter {
@Mock
private PosixCommandBasedStatisticsGetter posixCommandBasedStatisticsGetter;
+ @Mock
+ private LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter;
+
private DefaultSystemStatisticsGetter defaultSystemStatisticsGetter;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
this.defaultSystemStatisticsGetter =
- new DefaultSystemStatisticsGetter(oshiBasedStatisticsGetter, posixCommandBasedStatisticsGetter);
+ new DefaultSystemStatisticsGetter(oshiBasedStatisticsGetter, posixCommandBasedStatisticsGetter, linuxCgroupStatisticsGetter);
}
@Test
@@ -53,4 +56,10 @@ public void testGetProcessCPUStatistics() {
defaultSystemStatisticsGetter.getProcessCPUStatistics();
verify(oshiBasedStatisticsGetter).getProcessCPUStatistics();
}
+
+ @Test
+ public void testGetProcessCgroupStatistics() {
+ defaultSystemStatisticsGetter.getProcessCgroupStatistics();
+ verify(linuxCgroupStatisticsGetter).getProcessCgroupStatistics();
+ }
}
diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatistics.java b/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatistics.java
new file mode 100644
index 0000000000..93f819b7ef
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatistics.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.host;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+
+public class TestLinuxCgroupStatistics {
+ @Test
+ public void testGetCgroupCPUThrottleRatio() {
+ LinuxCgroupStatistics linuxCgroupStatistics = new LinuxCgroupStatistics(-1.0);
+ assertEquals(linuxCgroupStatistics.getCgroupCpuThrottleRatio(), -1.0, 0.05);
+ }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatisticsGetter.java b/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatisticsGetter.java
new file mode 100644
index 0000000000..c9dd2a6193
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/host/TestLinuxCgroupStatisticsGetter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container.host;
+
+
+import static org.junit.Assert.*;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+import java.util.logging.Logger;
+import org.junit.Assume;
+
+
+public class TestLinuxCgroupStatisticsGetter {
+ private static final Logger LOGGER = Logger.getLogger("TestLinuxCgroupStatisticsGetter");
+
+ @Rule
+ public TemporaryFolder myTempDir = new TemporaryFolder();
+
+ @Rule
+ public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
+ @Test
+ public void testGetThrottleValue() {
+ Assume.assumeTrue(System.getProperty("os.name").equals("Linux"));
+ environmentVariables.set("CONTAINER_ID", "container_abc_123");
+ environmentVariables.set("HADOOP_CONF_DIR", myTempDir.getRoot().toString());
+
+ File yarnSiteXml, cpuStatFile, cpuStatDirs;
+ try {
+ // Need stub data at my_temp_dir/yarn-site.xml
+ // my_temp_dir/cgroup/cpu/user.slice/container_abc_123/cpu.stat
+ yarnSiteXml = myTempDir.newFile("yarn-site.xml");
+
+ FileWriter fw1 = new FileWriter(yarnSiteXml);
+ BufferedWriter bw1 = new BufferedWriter(fw1);
+ bw1.write("");
+ bw1.write("yarn.nodemanager.linux-container-executor.cgroups.hierarchy");
+ bw1.write("user.slice");
+ bw1.write("yarn.nodemanager.linux-container-executor.cgroups.mount-path");
+ bw1.write("" + myTempDir.getRoot() + "/cgroup");
+ bw1.write("");
+ bw1.close();
+
+ String cpuStatPath = myTempDir.getRoot() + "/cgroup/cpu/user.slice/container_abc_123";
+ cpuStatDirs = new File(cpuStatPath);
+ cpuStatDirs.mkdirs();
+
+ cpuStatFile = new File(cpuStatDirs, "cpu.stat");
+ cpuStatFile.createNewFile();
+
+ FileWriter fw2 = new FileWriter(cpuStatFile);
+ BufferedWriter bw2 = new BufferedWriter(fw2);
+ bw2.write("nr_periods 340956467");
+ bw2.newLine();
+ bw2.write("nr_throttled 292501");
+ bw2.newLine();
+ bw2.write("throttled_time 5997018459867");
+ bw2.newLine();
+ bw2.close();
+
+ } catch (IOException e) {
+ System.err.println("Error creating temporary test files: " + e.getMessage());
+ }
+
+ LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter = new LinuxCgroupStatisticsGetter();
+ LinuxCgroupStatistics cpuStat = linuxCgroupStatisticsGetter.getProcessCgroupStatistics();
+
+ double throttleRatio = cpuStat.getCgroupCpuThrottleRatio();
+ assertEquals(throttleRatio, 0.00085788371334807384, 0.05);
+
+ }
+
+ @Test
+ public void testRunTimeIsNotHadoop() {
+ // Validate that standalone applications return the expected sentinel of -2.0.
+ environmentVariables.clear("CONTAINER_ID");
+ LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter = new LinuxCgroupStatisticsGetter();
+ LinuxCgroupStatistics cpuStat = linuxCgroupStatisticsGetter.getProcessCgroupStatistics();
+ double throttleRatio = cpuStat.getCgroupCpuThrottleRatio();
+ assertEquals(throttleRatio, -2.0, 0.05);
+ }
+
+ @Test
+ public void testExceptionReturnsNegativeOne() {
+ // Validate that exceptions return a sentinel of -1.0.
+ environmentVariables.set("CONTAINER_ID", "container_abc_123");
+ environmentVariables.set("HADOOP_CONF_DIR", "/fake/path/to/non_existent/cgroup/directory");
+ LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter = new LinuxCgroupStatisticsGetter();
+ LinuxCgroupStatistics cpuStat = linuxCgroupStatisticsGetter.getProcessCgroupStatistics();
+ double throttleRatio = cpuStat.getCgroupCpuThrottleRatio();
+ assertEquals(throttleRatio, -1.0, 0.05);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testGetSystemMemoryStatistics() {
+ LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter = new LinuxCgroupStatisticsGetter();
+ linuxCgroupStatisticsGetter.getSystemMemoryStatistics();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testGetProcessCPUStatistics() {
+ LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter = new LinuxCgroupStatisticsGetter();
+ linuxCgroupStatisticsGetter.getProcessCPUStatistics();
+ }
+
+}