Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add info logs for delete operations #1595

Merged
merged 4 commits into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId,
CompletionStage<String> snapshotIndexBlobIdFuture =
snapshotIndexFuture
.thenComposeAsync(si -> {
LOG.trace("Uploading Snapshot index for task: {} store: {}", taskName, storeName);
LOG.info("Uploading Snapshot index: {} for task: {} store: {}", si, taskName, storeName);
return blobStoreUtil.putSnapshotIndex(si);
}, executor);

Expand Down Expand Up @@ -287,7 +287,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
// 1. remove TTL of index blob and all of its files and sub-dirs marked for retention
CompletionStage<Void> removeTTLFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
LOG.debug("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
LOG.info("Removing TTL for index blob: {} and all of its files and sub-dirs for task: {} store :{}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.removeTTL(snapshotIndexBlobId, snapshotIndex, requestMetadata);
}, executor);
Expand All @@ -296,7 +296,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
// 2. delete the files/subdirs marked for deletion in the snapshot index.
CompletionStage<Void> cleanupRemoteSnapshotFuture =
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
LOG.debug("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
LOG.info("Deleting files and dirs to remove for current index blob: {} for task: {} store: {}",
snapshotIndexBlobId, taskName, storeName);
return blobStoreUtil.cleanUpDir(snapshotIndex.getDirIndex(), requestMetadata);
}, executor);
Expand All @@ -308,7 +308,7 @@ public CompletableFuture<Void> cleanUp(CheckpointId checkpointId, Map<String, St
snapshotIndexFuture.thenComposeAsync(snapshotIndex -> {
if (snapshotIndex.getPrevSnapshotIndexBlobId().isPresent()) {
String blobId = snapshotIndex.getPrevSnapshotIndexBlobId().get();
LOG.debug("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
LOG.info("Removing previous snapshot index blob: {} from blob store for task: {} store: {}.",
blobId, taskName, storeName);
return blobStoreUtil.deleteSnapshotIndexBlob(blobId, requestMetadata);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ static void deleteUnusedStoresFromBlobStore(String jobName, String jobId, String
List<CompletionStage<Void>> storeDeletionFutures = new ArrayList<>();
initialStoreSnapshotIndexes.forEach((storeName, scmAndSnapshotIndex) -> {
if (!storesToBackup.contains(storeName) && !storesToRestore.contains(storeName)) {
LOG.debug("Removing task: {} store: {} from blob store. It is either no longer used, " +
LOG.info("Removing task: {} store: {} from blob store. It is either no longer used, " +
"or is no longer configured to be backed up or restored with blob store.", taskName, storeName);
DirIndex dirIndex = scmAndSnapshotIndex.getRight().getDirIndex();
Metadata requestMetadata =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
package org.apache.samza.storage.blobstore.index;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;

Expand Down Expand Up @@ -48,6 +51,12 @@ public DirIndex(String dirName,
Preconditions.checkNotNull(filesRemoved);
Preconditions.checkNotNull(subDirsPresent);
Preconditions.checkNotNull(subDirsRemoved);
// Validate that no file name appears in both the filesPresent and filesRemoved lists
Set<String> filesPresentSet = filesPresent.stream().map(FileIndex::getFileName).collect(Collectors.toSet());
Set<String> filesRemovedSet = filesRemoved.stream().map(FileIndex::getFileName).collect(Collectors.toSet());
Sets.SetView<String> presentAndRemovedFilesSet = Sets.intersection(filesPresentSet, filesRemovedSet);
Preconditions.checkState(presentAndRemovedFilesSet.isEmpty(),
String.format("File present in both filesPresent and filesRemoved set: %s", presentAndRemovedFilesSet));
this.dirName = dirName;
this.filesPresent = filesPresent;
this.filesRemoved = filesRemoved;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class TaskInstance(
// WARNING: cleanUp is NOT optional with blob stores since this is where we reset the TTL for
// tracked blobs. If this TTL reset is skipped, some of the blobs retained by future commits may
// be deleted in the background by the blob store, leading to data loss.
debug("Cleaning up stale state from previous run for taskName: %s" format taskName)
info("Cleaning up stale state from previous run for taskName: %s" format taskName)
commitManager.cleanUp(checkpointV2.getCheckpointId, checkpointV2.getStateCheckpointMarkers)
}
}
Expand Down Expand Up @@ -430,7 +430,7 @@ class TaskInstance(
new Function[util.Map[String, util.Map[String, String]], CompletableFuture[Void]] {
override def apply(uploadSCMs: util.Map[String, util.Map[String, String]]): CompletableFuture[Void] = {
// Perform cleanup on unused checkpoints
debug("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
info("Cleaning up old checkpoint state for taskName: %s checkpointId: %s" format(taskName, checkpointId))
val cleanUpStartTime = System.nanoTime()
try {
commitManager.cleanUp(checkpointId, uploadSCMs)
Expand Down