From 4e60d5b674e91fed95b19b9acb99f940857077f9 Mon Sep 17 00:00:00 2001 From: Bharath Kumarasubramanian Date: Mon, 22 Jan 2024 14:41:24 -0800 Subject: [PATCH] Merge oss (#31) * SAMZA-2797: Call flush during stop from CoordinatorStreamWriter (#1692) * SAMZA-2798: Populate worker.opts in environment variable only if available (#1693) Description Populate worker.opts in the environment variable only if available in the configs. Changes Check if worker.opts is present and then add it to environment variable Tests Updated unit tests * Add MAX_BACKGROUND_JOBS config for RocksDB (#1694) * SAMZA-2784: Remove excessive commit logs (#1695) * SAMZA-2799: Remove worker.opts handling in shell command builder (#1696) --------- Co-authored-by: ajo thomas Co-authored-by: Bharath Kumarasubramanian Co-authored-by: Shekhar Sharma <72765053+shekhars-li@users.noreply.github.com> Co-authored-by: Daniel Chen --- .../java/org/apache/samza/job/ShellCommandBuilder.java | 2 -- .../samza/storage/blobstore/BlobStoreBackupManager.java | 8 ++++---- .../apache/samza/storage/blobstore/util/DirDiffUtil.java | 4 ++-- .../scala/org/apache/samza/container/TaskInstance.scala | 4 ++-- .../org/apache/samza/job/TestShellCommandBuilder.java | 2 -- 5 files changed, 8 insertions(+), 12 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java index 4262145eef..37253442a4 100644 --- a/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java +++ b/samza-core/src/main/java/org/apache/samza/job/ShellCommandBuilder.java @@ -46,8 +46,6 @@ public Map buildEnvironment() { envBuilder.put(ShellCommandConfig.ENV_JAVA_OPTS, shellCommandConfig.getTaskOpts().orElse("")); envBuilder.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, shellCommandConfig.getAdditionalClasspathDir().orElse("")); - shellCommandConfig.getWorkerOpts() - .ifPresent(workerOpts -> envBuilder.put(ShellCommandConfig.WORKER_JVM_OPTS, workerOpts)); shellCommandConfig.getJavaHome().ifPresent(javaHome -> envBuilder.put(ShellCommandConfig.ENV_JAVA_HOME, javaHome)); return envBuilder.build(); } diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java index 997c5e6ca4..bf26eea6f7 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java @@ -236,7 +236,7 @@ public CompletableFuture> upload(CheckpointId checkpointId, CompletionStage snapshotIndexBlobIdFuture = snapshotIndexFuture .thenComposeAsync(si -> { - LOG.info("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName); + LOG.debug("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName); return blobStoreUtil.putSnapshotIndex(si); }, executor); @@ -296,7 +296,7 @@ public CompletableFuture cleanUp(CheckpointId checkpointId, Map removeTTLFuture = snapshotIndexFuture.thenComposeAsync(snapshotIndex -> { - LOG.info("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}", + LOG.debug("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}", snapshotIndexBlobId, taskName, storeName); return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, requestMetadata); }, executor); @@ -305,7 +305,7 @@ public CompletableFuture cleanUp(CheckpointId checkpointId, Map cleanupRemoteSnapshotFuture = snapshotIndexFuture.thenComposeAsync(snapshotIndex -> { - LOG.info("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}", + LOG.debug("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}", snapshotIndexBlobId, taskName, storeName); return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), requestMetadata); }, executor); @@ -317,7 +317,7 @@ public CompletableFuture cleanUp(CheckpointId checkpointId, Map { if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) { String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get(); - LOG.info("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.", + LOG.debug("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.", blobId, taskName, storeName); return blobStoreUtil.deleteSnapshotIndexBlob(blobId, requestMetadata); } else { diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java index b89f42a31e..9b84ac7560 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java @@ -219,7 +219,7 @@ public static BiPredicate areSameFile(boolean compareLargeFileC if (!compareLargeFileChecksums && isLargeFile) { // Since RocksDB SST files are immutable after creation, we can skip the expensive checksum computations // which requires reading the entire file. - LOG.debug("Local file: {} and remote file: {} are same. " + + LOG.debug("Local file: {} and remote file: {} both present. " + "Skipping checksum calculation for large file of size: {}.", localFile.getAbsolutePath(), remoteFile.getFileName(), localFileAttrs.size()); return true; @@ -234,7 +234,7 @@ public static BiPredicate areSameFile(boolean compareLargeFileC boolean areSameChecksum = localFileChecksum == remoteFile.getChecksum(); if (!areSameChecksum) { - LOG.warn("Local file: {} and remote file: {} are not same. " + + LOG.debug("Local file: {} and remote file: {} are not same. " + "Local checksum: {}. Remote checksum: {}", localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum()); } else { diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 89738e2de0..70d9ca3800 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -186,7 +186,7 @@ class TaskInstance( // WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for // tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may // be deleted in the background by the blob store, leading to data loss. - info("Cleaning up stale state from previous run for taskName: %s" format taskName) + debug("Cleaning up stale state from previous run for taskName: %s" format taskName) commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers) } @@ -474,7 +474,7 @@ class TaskInstance( new Function[util.Map[String, util.Map[String, String]], CompletableFuture[Void]] { override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = { // Perform cleanup on unused checkpoints - info("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId)) + debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId)) val cleanUpStartTime = System.nanoTime() try { commitManager.cleanUp(checkpointId, uploadSCMs) diff --git a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java index 4acba9487c..ca7be0e4a3 100644 --- a/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java +++ b/samza-core/src/test/java/org/apache/samza/job/TestShellCommandBuilder.java @@ -60,7 +60,6 @@ public void testBuildEnvironment() throws MalformedURLException { Config config = new MapConfig(new ImmutableMap.Builder() .put(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo") .put(ShellCommandConfig.TASK_JVM_OPTS, "-Xmx4g") - .put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g") .put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath") .put(ShellCommandConfig.TASK_JAVA_HOME, "/path/to/java/home") .build()); @@ -72,7 +71,6 @@ public void testBuildEnvironment() throws MalformedURLException { .put(ShellCommandConfig.ENV_CONTAINER_ID, "1") .put(ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING) .put(ShellCommandConfig.ENV_JAVA_OPTS, "-Xmx4g") - .put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g") .put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath") .put(ShellCommandConfig.ENV_JAVA_HOME, "/path/to/java/home") .build();