diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java index c4eb56bf84..beaf45512d 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java @@ -227,7 +227,7 @@ public CompletableFuture> upload(CheckpointId checkpointId, CompletionStage snapshotIndexBlobIdFuture = snapshotIndexFuture .thenComposeAsync(si -> { - LOG.trace("Uploading Snapshot index for task: {} store: {}", taskName, storeName); + LOG.info("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName); return blobStoreUtil.putSnapshotIndex(si); }, executor); @@ -287,7 +287,7 @@ public CompletableFuture cleanUp(CheckpointId checkpointId, Map removeTTLFuture = snapshotIndexFuture.thenComposeAsync(snapshotIndex -> { - LOG.debug("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}", + LOG.info("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}", snapshotIndexBlobId, taskName, storeName); return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, requestMetadata); }, executor); @@ -296,7 +296,7 @@ public CompletableFuture cleanUp(CheckpointId checkpointId, Map cleanupRemoteSnapshotFuture = snapshotIndexFuture.thenComposeAsync(snapshotIndex -> { - LOG.debug("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}", + LOG.info("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}", snapshotIndexBlobId, taskName, storeName); return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), requestMetadata); }, executor); @@ -308,7 +308,7 @@ public CompletableFuture cleanUp(CheckpointId checkpointId, Map { if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) { String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get(); - LOG.debug("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.", + LOG.info("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.", blobId, taskName, storeName); return blobStoreUtil.deleteSnapshotIndexBlob(blobId, requestMetadata); } else { diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java index 22aaecb323..2429a0ae5c 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java @@ -184,7 +184,7 @@ static void deleteUnusedStoresFromBlobStore(String jobName, String jobId, String List> storeDeletionFutures = new ArrayList<>(); initialStoreSnapshotIndexes.forEach((storeName, scmAndSnapshotIndex) -> { if (!storesToBackup.contains(storeName) && !storesToRestore.contains(storeName)) { - LOG.debug("Removing task: {} store: {} from blob store. It is either no longer used, " + + LOG.info("Removing task: {} store: {} from blob store. It is either no longer used, " + "or is no longer configured to be backed up or restored with blob store.", taskName, storeName); DirIndex dirIndex = scmAndSnapshotIndex.getRight().getDirIndex(); Metadata requestMetadata = diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/DirIndex.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/DirIndex.java index 31819be54e..bdc3c7e074 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/DirIndex.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/index/DirIndex.java @@ -20,7 +20,10 @@ package org.apache.samza.storage.blobstore.index; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; @@ -48,6 +51,12 @@ public DirIndex(String dirName, Preconditions.checkNotNull(filesRemoved); Preconditions.checkNotNull(subDirsPresent); Preconditions.checkNotNull(subDirsRemoved); + // Check to validate that a file is not present in file removed and file present list + Set filesPresentSet = filesPresent.stream().map(FileIndex::getFileName).collect(Collectors.toSet()); + Set filesRemovedSet = filesRemoved.stream().map(FileIndex::getFileName).collect(Collectors.toSet()); + Sets.SetView presentAndRemovedFilesSet = Sets.intersection(filesPresentSet, filesRemovedSet); + Preconditions.checkState(presentAndRemovedFilesSet.isEmpty(), + String.format("File present in both filesPresent and filesRemoved set: %s", presentAndRemovedFilesSet)); this.dirName = dirName; this.filesPresent = filesPresent; this.filesRemoved = filesRemoved; diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 40229e0f9d..929864203e 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -158,7 +158,7 @@ class TaskInstance( // WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for // tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may // be deleted in the background by the blob store, leading to data loss. - debug("Cleaning up stale state from previous run for taskName: %s" format taskName) + info("Cleaning up stale state from previous run for taskName: %s" format taskName) commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers) } } @@ -430,7 +430,7 @@ class TaskInstance( new Function[util.Map[String, util.Map[String, String]], CompletableFuture[Void]] { override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = { // Perform cleanup on unused checkpoints - debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId)) + info("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId)) val cleanUpStartTime = System.nanoTime() try { commitManager.cleanUp(checkpointId, uploadSCMs)