Skip to content

Commit

Permalink
SAMZA-2763: Support worker JVM opts for Samza Beam portable mode (#1689)
Browse files Browse the repository at this point in the history
Summary: Support JVM options for worker process in Samza Beam portable mode
Description: With portable mode support for Samza Beam, we want to tune and configure the JVM options for worker process. In this PR, we add support by introducing worker.opts configuration and autosizing integration support.

Changes:
- Added worker.opts configuration
- Add autosizing integration support for Xmx
- Updated configuration table and website

API Changes: None

Usage Instructions: worker.opts can be used similar to other samza application configuration although it only applies to Samza Beam portable execution mode and is ignored otherwise.

Upgrade Instructions: None
  • Loading branch information
mynameborat authored Nov 20, 2023
1 parent 2e91dfe commit 65f31eb
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 16 deletions.
15 changes: 15 additions & 0 deletions docs/learn/documentation/versioned/jobs/configuration-table.html
Original file line number Diff line number Diff line change
Expand Up @@ -2048,6 +2048,21 @@ <h1>Samza Configuration Reference</h1>
</th>
</tr>

<tr>
<td class="property" id="worker-opts">worker.opts</td>
<td class="default"></td>
<td class="description">
Any JVM options to include in the command line when executing worker process in portable execution of Samza using beam. For example,
this can be used to set the JVM heap size, to tune the garbage collector, or to enable
<a href="/learn/tutorials/{{site.version}}/remote-debugging-samza.html">remote debugging</a>.
Anything you put in <code>worker.opts</code> gets forwarded directly to the commandline of worker process as part of the JVM invocation.
<b>Note:</b> The configuration only applies for Samza Beam portable mode.
<dl>
<dt>Example: <code>worker.opts=-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC</code></dt>
</dl>
</td>
</tr>

<tr>
<td class="property" id="yarn-package-path">yarn.package.path</td>
<td class="default"></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public class JobConfig extends MapConfig {
public static final String JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.maxheap.mb";
public static final String JOB_AUTOSIZING_CONTAINER_MEMORY_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.memory.mb";
public static final String JOB_AUTOSIZING_CONTAINER_MAX_CORES = JOB_AUTOSIZING_CONFIG_PREFIX + "container.cpu.cores";
public static final String JOB_AUTOSIZING_WORKER_MAX_HEAP_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "worker.maxheap.mb";

public static final String COORDINATOR_STREAM_FACTORY = "job.coordinatorstream.config.factory";
public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = "org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.samza.config;

import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;


public class ShellCommandConfig extends MapConfig {
Expand Down Expand Up @@ -77,6 +79,7 @@ public class ShellCommandConfig extends MapConfig {

public static final String COMMAND_SHELL_EXECUTE = "task.execute";
public static final String TASK_JVM_OPTS = "task.opts";
public static final String WORKER_JVM_OPTS = "worker.opts";
public static final String TASK_JAVA_HOME = "task.java.home";

/**
Expand All @@ -97,20 +100,19 @@ public String getCommand() {
}

public Optional<String> getTaskOpts() {
Optional<String> jvmOpts = Optional.ofNullable(get(ShellCommandConfig.TASK_JVM_OPTS));
Optional<String> maxHeapMbOptional = Optional.ofNullable(get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB));
if (new JobConfig(this).getAutosizingEnabled() && maxHeapMbOptional.isPresent()) {
String maxHeapMb = maxHeapMbOptional.get();
String xmxSetting = "-Xmx" + maxHeapMb + "m";
if (jvmOpts.isPresent() && jvmOpts.get().contains("-Xmx")) {
jvmOpts = Optional.of(jvmOpts.get().replaceAll("-Xmx\\S+", xmxSetting));
} else if (jvmOpts.isPresent()) {
jvmOpts = Optional.of(jvmOpts.get().concat(" " + xmxSetting));
} else {
jvmOpts = Optional.of(xmxSetting);
}
}
return jvmOpts;
String taskOpts = get(ShellCommandConfig.TASK_JVM_OPTS);
String autosizingContainerMaxHeap = get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB);

return Optional.ofNullable(getFinalJvmOptions(taskOpts, autosizingContainerMaxHeap));
}

/**
* Returns the worker opts for the application if available.
*/
public Optional<String> getWorkerOpts() {
String autosizingWorkerHeapMb = get(JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB);
String workerOpts = get(ShellCommandConfig.WORKER_JVM_OPTS);
return Optional.ofNullable(getFinalJvmOptions(workerOpts, autosizingWorkerHeapMb));
}

public Optional<String> getJavaHome() {
Expand All @@ -120,4 +122,26 @@ public Optional<String> getJavaHome() {
public Optional<String> getAdditionalClasspathDir() {
return Optional.ofNullable(get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR));
}

/**
* Returns the final JVM options by applying the heap override if available to the jvm opts
*/
@VisibleForTesting
String getFinalJvmOptions(String jvmOpts, String maxHeapOverride) {
String finalJvmOpts = jvmOpts;
if (new JobConfig(this).getAutosizingEnabled() && StringUtils.isNotEmpty(maxHeapOverride)) {
String xmxSetting = "-Xmx" + maxHeapOverride + "m";
if (StringUtils.isNotBlank(jvmOpts)) {
if (jvmOpts.contains("-Xmx")) {
finalJvmOpts = jvmOpts.replaceAll("-Xmx\\S+", xmxSetting);
} else {
finalJvmOpts = jvmOpts.concat(" " + xmxSetting);
}
} else {
finalJvmOpts = xmxSetting;
}
}

return finalJvmOpts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public Map<String, String> buildEnvironment() {
envBuilder.put(ShellCommandConfig.ENV_CONTAINER_ID, this.id);
envBuilder.put(ShellCommandConfig.ENV_COORDINATOR_URL, this.url.toString());
envBuilder.put(ShellCommandConfig.ENV_JAVA_OPTS, shellCommandConfig.getTaskOpts().orElse(""));
envBuilder.put(ShellCommandConfig.WORKER_JVM_OPTS, shellCommandConfig.getWorkerOpts().orElse(""));
envBuilder.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR,
shellCommandConfig.getAdditionalClasspathDir().orElse(""));
shellCommandConfig.getJavaHome().ifPresent(javaHome -> envBuilder.put(ShellCommandConfig.ENV_JAVA_HOME, javaHome));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import com.google.common.collect.ImmutableMap;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.*;


public class TestShellCommandConfig {
Expand Down Expand Up @@ -81,6 +80,66 @@ public void testGetTaskOptsAutosizingEnabled() {
assertEquals(Optional.of("-Dproperty=value -Xmx1024m"), shellCommandConfig.getTaskOpts());
}

@Test
public void testGetWorkerOptsAutosizingDisabled() {
ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new MapConfig(
ImmutableMap.of(JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB,
"1024", "worker.opts", "-Xmx10m -Dproperty=value")));

String workerOpts = shellCommandConfig.getWorkerOpts()
.orElse(null);
String expectedOpts = "-Xmx10m -Dproperty=value";

assertNotNull(workerOpts);
assertEquals(expectedOpts, workerOpts);
}

@Test
public void testGetWorkerOptsAutosizingEnabled() {
ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new MapConfig(
ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true", JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB,
"1024", "worker.opts", "-Xmx10m -Dproperty=value")));

String workerOpts = shellCommandConfig.getWorkerOpts()
.orElse(null);
String expectedOpts = "-Xmx1024m -Dproperty=value";

assertNotNull(workerOpts);
assertEquals(expectedOpts, workerOpts);
}

@Test
public void testGetFinalJvmOptionsAutosizingDisabled() {
ShellCommandConfig shellCommandConfig =
new ShellCommandConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
String jvmOptions = "";
String expectedJvmOptions = "";

// no override passed
assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, ""));

// ignore override since autosizing is disabled
assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048"));
}

@Test
public void testGetFinalJvmOptionsAutosizingEnabled() {
ShellCommandConfig shellCommandConfig =
new ShellCommandConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
String jvmOptions = "-Xmx1024m";
String expectedJvmOptions = "-Xmx1024m";
assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, ""));

// override should take effect with autosizing enabled
expectedJvmOptions = "-Xmx2048m";
assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048"));

// override should take effect even if xmx is not set
jvmOptions = "-Dproperty=value";
expectedJvmOptions = "-Dproperty=value -Xmx2048m";
assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048"));
}

@Test
public void testGetJavaHome() {
ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new MapConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public void testBasicBuild() throws MalformedURLException {
ShellCommandConfig.ENV_CONTAINER_ID, "1",
ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING,
ShellCommandConfig.ENV_JAVA_OPTS, "",
ShellCommandConfig.WORKER_JVM_OPTS, "",
ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
// assertions when command path is not set
assertEquals("foo", shellCommandBuilder.buildCommand());
Expand All @@ -60,6 +61,7 @@ public void testBuildEnvironment() throws MalformedURLException {
Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
.put(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo")
.put(ShellCommandConfig.TASK_JVM_OPTS, "-Xmx4g")
.put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g")
.put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath")
.put(ShellCommandConfig.TASK_JAVA_HOME, "/path/to/java/home")
.build());
Expand All @@ -71,6 +73,7 @@ public void testBuildEnvironment() throws MalformedURLException {
.put(ShellCommandConfig.ENV_CONTAINER_ID, "1")
.put(ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING)
.put(ShellCommandConfig.ENV_JAVA_OPTS, "-Xmx4g")
.put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g")
.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath")
.put(ShellCommandConfig.ENV_JAVA_HOME, "/path/to/java/home")
.build();
Expand Down

0 comments on commit 65f31eb

Please sign in to comment.