Skip to content

Commit

Permalink
Merge pull request #1595 from shekhars-li/log-delete
Browse files Browse the repository at this point in the history
Add info logs for blob store delete operations
  • Loading branch information
prateekm authored Apr 5, 2022
2 parents 50500ed + 309e718 commit 53dbc48
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId,
CompletionStage<String> snapshotIndexBlobIdFuture =
snapshotIndexFuture
.thenComposeAsync(si -> {
LOG.trace("Uploading Snapshot index for task: {} store: {}", taskName, storeName);
LOG.info("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
return blobStoreUtil.putSnapshotIndex(si);
}, executor);

Expand Down Expand Up @@ -287,7 +287,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
// 1. remove TTL of index blob and all of its files and sub-dirs marked for retention
CompletionStage<Void> removeTTLFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
LOG.debug("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
LOG.info("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, requestMetadata);
}, executor);
Expand All @@ -296,7 +296,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
// 2. delete the files/subdirs marked for deletion in the snapshot index.
CompletionStage<Void> cleanupRemoteSnapshotFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
LOG.debug("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
LOG.info("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), requestMetadata);
}, executor);
Expand All @@ -308,7 +308,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) {
String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get();
LOG.debug("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
LOG.info("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
blobId, taskName, storeName);
return blobStoreUtil.deleteSnapshotIndexBlob(blobId, requestMetadata);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ static void deleteUnusedStoresFromBlobStore(String jobName, String jobId, String
List<CompletionStage<Void>> storeDeletionFutures = new ArrayList<>();
initialStoreSnapshotIndexes.forEach((storeName, scmAndSnapshotIndex) -> {
if (!storesToBackup.contains(storeName) && !storesToRestore.contains(storeName)) {
LOG.debug("Removing task: {} store: {} from blob store. It is either no longer used, " +
LOG.info("Removing task: {} store: {} from blob store. It is either no longer used, " +
"or is no longer configured to be backed up or restored with blob store.", taskName, storeName);
DirIndex dirIndex = scmAndSnapshotIndex.getRight().getDirIndex();
Metadata requestMetadata =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
package org.apache.samza.storage.blobstore.index;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;

Expand Down Expand Up @@ -48,6 +51,12 @@ public DirIndex(String dirName,
Preconditions.checkNotNull(filesRemoved);
Preconditions.checkNotNull(subDirsPresent);
Preconditions.checkNotNull(subDirsRemoved);
// Check to validate that a file is not present in file removed and file present list
Set<String> filesPresentSet = filesPresent.stream().map(FileIndex::getFileName).collect(Collectors.toSet());
Set<String> filesRemovedSet = filesRemoved.stream().map(FileIndex::getFileName).collect(Collectors.toSet());
Sets.SetView<String> presentAndRemovedFilesSet = Sets.intersection(filesPresentSet, filesRemovedSet);
Preconditions.checkState(presentAndRemovedFilesSet.isEmpty(),
String.format("File present in both filesPresent and filesRemoved set: %s", presentAndRemovedFilesSet));
this.dirName = dirName;
this.filesPresent = filesPresent;
this.filesRemoved = filesRemoved;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class TaskInstance(
// WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for
// tracked blobs. if this TTL reset is skipped, some of the blobs retained by future commits may
// be deleted in the background by the blob store, leading to data loss.
debug("Cleaning up stale state from previous run for taskName: %s" format taskName)
info("Cleaning up stale state from previous run for taskName: %s" format taskName)
commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers)
}
}
Expand Down Expand Up @@ -430,7 +430,7 @@ class TaskInstance(
new Function[util.Map[String, util.Map[String, String]], CompletableFuture[Void]] {
override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = {
// Perform cleanup on unused checkpoints
debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
info("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
val cleanUpStartTime = System.nanoTime()
try {
commitManager.cleanUp(checkpointId, uploadSCMs)
Expand Down

0 comments on commit 53dbc48

Please sign in to comment.