Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for BlobNotFound error #1612

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.storage.blobstore.exceptions;

/**
* Future should complete with this exception to indicate that the exception occurred due to the request for an
* a blob that is not present.
*
*/
public class BlobNotFoundException extends RuntimeException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NotFoundException to match naming convention for DeletedException etc.


private static final long serialVersionUID = 1L;

public BlobNotFoundException(String message, Throwable cause) {
super(message, cause);
}

public BlobNotFoundException(String message) {
super(message);
}

public BlobNotFoundException(Throwable cause) {
super(cause);
}

public BlobNotFoundException() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.samza.storage.blobstore.index.SnapshotIndex;
import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
import org.apache.samza.storage.blobstore.metrics.BlobStoreManagerMetrics;
import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
import org.apache.samza.storage.blobstore.util.DirDiffUtil;
import org.apache.samza.util.Clock;
Expand All @@ -76,6 +77,7 @@ public class BlobStoreBackupManager implements TaskBackupManager {
private final BlobStoreManager blobStoreManager;
private final BlobStoreUtil blobStoreUtil;
private final BlobStoreBackupManagerMetrics metrics;
private final BlobStoreManagerMetrics blobStoreManagerMetrics;

/**
* Map of store name to a Pair of blob id of {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} from
Expand All @@ -101,8 +103,9 @@ public class BlobStoreBackupManager implements TaskBackupManager {
prevStoreSnapshotIndexesFuture;

public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
ExecutorService backupExecutor, BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config,
Clock clock, File loggedStoreBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) {
ExecutorService backupExecutor, BlobStoreManagerMetrics blobStoreTaskManagerMetrics,
BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config, Clock clock, File loggedStoreBaseDir,
StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) {
this.jobModel = jobModel;
this.jobName = new JobConfig(config).getName().get();
this.jobId = new JobConfig(config).getJobId();
Expand All @@ -122,6 +125,8 @@ public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel,
this.prevStoreSnapshotIndexesFuture = CompletableFuture.completedFuture(ImmutableMap.of());
this.metrics = blobStoreTaskBackupMetrics;
metrics.initStoreMetrics(storesToBackup);
this.blobStoreManagerMetrics = blobStoreTaskManagerMetrics;
blobStoreTaskManagerMetrics.initStoreMetrics(storesToBackup);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are blob store manager metrics being passed and intialized here instead of in BlobStoreManager init?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for all other occurences.

}

@Override
Expand Down Expand Up @@ -332,7 +337,7 @@ public void close() {
@VisibleForTesting
protected BlobStoreUtil createBlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor,
BlobStoreBackupManagerMetrics metrics) {
return new BlobStoreUtil(blobStoreManager, executor, metrics, null);
return new BlobStoreUtil(blobStoreManager, executor, blobStoreManagerMetrics, metrics, null);
}

private void updateStoreDiffMetrics(String storeName, DirDiff.Stats stats) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.storage.StorageManagerUtil;
import org.apache.samza.storage.TaskRestoreManager;
import org.apache.samza.storage.blobstore.exceptions.BlobNotFoundException;
import org.apache.samza.storage.blobstore.index.DirIndex;
import org.apache.samza.storage.blobstore.index.SnapshotIndex;
import org.apache.samza.storage.blobstore.metrics.BlobStoreManagerMetrics;
import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
import org.apache.samza.storage.blobstore.util.DirDiffUtil;
Expand Down Expand Up @@ -85,7 +87,8 @@ public class BlobStoreRestoreManager implements TaskRestoreManager {
private final File nonLoggedBaseDir;
private final String taskName;
private final Set<String> storesToRestore;
private final BlobStoreRestoreManagerMetrics metrics;
private final BlobStoreManagerMetrics blobStoreManagerMetrics;
private final BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics;

private BlobStoreManager blobStoreManager;

Expand All @@ -96,8 +99,8 @@ public class BlobStoreRestoreManager implements TaskRestoreManager {
private Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes;

public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecutor, Set<String> storesToRestore,
BlobStoreRestoreManagerMetrics metrics, Config config, File loggedBaseDir, File nonLoggedBaseDir,
StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) {
BlobStoreManagerMetrics blobStoreManagerMetrics, BlobStoreRestoreManagerMetrics metrics, Config config,
File loggedBaseDir, File nonLoggedBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) {
this.taskModel = taskModel;
this.jobName = new JobConfig(config).getName().get();
this.jobId = new JobConfig(config).getJobId();
Expand All @@ -107,14 +110,15 @@ public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecu
this.blobStoreConfig = new BlobStoreConfig(config);
this.storageManagerUtil = storageManagerUtil;
this.blobStoreManager = blobStoreManager;
this.blobStoreUtil = createBlobStoreUtil(blobStoreManager, executor, metrics);
this.blobStoreUtil = createBlobStoreUtil(blobStoreManager, executor, blobStoreManagerMetrics, metrics);
this.dirDiffUtil = new DirDiffUtil();
this.prevStoreSnapshotIndexes = new HashMap<>();
this.loggedBaseDir = loggedBaseDir;
this.nonLoggedBaseDir = nonLoggedBaseDir;
this.taskName = taskModel.getTaskName().getTaskName();
this.storesToRestore = storesToRestore;
this.metrics = metrics;
this.blobStoreRestoreManagerMetrics = metrics;
this.blobStoreManagerMetrics = blobStoreManagerMetrics;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, why is this updating metrics for an unrelated class?

}

@Override
Expand All @@ -126,16 +130,17 @@ public void init(Checkpoint checkpoint) {

// get previous SCMs from checkpoint
prevStoreSnapshotIndexes = blobStoreUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, storesToRestore);
metrics.getSnapshotIndexNs.set(System.nanoTime() - startTime);
blobStoreRestoreManagerMetrics.getSnapshotIndexNs.set(System.nanoTime() - startTime);
LOG.trace("Found previous snapshot index during blob store restore manager init for task: {} to be: {}",
taskName, prevStoreSnapshotIndexes);

metrics.initStoreMetrics(storesToRestore);
blobStoreRestoreManagerMetrics.initStoreMetrics(storesToRestore);
blobStoreManagerMetrics.initStoreMetrics(storesToRestore);

// Note: blocks the caller thread.
deleteUnusedStoresFromBlobStore(jobName, jobId, taskName, storageConfig, blobStoreConfig, prevStoreSnapshotIndexes,
blobStoreUtil, executor);
metrics.initNs.set(System.nanoTime() - startTime);
blobStoreRestoreManagerMetrics.initNs.set(System.nanoTime() - startTime);
}

/**
Expand All @@ -150,7 +155,7 @@ public void init(Checkpoint checkpoint) {
@Override
public CompletableFuture<Void> restore() {
return restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir,
storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor);
storageConfig, blobStoreManagerMetrics, blobStoreRestoreManagerMetrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor);
}

@Override
Expand All @@ -160,8 +165,8 @@ public void close() {

@VisibleForTesting
protected BlobStoreUtil createBlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor,
BlobStoreRestoreManagerMetrics metrics) {
return new BlobStoreUtil(blobStoreManager, executor, null, metrics);
BlobStoreManagerMetrics blobStoreManagerMetrics, BlobStoreRestoreManagerMetrics metrics) {
return new BlobStoreUtil(blobStoreManager, executor, blobStoreManagerMetrics, null, metrics);
}

/**
Expand Down Expand Up @@ -209,9 +214,9 @@ static void deleteUnusedStoresFromBlobStore(String jobName, String jobId, String
@VisibleForTesting
static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskName taskName, Set<String> storesToRestore,
Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics,
StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil,
ExecutorService executor) {
File loggedBaseDir, StorageConfig storageConfig, BlobStoreManagerMetrics blobStoreManagerMetrics,
BlobStoreRestoreManagerMetrics metrics, StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil,
DirDiffUtil dirDiffUtil, ExecutorService executor) {
long restoreStartTime = System.nanoTime();
List<CompletionStage<Void>> restoreFutures = new ArrayList<>();

Expand Down Expand Up @@ -266,7 +271,7 @@ static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskN

metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime);
enqueueRestore(jobName, jobId, taskName.toString(), storeName, storeDir, dirIndex, storeRestoreStartTime,
restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor);
restoreFutures, blobStoreUtil, dirDiffUtil, blobStoreManagerMetrics, metrics, executor);
} else {
LOG.debug("Renaming store checkpoint directory: {} to store directory: {} since its contents are identical " +
"to the remote snapshot.", storeCheckpointDir, storeDir);
Expand Down Expand Up @@ -327,25 +332,28 @@ static boolean shouldRestore(String taskName, String storeName, DirIndex dirInde
@VisibleForTesting
static void enqueueRestore(String jobName, String jobId, String taskName, String storeName, File storeDir, DirIndex dirIndex,
long storeRestoreStartTime, List<CompletionStage<Void>> restoreFutures, BlobStoreUtil blobStoreUtil,
DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor) {
DirDiffUtil dirDiffUtil, BlobStoreManagerMetrics blobStoreManagerMetrics, BlobStoreRestoreManagerMetrics metrics,
ExecutorService executor) {

Metadata requestMetadata = new Metadata(storeDir.getAbsolutePath(), Optional.empty(), jobName, jobId, taskName, storeName);
CompletableFuture<Void> restoreFuture =
blobStoreUtil.restoreDir(storeDir, dirIndex, requestMetadata).thenRunAsync(() -> {
metrics.storeRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime);

long postRestoreStartTime = System.nanoTime();
LOG.trace("Comparing restored store directory: {} and remote directory to verify restore.", storeDir);
if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true).test(storeDir, dirIndex)) {
metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime);
throw new SamzaException(
String.format("Restored store directory: %s contents " + "are not the same as the remote snapshot.",
storeDir.getAbsolutePath()));
} else {
metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime);
LOG.info("Restore from remote snapshot completed for store: {}", storeDir);
}
}, executor);
blobStoreUtil.restoreDir(storeDir, dirIndex, requestMetadata)
.whenCompleteAsync((res, ex) -> updateMetricIfBlobStoreNotFoundError(storeName, blobStoreManagerMetrics, ex))
.thenRunAsync(() -> {
metrics.storeRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime);

long postRestoreStartTime = System.nanoTime();
LOG.trace("Comparing restored store directory: {} and remote directory to verify restore.", storeDir);
if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true).test(storeDir, dirIndex)) {
metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime);
throw new SamzaException(
String.format("Restored store directory: %s contents " + "are not the same as the remote snapshot.",
storeDir.getAbsolutePath()));
} else {
metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime);
LOG.info("Restore from remote snapshot completed for store: {}", storeDir);
}
}, executor);

restoreFutures.add(restoreFuture);
}
Expand All @@ -364,4 +372,12 @@ private static void deleteCheckpointDirs(TaskName taskName, String storeName, Fi
taskName, storeName), e);
}
}

private static void updateMetricIfBlobStoreNotFoundError(String storeName, BlobStoreManagerMetrics blobStoreManagerMetrics, Throwable throwable) {
Throwable maybeBlobNotFoundException = FutureUtil.unwrapExceptions(BlobNotFoundException.class, throwable);
if (maybeBlobNotFoundException instanceof BlobNotFoundException) {
blobStoreManagerMetrics.storeBlobNotFoundError.get(storeName)
.set(blobStoreManagerMetrics.storeBlobNotFoundError.get(storeName).getValue() + 1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.samza.storage.TaskBackupManager;
import org.apache.samza.storage.TaskRestoreManager;
import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
import org.apache.samza.storage.blobstore.metrics.BlobStoreManagerMetrics;
import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
import org.apache.samza.util.Clock;
import org.apache.samza.util.ReflectionUtil;
Expand All @@ -62,9 +63,10 @@ public TaskBackupManager getBackupManager(
Preconditions.checkState(StringUtils.isNotBlank(blobStoreManagerFactory));
BlobStoreManagerFactory factory = ReflectionUtil.getObj(blobStoreManagerFactory, BlobStoreManagerFactory.class);
BlobStoreManager blobStoreManager = factory.getBackupBlobStoreManager(config, backupExecutor);
BlobStoreBackupManagerMetrics metrics = new BlobStoreBackupManagerMetrics(metricsRegistry);
BlobStoreManagerMetrics blobStoreManagerMetrics = new BlobStoreManagerMetrics(metricsRegistry);
BlobStoreBackupManagerMetrics blobStoreBackupManagerMetrics = new BlobStoreBackupManagerMetrics(metricsRegistry);
return new BlobStoreBackupManager(jobContext.getJobModel(), containerModel, taskModel, backupExecutor,
metrics, config, clock, loggedStoreBaseDir, new StorageManagerUtil(), blobStoreManager);
blobStoreManagerMetrics, blobStoreBackupManagerMetrics, config, clock, loggedStoreBaseDir, new StorageManagerUtil(), blobStoreManager);
}

@Override
Expand All @@ -85,9 +87,10 @@ public TaskRestoreManager getRestoreManager(
Preconditions.checkState(StringUtils.isNotBlank(blobStoreManagerFactory));
BlobStoreManagerFactory factory = ReflectionUtil.getObj(blobStoreManagerFactory, BlobStoreManagerFactory.class);
BlobStoreManager blobStoreManager = factory.getRestoreBlobStoreManager(config, restoreExecutor);
BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(metricsRegistry);
return new BlobStoreRestoreManager(taskModel, restoreExecutor, storesToRestore, metrics, config, loggedStoreBaseDir,
nonLoggedStoreBaseDir, new StorageManagerUtil(), blobStoreManager);
BlobStoreManagerMetrics blobStoreManagerMetrics = new BlobStoreManagerMetrics(metricsRegistry);
BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics = new BlobStoreRestoreManagerMetrics(metricsRegistry);
return new BlobStoreRestoreManager(taskModel, restoreExecutor, storesToRestore, blobStoreManagerMetrics,
blobStoreRestoreManagerMetrics, config, loggedStoreBaseDir, nonLoggedStoreBaseDir, new StorageManagerUtil(), blobStoreManager);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.storage.blobstore.metrics;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;


public class BlobStoreManagerMetrics {
private static final String GROUP = BlobStoreManagerMetrics.class.getName();
private final MetricsRegistry metricsRegistry;

public final Map<String, Gauge<Long>> storeBlobNotFoundError;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this the only metric handled specially in blob store manager? What about get/put/delete count and timers etc?


public BlobStoreManagerMetrics(MetricsRegistry metricsRegistry) {
this.metricsRegistry = metricsRegistry;

storeBlobNotFoundError = new ConcurrentHashMap<>();
}

public void initStoreMetrics(Collection<String> storeNames) {
for (String storeName : storeNames) {
storeBlobNotFoundError.putIfAbsent(storeName,
metricsRegistry.newGauge(GROUP, String.format("%s-blob-not-found", storeName), 0L));
}
}
}
Loading