diff --git a/samza-api/src/main/java/org/apache/samza/storage/blobstore/exceptions/BlobNotFoundException.java b/samza-api/src/main/java/org/apache/samza/storage/blobstore/exceptions/BlobNotFoundException.java new file mode 100644 index 0000000000..007015c1cd --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/storage/blobstore/exceptions/BlobNotFoundException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore.exceptions; + +/** + * Future should complete with this exception to indicate that the exception occurred due to the request for an + * a blob that is not present. + * + */ +public class BlobNotFoundException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public BlobNotFoundException(String message, Throwable cause) { + super(message, cause); + } + + public BlobNotFoundException(String message) { + super(message); + } + + public BlobNotFoundException(Throwable cause) { + super(cause); + } + + public BlobNotFoundException() { + } +} diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java index beaf45512d..9beb355a4f 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java @@ -50,6 +50,7 @@ import org.apache.samza.storage.blobstore.index.SnapshotIndex; import org.apache.samza.storage.blobstore.index.SnapshotMetadata; import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.metrics.BlobStoreManagerMetrics; import org.apache.samza.storage.blobstore.util.BlobStoreUtil; import org.apache.samza.storage.blobstore.util.DirDiffUtil; import org.apache.samza.util.Clock; @@ -76,6 +77,7 @@ public class BlobStoreBackupManager implements TaskBackupManager { private final BlobStoreManager blobStoreManager; private final BlobStoreUtil blobStoreUtil; private final BlobStoreBackupManagerMetrics metrics; + private final BlobStoreManagerMetrics blobStoreManagerMetrics; /** * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} from @@ -101,8 +103,9 @@ public class BlobStoreBackupManager implements TaskBackupManager { prevStoreSnapshotIndexesFuture; public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel, - ExecutorService backupExecutor, BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config, - Clock clock, File loggedStoreBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) { + ExecutorService backupExecutor, BlobStoreManagerMetrics blobStoreTaskManagerMetrics, + BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config, Clock clock, File loggedStoreBaseDir, + StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) { this.jobModel = jobModel; this.jobName = new JobConfig(config).getName().get(); this.jobId = new JobConfig(config).getJobId(); @@ -122,6 +125,8 @@ public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, this.prevStoreSnapshotIndexesFuture = CompletableFuture.completedFuture(ImmutableMap.of()); this.metrics = blobStoreTaskBackupMetrics; metrics.initStoreMetrics(storesToBackup); + this.blobStoreManagerMetrics = blobStoreTaskManagerMetrics; + blobStoreTaskManagerMetrics.initStoreMetrics(storesToBackup); } @Override @@ -332,7 +337,7 @@ public void close() { @VisibleForTesting protected BlobStoreUtil createBlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, BlobStoreBackupManagerMetrics metrics) { - return new BlobStoreUtil(blobStoreManager, executor, metrics, null); + return new BlobStoreUtil(blobStoreManager, executor, blobStoreManagerMetrics, metrics, null); } private void updateStoreDiffMetrics(String storeName, DirDiff.Stats stats) { diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java index 2429a0ae5c..49b2e6519a 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java @@ -49,8 +49,10 @@ import org.apache.samza.job.model.TaskModel; import org.apache.samza.storage.StorageManagerUtil; import org.apache.samza.storage.TaskRestoreManager; +import org.apache.samza.storage.blobstore.exceptions.BlobNotFoundException; import org.apache.samza.storage.blobstore.index.DirIndex; import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.metrics.BlobStoreManagerMetrics; import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; import org.apache.samza.storage.blobstore.util.BlobStoreUtil; import org.apache.samza.storage.blobstore.util.DirDiffUtil; @@ -85,7 +87,8 @@ public class BlobStoreRestoreManager implements TaskRestoreManager { private final File nonLoggedBaseDir; private final String taskName; private final Set storesToRestore; - private final BlobStoreRestoreManagerMetrics metrics; + private final BlobStoreManagerMetrics blobStoreManagerMetrics; + private final BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics; private BlobStoreManager blobStoreManager; @@ -96,8 +99,8 @@ public class BlobStoreRestoreManager implements TaskRestoreManager { private Map> prevStoreSnapshotIndexes; public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecutor, Set storesToRestore, - BlobStoreRestoreManagerMetrics metrics, Config config, File loggedBaseDir, File nonLoggedBaseDir, - StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) { + BlobStoreManagerMetrics blobStoreManagerMetrics, BlobStoreRestoreManagerMetrics metrics, Config config, + File loggedBaseDir, File nonLoggedBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) { this.taskModel = taskModel; this.jobName = new JobConfig(config).getName().get(); this.jobId = new JobConfig(config).getJobId(); @@ -107,14 +110,15 @@ public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecu this.blobStoreConfig = new BlobStoreConfig(config); this.storageManagerUtil = storageManagerUtil; this.blobStoreManager = blobStoreManager; - this.blobStoreUtil = createBlobStoreUtil(blobStoreManager, executor, metrics); + this.blobStoreUtil = createBlobStoreUtil(blobStoreManager, executor, blobStoreManagerMetrics, metrics); this.dirDiffUtil = new DirDiffUtil(); this.prevStoreSnapshotIndexes = new HashMap<>(); this.loggedBaseDir = loggedBaseDir; this.nonLoggedBaseDir = nonLoggedBaseDir; this.taskName = taskModel.getTaskName().getTaskName(); this.storesToRestore = storesToRestore; - this.metrics = metrics; + this.blobStoreRestoreManagerMetrics = metrics; + this.blobStoreManagerMetrics = blobStoreManagerMetrics; } @Override @@ -126,16 +130,17 @@ public void init(Checkpoint checkpoint) { // get previous SCMs from checkpoint prevStoreSnapshotIndexes = blobStoreUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, storesToRestore); - metrics.getSnapshotIndexNs.set(System.nanoTime() - startTime); + blobStoreRestoreManagerMetrics.getSnapshotIndexNs.set(System.nanoTime() - startTime); LOG.trace("Found previous snapshot index during blob store restore manager init for task: {} to be: {}", taskName, prevStoreSnapshotIndexes); - metrics.initStoreMetrics(storesToRestore); + blobStoreRestoreManagerMetrics.initStoreMetrics(storesToRestore); + blobStoreManagerMetrics.initStoreMetrics(storesToRestore); // Note: blocks the caller thread. deleteUnusedStoresFromBlobStore(jobName, jobId, taskName, storageConfig, blobStoreConfig, prevStoreSnapshotIndexes, blobStoreUtil, executor); - metrics.initNs.set(System.nanoTime() - startTime); + blobStoreRestoreManagerMetrics.initNs.set(System.nanoTime() - startTime); } /** @@ -150,7 +155,7 @@ public void init(Checkpoint checkpoint) { @Override public CompletableFuture restore() { return restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir, - storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor); + storageConfig, blobStoreManagerMetrics, blobStoreRestoreManagerMetrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor); } @Override @@ -160,8 +165,8 @@ public void close() { @VisibleForTesting protected BlobStoreUtil createBlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, - BlobStoreRestoreManagerMetrics metrics) { - return new BlobStoreUtil(blobStoreManager, executor, null, metrics); + BlobStoreManagerMetrics blobStoreManagerMetrics, BlobStoreRestoreManagerMetrics metrics) { + return new BlobStoreUtil(blobStoreManager, executor, blobStoreManagerMetrics, null, metrics); } /** @@ -209,9 +214,9 @@ static void deleteUnusedStoresFromBlobStore(String jobName, String jobId, String @VisibleForTesting static CompletableFuture restoreStores(String jobName, String jobId, TaskName taskName, Set storesToRestore, Map> prevStoreSnapshotIndexes, - File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics, - StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil, - ExecutorService executor) { + File loggedBaseDir, StorageConfig storageConfig, BlobStoreManagerMetrics blobStoreManagerMetrics, + BlobStoreRestoreManagerMetrics metrics, StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, + DirDiffUtil dirDiffUtil, ExecutorService executor) { long restoreStartTime = System.nanoTime(); List> restoreFutures = new ArrayList<>(); @@ -266,7 +271,7 @@ static CompletableFuture restoreStores(String jobName, String jobId, TaskN metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime); enqueueRestore(jobName, jobId, taskName.toString(), storeName, storeDir, dirIndex, storeRestoreStartTime, - restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor); + restoreFutures, blobStoreUtil, dirDiffUtil, blobStoreManagerMetrics, metrics, executor); } else { LOG.debug("Renaming store checkpoint directory: {} to store directory: {} since its contents are identical " + "to the remote snapshot.", storeCheckpointDir, storeDir); @@ -327,25 +332,28 @@ static boolean shouldRestore(String taskName, String storeName, DirIndex dirInde @VisibleForTesting static void enqueueRestore(String jobName, String jobId, String taskName, String storeName, File storeDir, DirIndex dirIndex, long storeRestoreStartTime, List> restoreFutures, BlobStoreUtil blobStoreUtil, - DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor) { + DirDiffUtil dirDiffUtil, BlobStoreManagerMetrics blobStoreManagerMetrics, BlobStoreRestoreManagerMetrics metrics, + ExecutorService executor) { Metadata requestMetadata = new Metadata(storeDir.getAbsolutePath(), Optional.empty(), jobName, jobId, taskName, storeName); CompletableFuture restoreFuture = - blobStoreUtil.restoreDir(storeDir, dirIndex, requestMetadata).thenRunAsync(() -> { - metrics.storeRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime); - - long postRestoreStartTime = System.nanoTime(); - LOG.trace("Comparing restored store directory: {} and remote directory to verify restore.", storeDir); - if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true).test(storeDir, dirIndex)) { - metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime); - throw new SamzaException( - String.format("Restored store directory: %s contents " + "are not the same as the remote snapshot.", - storeDir.getAbsolutePath())); - } else { - metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime); - LOG.info("Restore from remote snapshot completed for store: {}", storeDir); - } - }, executor); + blobStoreUtil.restoreDir(storeDir, dirIndex, requestMetadata) + .whenCompleteAsync((res, ex) -> updateMetricIfBlobStoreNotFoundError(storeName, blobStoreManagerMetrics, ex)) + .thenRunAsync(() -> { + metrics.storeRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime); + + long postRestoreStartTime = System.nanoTime(); + LOG.trace("Comparing restored store directory: {} and remote directory to verify restore.", storeDir); + if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true).test(storeDir, dirIndex)) { + metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime); + throw new SamzaException( + String.format("Restored store directory: %s contents " + "are not the same as the remote snapshot.", + storeDir.getAbsolutePath())); + } else { + metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime); + LOG.info("Restore from remote snapshot completed for store: {}", storeDir); + } + }, executor); restoreFutures.add(restoreFuture); } @@ -364,4 +372,12 @@ private static void deleteCheckpointDirs(TaskName taskName, String storeName, Fi taskName, storeName), e); } } + + private static void updateMetricIfBlobStoreNotFoundError(String storeName, BlobStoreManagerMetrics blobStoreManagerMetrics, Throwable throwable) { + Throwable maybeBlobNotFoundException = FutureUtil.unwrapExceptions(BlobNotFoundException.class, throwable); + if (maybeBlobNotFoundException instanceof BlobNotFoundException) { + blobStoreManagerMetrics.storeBlobNotFoundError.get(storeName) + .set(blobStoreManagerMetrics.storeBlobNotFoundError.get(storeName).getValue() + 1); + } + } } diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java index e2512b4890..bb3c08a285 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java @@ -40,6 +40,7 @@ import org.apache.samza.storage.TaskBackupManager; import org.apache.samza.storage.TaskRestoreManager; import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.metrics.BlobStoreManagerMetrics; import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; import org.apache.samza.util.Clock; import org.apache.samza.util.ReflectionUtil; @@ -62,9 +63,10 @@ public TaskBackupManager getBackupManager( Preconditions.checkState(StringUtils.isNotBlank(blobStoreManagerFactory)); BlobStoreManagerFactory factory = ReflectionUtil.getObj(blobStoreManagerFactory, BlobStoreManagerFactory.class); BlobStoreManager blobStoreManager = factory.getBackupBlobStoreManager(config, backupExecutor); - BlobStoreBackupManagerMetrics metrics = new BlobStoreBackupManagerMetrics(metricsRegistry); + BlobStoreManagerMetrics blobStoreManagerMetrics = new BlobStoreManagerMetrics(metricsRegistry); + BlobStoreBackupManagerMetrics blobStoreBackupManagerMetrics = new BlobStoreBackupManagerMetrics(metricsRegistry); return new BlobStoreBackupManager(jobContext.getJobModel(), containerModel, taskModel, backupExecutor, - metrics, config, clock, loggedStoreBaseDir, new StorageManagerUtil(), blobStoreManager); + blobStoreManagerMetrics, blobStoreBackupManagerMetrics, config, clock, loggedStoreBaseDir, new StorageManagerUtil(), blobStoreManager); } @Override @@ -85,9 +87,10 @@ public TaskRestoreManager getRestoreManager( Preconditions.checkState(StringUtils.isNotBlank(blobStoreManagerFactory)); BlobStoreManagerFactory factory = ReflectionUtil.getObj(blobStoreManagerFactory, BlobStoreManagerFactory.class); BlobStoreManager blobStoreManager = factory.getRestoreBlobStoreManager(config, restoreExecutor); - BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(metricsRegistry); - return new BlobStoreRestoreManager(taskModel, restoreExecutor, storesToRestore, metrics, config, loggedStoreBaseDir, - nonLoggedStoreBaseDir, new StorageManagerUtil(), blobStoreManager); + BlobStoreManagerMetrics blobStoreManagerMetrics = new BlobStoreManagerMetrics(metricsRegistry); + BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics = new BlobStoreRestoreManagerMetrics(metricsRegistry); + return new BlobStoreRestoreManager(taskModel, restoreExecutor, storesToRestore, blobStoreManagerMetrics, + blobStoreRestoreManagerMetrics, config, loggedStoreBaseDir, nonLoggedStoreBaseDir, new StorageManagerUtil(), blobStoreManager); } @Override diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/metrics/BlobStoreManagerMetrics.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/metrics/BlobStoreManagerMetrics.java new file mode 100644 index 0000000000..2f04bf82d8 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/metrics/BlobStoreManagerMetrics.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore.metrics; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.MetricsRegistry; + + +public class BlobStoreManagerMetrics { + private static final String GROUP = BlobStoreManagerMetrics.class.getName(); + private final MetricsRegistry metricsRegistry; + + public final Map> storeBlobNotFoundError; + + public BlobStoreManagerMetrics(MetricsRegistry metricsRegistry) { + this.metricsRegistry = metricsRegistry; + + storeBlobNotFoundError = new ConcurrentHashMap<>(); + } + + public void initStoreMetrics(Collection storeNames) { + for (String storeName : storeNames) { + storeBlobNotFoundError.putIfAbsent(storeName, + metricsRegistry.newGauge(GROUP, String.format("%s-blob-not-found", storeName), 0L)); + } + } +} diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java index 7e9553c2d8..a3cff268dd 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java @@ -57,6 +57,7 @@ import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory; import org.apache.samza.storage.blobstore.Metadata; import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.exceptions.BlobNotFoundException; import org.apache.samza.storage.blobstore.exceptions.DeletedException; import org.apache.samza.storage.blobstore.exceptions.RetriableException; import org.apache.samza.storage.blobstore.index.DirIndex; @@ -67,6 +68,7 @@ import org.apache.samza.storage.blobstore.index.SnapshotMetadata; import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde; import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.metrics.BlobStoreManagerMetrics; import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; import org.apache.samza.util.FutureUtil; import org.slf4j.Logger; @@ -82,14 +84,17 @@ public class BlobStoreUtil { private final BlobStoreManager blobStoreManager; private final ExecutorService executor; + private final BlobStoreManagerMetrics blobStoreManagerMetrics; private final BlobStoreBackupManagerMetrics backupMetrics; private final BlobStoreRestoreManagerMetrics restoreMetrics; private final SnapshotIndexSerde snapshotIndexSerde; public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, - BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) { + BlobStoreManagerMetrics blobStoreManagerMetrics, BlobStoreBackupManagerMetrics backupMetrics, + BlobStoreRestoreManagerMetrics restoreMetrics) { this.blobStoreManager = blobStoreManager; this.executor = executor; + this.blobStoreManagerMetrics = blobStoreManagerMetrics; this.backupMetrics = backupMetrics; this.restoreMetrics = restoreMetrics; this.snapshotIndexSerde = new SnapshotIndexSerde(); @@ -156,14 +161,18 @@ public Map> getStoreSnapshotIndexes( } try { - return FutureUtil.toFutureOfMap(t -> { + return FutureUtil.toFutureOfMap((k, t) -> { Throwable unwrappedException = FutureUtil.unwrapExceptions(CompletionException.class, t); if (unwrappedException instanceof DeletedException) { LOG.warn("Ignoring already deleted snapshot index for taskName: {}", taskName, t); return true; - } else { - return false; } + Throwable maybeBlobNotFoundException = FutureUtil.unwrapExceptions(BlobNotFoundException.class, t); + if (maybeBlobNotFoundException instanceof BlobNotFoundException) { + blobStoreManagerMetrics.storeBlobNotFoundError.get(k) + .set(blobStoreManagerMetrics.storeBlobNotFoundError.get(k).getValue() + 1); + } + return false; }, storeSnapshotIndexFutures).join(); } catch (Exception e) { throw new SamzaException( diff --git a/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java b/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java index f2da896824..19480b0413 100644 --- a/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/FutureUtil.java @@ -19,6 +19,8 @@ package org.apache.samza.util; +import java.util.concurrent.CompletionException; +import java.util.function.BiPredicate; import net.jodah.failsafe.Failsafe; import net.jodah.failsafe.RetryPolicy; @@ -118,7 +120,7 @@ public static CompletableFuture> toFutureOfMap(Map CompletableFuture> toFutureOfMap( - Predicate ignoreError, Map> keyToValueFutures) { + BiPredicate ignoreError, Map> keyToValueFutures) { CompletableFuture allEntriesFuture = CompletableFuture.allOf(keyToValueFutures.values().toArray(new CompletableFuture[]{})); @@ -130,7 +132,7 @@ public static CompletableFuture> toFutureOfMap( V value = entry.getValue().join(); successfulResults.put(key, value); } catch (Throwable th) { - if (ignoreError.test(th)) { + if (ignoreError.test(key, th)) { // else ignore and continue LOG.warn("Ignoring value future completion error for key: {}", key, th); } else { diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java index ac64b43507..c2f225eeeb 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java +++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java @@ -64,6 +64,7 @@ import org.apache.samza.storage.blobstore.index.SnapshotIndex; import org.apache.samza.storage.blobstore.index.SnapshotMetadata; import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.metrics.BlobStoreManagerMetrics; import org.apache.samza.storage.blobstore.util.BlobStoreTestUtil; import org.apache.samza.storage.blobstore.util.BlobStoreUtil; import org.apache.samza.storage.blobstore.util.DirDiffUtil; @@ -108,6 +109,7 @@ public class TestBlobStoreBackupManager { private final Gauge atomicLongGauge = mock(Gauge.class); private BlobStoreBackupManager blobStoreBackupManager; + private BlobStoreManagerMetrics blobStoreTaskManagerMetrics; private BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics; // Remote and local snapshot definitions @@ -147,10 +149,12 @@ public void setup() throws Exception { when(atomicLongGauge.getValue()).thenReturn(new AtomicLong()); when(metricsRegistry.newTimer(anyString(), anyString())).thenReturn(timer); blobStoreTaskBackupMetrics = new BlobStoreBackupManagerMetrics(metricsRegistry); + blobStoreTaskManagerMetrics = new BlobStoreManagerMetrics(metricsRegistry); + blobStoreBackupManager = new MockBlobStoreBackupManager(jobModel, containerModel, taskModel, mockExecutor, - blobStoreTaskBackupMetrics, config, + blobStoreTaskManagerMetrics, blobStoreTaskBackupMetrics, config, Files.createTempDirectory("logged-store-").toFile(), storageManagerUtil, blobStoreManager); } @@ -524,10 +528,11 @@ private Map> setupRemoteAndLocalSnapshots(bo private class MockBlobStoreBackupManager extends BlobStoreBackupManager { public MockBlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel, - ExecutorService backupExecutor, BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config, + ExecutorService backupExecutor, BlobStoreManagerMetrics blobStoreManagerMetrics, + BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config, File loggedStoreBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreManager blobStoreManager) { - super(jobModel, containerModel, taskModel, backupExecutor, blobStoreTaskBackupMetrics, config, clock, + super(jobModel, containerModel, taskModel, backupExecutor, blobStoreManagerMetrics, blobStoreTaskBackupMetrics, config, clock, loggedStoreBaseDir, storageManagerUtil, blobStoreManager); } diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java index ddc0c8e19a..57345f377d 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java +++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java @@ -44,6 +44,7 @@ import org.apache.samza.storage.blobstore.index.DirIndex; import org.apache.samza.storage.blobstore.index.SnapshotIndex; import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.metrics.BlobStoreManagerMetrics; import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; import org.apache.samza.storage.blobstore.util.BlobStoreTestUtil; import org.apache.samza.storage.blobstore.util.BlobStoreUtil; @@ -175,8 +176,9 @@ public void testRestoreDeletesStoreDir() throws IOException { String jobName = "testJobName"; String jobId = "testJobId"; TaskName taskName = mock(TaskName.class); - BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap()); - metrics.initStoreMetrics(ImmutableList.of("storeName")); + BlobStoreManagerMetrics blobStoreManagerMetrics = new BlobStoreManagerMetrics(new MetricsRegistryMap()); + BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap()); + blobStoreRestoreManagerMetrics.initStoreMetrics(ImmutableList.of("storeName")); Set storesToRestore = ImmutableSet.of("storeName"); SnapshotIndex snapshotIndex = mock(SnapshotIndex.class); Map> prevStoreSnapshotIndexes = @@ -206,7 +208,7 @@ public void testRestoreDeletesStoreDir() throws IOException { when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, arg2) -> true); BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes, - loggedBaseDir.toFile(), storageConfig, metrics, + loggedBaseDir.toFile(), storageConfig, blobStoreManagerMetrics, blobStoreRestoreManagerMetrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR); // verify that the store directory restore was called and skipped (i.e. shouldRestore == true) @@ -221,8 +223,9 @@ public void testRestoreDeletesCheckpointDirsIfRestoring() throws IOException { String jobName = "testJobName"; String jobId = "testJobId"; TaskName taskName = mock(TaskName.class); - BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap()); - metrics.initStoreMetrics(ImmutableList.of("storeName")); + BlobStoreManagerMetrics blobStoreManagerMetrics = new BlobStoreManagerMetrics(new MetricsRegistryMap()); + BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap()); + blobStoreRestoreManagerMetrics.initStoreMetrics(ImmutableList.of("storeName")); Set storesToRestore = ImmutableSet.of("storeName"); SnapshotIndex snapshotIndex = mock(SnapshotIndex.class); Map> prevStoreSnapshotIndexes = @@ -258,7 +261,7 @@ public void testRestoreDeletesCheckpointDirsIfRestoring() throws IOException { .thenReturn(CompletableFuture.completedFuture(null)); BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes, - loggedBaseDir.toFile(), storageConfig, metrics, + loggedBaseDir.toFile(), storageConfig, blobStoreManagerMetrics, blobStoreRestoreManagerMetrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR); // verify that the store directory restore was called and skipped (i.e. shouldRestore == true) @@ -273,8 +276,9 @@ public void testRestoreRetainsCheckpointDirsIfValid() throws IOException { String jobName = "testJobName"; String jobId = "testJobId"; TaskName taskName = mock(TaskName.class); - BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap()); - metrics.initStoreMetrics(ImmutableList.of("storeName")); + BlobStoreManagerMetrics blobStoreManagerMetrics = new BlobStoreManagerMetrics(new MetricsRegistryMap()); + BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap()); + blobStoreRestoreManagerMetrics.initStoreMetrics(ImmutableList.of("storeName")); Set storesToRestore = ImmutableSet.of("storeName"); SnapshotIndex snapshotIndex = mock(SnapshotIndex.class); Map> prevStoreSnapshotIndexes = @@ -314,7 +318,7 @@ public void testRestoreRetainsCheckpointDirsIfValid() throws IOException { .thenReturn(CompletableFuture.completedFuture(null)); BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes, - loggedBaseDir.toFile(), storageConfig, metrics, + loggedBaseDir.toFile(), storageConfig, blobStoreManagerMetrics, blobStoreRestoreManagerMetrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR); // verify that the store directory restore was not called (should have restored from checkpoint dir) @@ -331,8 +335,9 @@ public void testRestoreSkipsStoresWithMissingCheckpointSCM() { String jobName = "testJobName"; String jobId = "testJobId"; TaskName taskName = mock(TaskName.class); - BlobStoreRestoreManagerMetrics metrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap()); - metrics.initStoreMetrics(ImmutableList.of("newStoreName")); + BlobStoreManagerMetrics blobStoreManagerMetrics = new BlobStoreManagerMetrics(new MetricsRegistryMap()); + BlobStoreRestoreManagerMetrics blobStoreRestoreManagerMetrics = new BlobStoreRestoreManagerMetrics(new MetricsRegistryMap()); + blobStoreRestoreManagerMetrics.initStoreMetrics(ImmutableList.of("newStoreName")); Set storesToRestore = ImmutableSet.of("newStoreName"); // new store in config SnapshotIndex snapshotIndex = mock(SnapshotIndex.class); Map> prevStoreSnapshotIndexes = mock(Map.class); @@ -351,7 +356,7 @@ public void testRestoreSkipsStoresWithMissingCheckpointSCM() { DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class); BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes, - loggedBaseDir.toFile(), storageConfig, metrics, + loggedBaseDir.toFile(), storageConfig, blobStoreManagerMetrics, blobStoreRestoreManagerMetrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR); // verify that we checked the previously checkpointed SCMs. diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java index 28514fa474..78207ad885 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java +++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java @@ -132,7 +132,7 @@ public void testPutDir() throws IOException, InterruptedException, ExecutionExce }); // Execute - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null, null); CompletionStage dirIndexFuture = blobStoreUtil.putDir(dirDiff, snapshotMetadata); DirIndex dirIndex = null; try { @@ -185,7 +185,7 @@ public void testPutDirFailsIfAnyFileUploadFails() throws IOException, TimeoutExc }); // Execute - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null, null); CompletionStage dirIndexFuture = blobStoreUtil.putDir(dirDiff, snapshotMetadata); try { // should be already complete. if not, future composition in putDir is broken. @@ -232,7 +232,7 @@ public void testPutDirFailsIfAnySubDirFileUploadFails() throws IOException, Time }); // Execute - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null, null); CompletionStage dirIndexFuture = blobStoreUtil.putDir(dirDiff, snapshotMetadata); try { // should be already complete. if not, future composition in putDir is broken. @@ -268,7 +268,7 @@ public void testCleanup() throws IOException, ExecutionException, InterruptedExc DirDiff dirDiff = DirDiffUtil.getDirDiff(localSnapshotDir.toFile(), remoteSnapshotDir, (localFile, remoteFile) -> localFile.getName().equals(remoteFile.getFileName())); - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null,null, null); when(blobStoreManager.put(any(InputStream.class), any(Metadata.class))) .thenReturn(CompletableFuture.completedFuture("blobId")); CompletionStage dirIndexFuture = blobStoreUtil.putDir(dirDiff, snapshotMetadata); @@ -321,7 +321,7 @@ public void testCleanUpFailsIfAnyFileDeleteFails() DirDiff dirDiff = DirDiffUtil.getDirDiff(localSnapshotDir.toFile(), remoteSnapshotDir, (localFile, remoteFile) -> localFile.getName().equals(remoteFile.getFileName())); - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null,null, null); when(blobStoreManager.put(any(InputStream.class), any(Metadata.class))) .thenReturn(CompletableFuture.completedFuture("blobId")); CompletionStage dirIndexFuture = blobStoreUtil.putDir(dirDiff, snapshotMetadata); @@ -381,7 +381,7 @@ public void testCleanUpFailsIfAnySubDirFileDeleteFails() DirDiff dirDiff = DirDiffUtil.getDirDiff(localSnapshotDir.toFile(), remoteSnapshotDir, (localFile, remoteFile) -> localFile.getName().equals(remoteFile.getFileName())); - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null, null); when(blobStoreManager.put(any(InputStream.class), any(Metadata.class))) .thenReturn(CompletableFuture.completedFuture("blobId")); CompletionStage dirIndexFuture = blobStoreUtil.putDir(dirDiff, snapshotMetadata); @@ -453,7 +453,7 @@ public void testRemoveTTL() throws IOException, ExecutionException, InterruptedE return CompletableFuture.completedFuture(fileName); }); - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null, null); CompletionStage dirIndexFuture = blobStoreUtil.putDir(dirDiff, snapshotMetadata); DirIndex dirIndex = null; try { @@ -506,7 +506,7 @@ public void testPutFileChecksumAndMetadata() throws IOException, ExecutionExcept return CompletableFuture.completedFuture("blobId"); }); - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null, null); CompletionStage fileIndexFuture = blobStoreUtil.putFile(path.toFile(), snapshotMetadata); FileIndex fileIndex = null; @@ -619,7 +619,7 @@ public void testRestoreDirRestoresMultiPartFilesCorrectly() throws IOException { return CompletableFuture.completedFuture(null); }); - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, EXECUTOR, null,null, null); blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, metadata).join(); assertTrue( @@ -677,7 +677,7 @@ public void testRestoreDirRetriesFileRestoreOnRetriableExceptions() throws IOExc return CompletableFuture.completedFuture(null); // success }); - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, EXECUTOR, null,null, null); blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, metadata).join(); assertTrue( @@ -730,7 +730,7 @@ public void testRestoreDirFailsRestoreOnNonRetriableExceptions() throws IOExcept return CompletableFuture.completedFuture(null); }); - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, EXECUTOR, null, null, null); try { blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, metadata).join(); fail("Should have failed on non-retriable errors during file restore"); @@ -782,7 +782,7 @@ public void testRestoreDirCreatesCorrectDirectoryStructure() throws IOException }); Path restoreDirBasePath = Files.createTempDirectory(BlobStoreTestUtil.TEMP_DIR_PREFIX); - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, EXECUTOR, null, null, null); blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), dirIndex, metadata).join(); assertTrue(new DirDiffUtil().areSameDir(Collections.emptySet(), false).test(restoreDirBasePath.toFile(), dirIndex)); @@ -795,7 +795,7 @@ public void testRestoreDirCreatesCorrectDirectoryStructure() throws IOException @Test public void testGetSSIReturnsEmptyMapForNullCheckpoint() { BlobStoreUtil blobStoreUtil = - new BlobStoreUtil(mock(BlobStoreManager.class), MoreExecutors.newDirectExecutorService(), null, null); + new BlobStoreUtil(mock(BlobStoreManager.class), MoreExecutors.newDirectExecutorService(), null,null, null); Map> snapshotIndexes = blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", "taskName", null, new HashSet<>()); assertTrue(snapshotIndexes.isEmpty()); @@ -805,7 +805,7 @@ public void testGetSSIThrowsExceptionForCheckpointV1() { Checkpoint mockCheckpoint = mock(Checkpoint.class); when(mockCheckpoint.getVersion()).thenReturn((short) 1); BlobStoreUtil blobStoreUtil = - new BlobStoreUtil(mock(BlobStoreManager.class), MoreExecutors.newDirectExecutorService(), null, null); + new BlobStoreUtil(mock(BlobStoreManager.class), MoreExecutors.newDirectExecutorService(), null, null, null); Map> prevSnapshotIndexes = blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", "taskName", mockCheckpoint, new HashSet<>()); assertEquals(prevSnapshotIndexes.size(), 0); @@ -819,7 +819,7 @@ public void testGetSSIReturnsEmptyMapIfNoEntryForBlobStoreBackendFactory() { ImmutableMap.of("com.OtherStateBackendFactory", ImmutableMap.of("storeName", "otherSCM"))); BlobStoreUtil blobStoreUtil = - new BlobStoreUtil(mock(BlobStoreManager.class), MoreExecutors.newDirectExecutorService(), null, null); + new BlobStoreUtil(mock(BlobStoreManager.class), MoreExecutors.newDirectExecutorService(), null, null, null); Map> snapshotIndexes = blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", "taskName", mockCheckpoint, new HashSet<>()); assertTrue(snapshotIndexes.isEmpty()); @@ -833,7 +833,7 @@ public void testGetSSIReturnsEmptyMapIfNoStoreForBlobStoreBackendFactory() { ImmutableMap.of(BlobStoreStateBackendFactory.class.getName(), ImmutableMap.of())); BlobStoreUtil blobStoreUtil = - new BlobStoreUtil(mock(BlobStoreManager.class), MoreExecutors.newDirectExecutorService(), null, null); + new BlobStoreUtil(mock(BlobStoreManager.class), MoreExecutors.newDirectExecutorService(), null, null, null); Map> snapshotIndexes = blobStoreUtil.getStoreSnapshotIndexes("testJobName", "testJobId", "taskName", mockCheckpoint, new HashSet<>()); assertTrue(snapshotIndexes.isEmpty()); @@ -981,7 +981,7 @@ public void testPutFileRetriedMorethanThreeTimes() throws Exception { return FutureUtil.failedFuture(new RetriableException()); // retriable error }).thenAnswer((Answer>) invocation -> CompletableFuture.completedFuture("blobId")); - BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null, null); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, EXECUTOR, null,null, null); blobStoreUtil.putFile(path.toFile(), snapshotMetadata).join(); // Verify put operation is retried 4 times diff --git a/samza-core/src/test/java/org/apache/samza/util/TestFutureUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestFutureUtil.java index 815eb34a71..bed8a4ef09 100644 --- a/samza-core/src/test/java/org/apache/samza/util/TestFutureUtil.java +++ b/samza-core/src/test/java/org/apache/samza/util/TestFutureUtil.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import java.util.function.BiPredicate; import java.util.function.Predicate; import org.apache.samza.SamzaException; import org.junit.Test; @@ -146,7 +147,7 @@ public void testFutureOfMapCompletesSuccessfullyIfNoErrors() { map.put("1", CompletableFuture.completedFuture("1")); map.put("2", CompletableFuture.completedFuture("2")); - CompletableFuture> result = FutureUtil.toFutureOfMap(t -> true, map); + CompletableFuture> result = FutureUtil.toFutureOfMap((k, t) -> true, map); assertTrue(result.isDone()); assertFalse(result.isCompletedExceptionally()); } @@ -158,7 +159,7 @@ public void testFutureOfMapCompletesSuccessfullyIfOnlyIgnoredErrors() { map.put("2", FutureUtil.failedFuture(new SamzaException())); CompletableFuture> result = FutureUtil - .toFutureOfMap(t -> FutureUtil.unwrapExceptions(CompletionException.class, t) instanceof SamzaException, map); + .toFutureOfMap((k, t) -> FutureUtil.unwrapExceptions(CompletionException.class, t) instanceof SamzaException, map); assertTrue(result.isDone()); result.join(); assertFalse(result.isCompletedExceptionally()); @@ -173,15 +174,15 @@ public void testFutureOfMapCompletesExceptionallyIfAnyNonIgnoredErrors() { SamzaException samzaException = new SamzaException(); map.put("2", FutureUtil.failedFuture(samzaException)); - Predicate mockPredicate = mock(Predicate.class); - when(mockPredicate.test(any())) + BiPredicate mockPredicate = mock(BiPredicate.class); + when(mockPredicate.test(any(), any())) .thenReturn(true) .thenReturn(false); CompletableFuture> result = FutureUtil.toFutureOfMap(mockPredicate, map); assertTrue(result.isDone()); assertTrue(result.isCompletedExceptionally()); - verify(mockPredicate, times(2)).test(any()); // verify that each failed value future is tested + verify(mockPredicate, times(2)).test(any(), any()); // verify that each failed value future is tested try { result.join();