Skip to content

Commit

Permalink
Merge oss (apache#31)
Browse files Browse the repository at this point in the history
* SAMZA-2797: Call flush during stop from CoordinatorStreamWriter (apache#1692)

* SAMZA-2798: Populate worker.opts in environment variable only if available (apache#1693)

Description
Populate worker.opts in the environment variables only if it is available in the configs.

Changes
Check whether worker.opts is present and, if so, add it to the environment variables

Tests
Updated unit tests

* Add MAX_BACKGROUND_JOBS config for RocksDB (apache#1694)

* SAMZA-2784: Remove excessive commit logs (apache#1695)

* SAMZA-2799: Remove worker.opts handling in shell command builder (apache#1696)

---------

Co-authored-by: ajo thomas <[email protected]>
Co-authored-by: Bharath Kumarasubramanian <[email protected]>
Co-authored-by: Shekhar Sharma <[email protected]>
Co-authored-by: Daniel Chen <[email protected]>
  • Loading branch information
5 people authored Jan 22, 2024
1 parent 4887bad commit 4e60d5b
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public Map<String, String> buildEnvironment() {
envBuilder.put(ShellCommandConfig.ENV_JAVA_OPTS, shellCommandConfig.getTaskOpts().orElse(""));
envBuilder.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR,
shellCommandConfig.getAdditionalClasspathDir().orElse(""));
shellCommandConfig.getWorkerOpts()
.ifPresent(workerOpts -> envBuilder.put(ShellCommandConfig.WORKER_JVM_OPTS, workerOpts));
shellCommandConfig.getJavaHome().ifPresent(javaHome -> envBuilder.put(ShellCommandConfig.ENV_JAVA_HOME, javaHome));
return envBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId,
CompletionStage<String> snapshotIndexBlobIdFuture =
snapshotIndexFuture
.thenComposeAsync(si -> {
LOG.info("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
LOG.debug("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
return blobStoreUtil.putSnapshotIndex(si);
}, executor);

Expand Down Expand Up @@ -296,7 +296,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
// 1. remove TTL of index blob and all of its files and sub-dirs marked for retention
CompletionStage<Void> removeTTLFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
LOG.info("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
LOG.debug("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, requestMetadata);
}, executor);
Expand All @@ -305,7 +305,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
// 2. delete the files/subdirs marked for deletion in the snapshot index.
CompletionStage<Void> cleanupRemoteSnapshotFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
LOG.info("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
LOG.debug("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), requestMetadata);
}, executor);
Expand All @@ -317,7 +317,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) {
String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get();
LOG.info("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
LOG.debug("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
blobId, taskName, storeName);
return blobStoreUtil.deleteSnapshotIndexBlob(blobId, requestMetadata);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileC
if (!compareLargeFileChecksums && isLargeFile) {
// Since RocksDB SST files are immutable after creation, we can skip the expensive checksum computations
// which requires reading the entire file.
LOG.debug("Local file: {} and remote file: {} are same. " +
LOG.debug("Local file: {} and remote file: {} both present. " +
"Skipping checksum calculation for large file of size: {}.",
localFile.getAbsolutePath(), remoteFile.getFileName(), localFileAttrs.size());
return true;
Expand All @@ -234,7 +234,7 @@ public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileC

boolean areSameChecksum = localFileChecksum == remoteFile.getChecksum();
if (!areSameChecksum) {
LOG.warn("Local file: {} and remote file: {} are not same. " +
LOG.debug("Local file: {} and remote file: {} are not same. " +
"Local checksum: {}. Remote checksum: {}",
localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class TaskInstance(
// WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for
// tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may
// be deleted in the background by the blob store, leading to data loss.
info("Cleaning up stale state from previous run for taskName: %s" format taskName)
debug("Cleaning up stale state from previous run for taskName: %s" format taskName)
commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers)
}

Expand Down Expand Up @@ -474,7 +474,7 @@ class TaskInstance(
new Function[util.Map[String, util.Map[String, String]], CompletableFuture[Void]] {
override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = {
// Perform cleanup on unused checkpoints
info("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
val cleanUpStartTime = System.nanoTime()
try {
commitManager.cleanUp(checkpointId, uploadSCMs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public void testBuildEnvironment() throws MalformedURLException {
Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
.put(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo")
.put(ShellCommandConfig.TASK_JVM_OPTS, "-Xmx4g")
.put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g")
.put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath")
.put(ShellCommandConfig.TASK_JAVA_HOME, "/path/to/java/home")
.build());
Expand All @@ -72,7 +71,6 @@ public void testBuildEnvironment() throws MalformedURLException {
.put(ShellCommandConfig.ENV_CONTAINER_ID, "1")
.put(ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING)
.put(ShellCommandConfig.ENV_JAVA_OPTS, "-Xmx4g")
.put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g")
.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath")
.put(ShellCommandConfig.ENV_JAVA_HOME, "/path/to/java/home")
.build();
Expand Down

0 comments on commit 4e60d5b

Please sign in to comment.