Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SAMZA-2800] A new Control Group Metric for Samza #1699

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ project(":samza-core_$scalaSuffix") {
compile "com.github.oshi:oshi-core:$oshiVersion"
compile "net.java.dev.jna:jna:$jnaVersion"
compile "net.java.dev.jna:jna-platform:$jnaVersion"
compile("org.apache.hadoop:hadoop-common:$yarnVersion") {
exclude module: 'servlet-api'
// Exclude because YARN's 3.4.5 ZK version is incompatbile with Kafka's 3.3.4.
exclude module: 'zookeeper'
}
testCompile "com.github.stefanbirkner:system-rules:$systemRulesVersion"
testCompile project(":samza-api").sourceSets.test.output
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,11 @@ <h1>Samza Metrics Reference</h1>
<td>system-cpu-usage</td>
<td>Current CPU usage of the all processes in the whole system as a percentage from 0 to 100. The percentage represents the proportion of executed ticks by all processes to the total ticks across all CPUs. A negative number indicates the value was not available from the operating system. For more detail, see the JavaDoc for com.sun.management.OperatingSystemMXBean.</td>
</tr>
<tr>
<td>cpu-throttle-ratio</td>
<td>This reports if Linux Control Groups are throttling the Samza container. Returned values are a ratio that range from 0 to 1, where 0 represents not being throttled. Negative results indicate the control group values were not available from the operating system.</td>
</tr>

<tr>
<td>open-file-descriptor-count</td>
<td>Current number of open file descriptors</td>
Expand Down
37 changes: 19 additions & 18 deletions docs/learn/documentation/versioned/operations/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -380,24 +380,25 @@ All \<system\>, \<stream\>, \<partition\>, \<store-name\>, \<topic\>, are popula
| | block-ns | Average time the run loop is blocked because all task instances are busy processing input; could indicate lag accumulating. |
| | container-startup-time | Time spent in starting the container. This includes time to start the JMX server, starting metrics reporters, starting system producers, consumers, system admins, offset manager, locality manager, disk space manager, security manager, statistics manager, and initializing all task instances. |

| **Group** | **Metric name** | **Meaning** |
| --- | --- | --- |
| **SamzaContainerMetrics (Counters and Gauges)** | commit-calls | Number of commits. Each commit includes input checkpointing, flushing producers, checkpointing KV stores, flushing side input stores, etc. |
| | window-calls | In case of WindowableTask, this measures the number of window invocations. |
| | timer-calls | Number of timer callbacks. |
| | process-calls | Number of process method invocations. |
| | process-envelopers | Number of input message envelopes processed. |
| | process-null-envelopes | Number of times no input message envelopes was available for the run loop to process. |
| | event-loop-utilization | The duty-cycle of the event loop. That is, the fraction of time of each event loop iteration that is spent in process(), window(), and commit. |
| | disk-usage-bytes | Total disk space size used by key-value stores (in bytes). |
| | disk-quota-bytes | Disk memory usage quota for key-value stores (in bytes). |
| | executor-work-factor | The work factor of the run loop. A work factor of 1 indicates full throughput, while a work factor of less than 1 will introduce delays into the execution to approximate the requested work factor. The work factor is set by the disk space monitor in accordance with the disk quota policy. Given the latest percentage of available disk quota, this policy returns the work factor that should be applied. |
| | total-process-cpu-usage | The process cpu usage percentage (in the [0, 100] interval) used by the Samza container process and all its child processes. |
| | physical-memory-mb | The physical memory used by the Samza container process (native + on heap) (in MBs). |
| | physical-memory-utilization | The ratio between the physical memory used by the Samza container process (native + on heap) and the total physical memory of the Samza container. |
| | container-thread-pool-size | The current size of a Samza container's thread pool. It may or may not be the same as job.container.thread.pool.size, depending on the implementation. |
| | container-active-threads | The approximate actively used threads in a Samza container's thread pool. |
| | <TaskName\>-<StoreName\>-restore-time | Time taken to restore task stores (per task store). |
| **Group** | **Metric name** | **Meaning** |
| --- |---------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **SamzaContainerMetrics (Counters and Gauges)** | commit-calls | Number of commits. Each commit includes input checkpointing, flushing producers, checkpointing KV stores, flushing side input stores, etc. |
| | window-calls | In case of WindowableTask, this measures the number of window invocations. |
| | timer-calls | Number of timer callbacks. |
| | process-calls | Number of process method invocations. |
| | process-envelopers | Number of input message envelopes processed. |
| | process-null-envelopes | Number of times no input message envelopes was available for the run loop to process. |
| | event-loop-utilization | The duty-cycle of the event loop. That is, the fraction of time of each event loop iteration that is spent in process(), window(), and commit. |
| | disk-usage-bytes | Total disk space size used by key-value stores (in bytes). |
| | disk-quota-bytes | Disk memory usage quota for key-value stores (in bytes). |
| | executor-work-factor | The work factor of the run loop. A work factor of 1 indicates full throughput, while a work factor of less than 1 will introduce delays into the execution to approximate the requested work factor. The work factor is set by the disk space monitor in accordance with the disk quota policy. Given the latest percentage of available disk quota, this policy returns the work factor that should be applied. |
| | total-process-cpu-usage | The process cpu usage percentage (in the [0, 100] interval) used by the Samza container process and all its child processes. |
| | cpu-throttle-ratio | This reports if Linux Control Groups are throttling the Samza container. Returned values are a ratio that range from 0 to 1, where 0 represents not being throttled. |
| | physical-memory-mb | The physical memory used by the Samza container process (native + on heap) (in MBs). |
| | physical-memory-utilization | The ratio between the physical memory used by the Samza container process (native + on heap) and the total physical memory of the Samza container. |
| | container-thread-pool-size | The current size of a Samza container's thread pool. It may or may not be the same as job.container.thread.pool.size, depending on the implementation. |
| | container-active-threads | The approximate actively used threads in a Samza container's thread pool. |
| | <TaskName\>-<StoreName\>-restore-time | Time taken to restore task stores (per task store). |


| **Group** | **Metric name** | **Meaning** |
Expand Down
3 changes: 2 additions & 1 deletion gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
scalaTestVersion = "3.0.1"
snappyVersion = "1.1.8.4"
slf4jVersion = "1.7.7"
yarnVersion = "2.10.1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to split up the Hadoop version bump (and the related hadoop zookeeper gradle exclude) to a separate PR? It feels like it may be significant enough to stand alone.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in the next push

yarnVersion = "2.10.2"
systemRulesVersion = "1.19.0"
zkClientVersion = "0.11"
zookeeperVersion = "3.6.3"
failsafeVersion = "2.4.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.samza.config.ClusterManagerConfig;
import org.apache.samza.config.Config;
import org.apache.samza.container.host.LinuxCgroupStatistics;
import org.apache.samza.container.host.ProcessCPUStatistics;
import org.apache.samza.container.host.SystemMemoryStatistics;
import org.apache.samza.container.host.SystemStatistics;
Expand Down Expand Up @@ -72,5 +73,14 @@ public void onUpdate(SystemStatistics sample) {
LOGGER.debug("Container active threads count: " + containerActiveThreads);
containerMetrics.containerActiveThreads().set(containerActiveThreads);
}

// Update CGroup related metrics
LinuxCgroupStatistics cpuThrottle = sample.getCgroupStatistics();
if (Objects.nonNull(cpuThrottle)) {
double cpuThrottleRatio = cpuThrottle.getCgroupCpuThrottleRatio();
LOGGER.debug("Container CGROUP Throttle Ratio: " + cpuThrottleRatio);
containerMetrics.cpuThrottleRatio().set(cpuThrottleRatio);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,28 @@


/**
* An default implementation of {@link SystemStatisticsGetter} that relies on {@link PosixCommandBasedStatisticsGetter}
* and {@link OshiBasedStatisticsGetter} implementations
* An default implementation of {@link SystemStatisticsGetter} that relies on {@link PosixCommandBasedStatisticsGetter},
* {@link OshiBasedStatisticsGetter}, and {@link LinuxCgroupStatisticsGetter}, implementations.
*/
public class DefaultSystemStatisticsGetter implements SystemStatisticsGetter {
private final OshiBasedStatisticsGetter oshiBasedStatisticsGetter;
private final PosixCommandBasedStatisticsGetter posixCommandBasedStatisticsGetter;
private final LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter;


public DefaultSystemStatisticsGetter() {
this(new OshiBasedStatisticsGetter(), new PosixCommandBasedStatisticsGetter());
this(new OshiBasedStatisticsGetter(), new PosixCommandBasedStatisticsGetter(), new LinuxCgroupStatisticsGetter());
}

@VisibleForTesting
DefaultSystemStatisticsGetter(OshiBasedStatisticsGetter oshiBasedStatisticsGetter,
PosixCommandBasedStatisticsGetter posixCommandBasedStatisticsGetter) {
DefaultSystemStatisticsGetter(
OshiBasedStatisticsGetter oshiBasedStatisticsGetter,
PosixCommandBasedStatisticsGetter posixCommandBasedStatisticsGetter,
LinuxCgroupStatisticsGetter linuxCgroupStatisticsGetter
) {
this.oshiBasedStatisticsGetter = oshiBasedStatisticsGetter;
this.posixCommandBasedStatisticsGetter = posixCommandBasedStatisticsGetter;
this.linuxCgroupStatisticsGetter = linuxCgroupStatisticsGetter;
}

@Override
Expand All @@ -49,4 +55,9 @@ public SystemMemoryStatistics getSystemMemoryStatistics() {
public ProcessCPUStatistics getProcessCPUStatistics() {
return oshiBasedStatisticsGetter.getProcessCPUStatistics();
}

@Override
public LinuxCgroupStatistics getProcessCgroupStatistics() {
return linuxCgroupStatisticsGetter.getProcessCgroupStatistics();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.container.host;

import java.util.Objects;

/**
* A {@link org.apache.samza.container.host.LinuxCgroupStatistics} object represents recent Cgroup CPU values
*/
public class LinuxCgroupStatistics {

/**
* The Cgroup CPU throttle Ratio for the Yarn container's control group, defined as nr_throttled over nr_periods.
*/
private final double cgroupCpuThrottleRatio;

LinuxCgroupStatistics(double cgroupCpuThrottleRatio) {
this.cgroupCpuThrottleRatio = cgroupCpuThrottleRatio;
}

public double getCgroupCpuThrottleRatio() {
return cgroupCpuThrottleRatio;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
org.apache.samza.container.host.LinuxCgroupStatistics that = (org.apache.samza.container.host.LinuxCgroupStatistics) o;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be fully qualified here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in the next push

return Double.compare(that.cgroupCpuThrottleRatio, cgroupCpuThrottleRatio) == 0;
}

@Override
public int hashCode() {
return Objects.hash(cgroupCpuThrottleRatio);
}

@Override
public String toString() {
return "LinuxCgroupStatistics{" + "cpuThrottleRatio=" + cgroupCpuThrottleRatio + '}';
}
}





Loading
Loading