From d6ffeebbe4775c0b88c47c0a09fecd3e98505bd8 Mon Sep 17 00:00:00 2001 From: ajo thomas Date: Wed, 9 Oct 2024 15:46:09 -0700 Subject: [PATCH] Log bob deserialization exception for snapshot blobs (#1711) --- .../samza/storage/blobstore/util/BlobStoreUtil.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java index 327eb7f33c..bd78248cb9 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java @@ -182,7 +182,13 @@ public CompletableFuture getSnapshotIndex(String blobId, Metadata return FutureUtil.executeAsyncWithRetries(opName, () -> { ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream return blobStoreManager.get(blobId, indexBlobStream, metadata, getDeleted).toCompletableFuture() - .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor); + .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor) + .handle((snapshotIndex, ex) -> { + if (ex != null) { + throw new SamzaException(String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId), ex); + } + return snapshotIndex; + }); }, isCauseNonRetriable(), executor, retryPolicyConfig); }