diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java index bd78248cb9..3735d65297 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java @@ -185,7 +185,7 @@ public CompletableFuture getSnapshotIndex(String blobId, Metadata .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor) .handle((snapshotIndex, ex) -> { if (ex != null) { - throw new SamzaException(String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId), ex); + throw new SamzaException(String.format("Unable to get SnapshotIndex blob. The blob ID is : %s", blobId), ex); } return snapshotIndex; }); diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java index a44f86e644..732fd472be 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java +++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -50,6 +51,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.zip.CRC32; @@ -78,6 +80,7 @@ import org.apache.samza.storage.blobstore.index.SnapshotMetadata; import org.apache.samza.util.FileUtil; import org.apache.samza.util.FutureUtil; +import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -920,6 +923,24 @@ public void testGetSSIThrowsExceptionOnSyncBlobStoreErrors() { checkpoint, storesToBackupOrRestore, false); } + @Test + public void testSerdeException() throws ExecutionException, InterruptedException { + final String blobId = "foo"; + + final BlobStoreManager testBlobStoreManager = new DeserTestBlobStoreManager(); + final BlobStoreUtil util = new BlobStoreUtil(testBlobStoreManager, Executors.newSingleThreadExecutor(), blobStoreConfig, null, null); + + final CompletableFuture future = util.getSnapshotIndex(blobId, mock(Metadata.class), true) + .handle((snapshotIndex, throwable) -> { + if (throwable != null) { + Assert.assertEquals(throwable.getMessage(), String.format("Unable to get SnapshotIndex blob. The blob ID is : %s", blobId)); + Assert.assertEquals(throwable.getCause().getMessage(), "org.apache.samza.SamzaException: Exception in deserializing SnapshotIndex bytes foobar"); + } + return snapshotIndex; + }); + future.get(); + } + @Test public void testGetSSIThrowsExceptionIfAnyNonIgnoredAsyncBlobStoreErrors() { String store = "storeName1"; @@ -1045,4 +1066,57 @@ private CheckpointV2 createCheckpointV2(String stateBackendFactory, Map get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) { + final String randBlob = "foobar"; + final byte[] byteArray = randBlob.getBytes(StandardCharsets.UTF_8); + try { + outputStream.write(byteArray); + } catch (IOException e) { + throw new RuntimeException(e); + } + return CompletableFuture.completedFuture(null); + } + } + + /** + * Test BlobStoreManager for unit tests. + * */ + private static class TestBlobStoreManager implements BlobStoreManager { + @Override + public void init() { + } + + @Override + public CompletionStage put(InputStream inputStream, Metadata metadata) { + return null; + } + + @Override + public CompletionStage get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) { + return null; + } + + @Override + public CompletionStage delete(String id, Metadata metadata) { + return null; + } + + @Override + public CompletionStage removeTTL(String blobId, Metadata metadata) { + return null; + } + + @Override + public void close() { + } + } }