Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix exception message in BlobStoreUtil's getSnapshotIndex #1712

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata
.thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor)
.handle((snapshotIndex, ex) -> {
if (ex != null) {
throw new SamzaException(String.format("Unable to deserialize SnapshotIndex bytes for blob ID: %s", blobId), ex);
throw new SamzaException(String.format("Unable to get SnapshotIndex blob. The blob ID is : %s", blobId), ex);
}
return snapshotIndex;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand All @@ -50,6 +51,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.CRC32;
Expand Down Expand Up @@ -78,6 +80,7 @@
import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
import org.apache.samza.util.FileUtil;
import org.apache.samza.util.FutureUtil;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -920,6 +923,24 @@ public void testGetSSIThrowsExceptionOnSyncBlobStoreErrors() {
checkpoint, storesToBackupOrRestore, false);
}

@Test
public void testSerdeException() throws ExecutionException, InterruptedException {
final String blobId = "foo";

final BlobStoreManager testBlobStoreManager = new DeserTestBlobStoreManager();
final BlobStoreUtil util = new BlobStoreUtil(testBlobStoreManager, Executors.newSingleThreadExecutor(), blobStoreConfig, null, null);

final CompletableFuture<SnapshotIndex> future = util.getSnapshotIndex(blobId, mock(Metadata.class), true)
.handle((snapshotIndex, throwable) -> {
if (throwable != null) {
Assert.assertEquals(throwable.getMessage(), String.format("Unable to get SnapshotIndex blob. The blob ID is : %s", blobId));
Assert.assertEquals(throwable.getCause().getMessage(), "org.apache.samza.SamzaException: Exception in deserializing SnapshotIndex bytes foobar");
}
return snapshotIndex;
});
future.get();
}

@Test
public void testGetSSIThrowsExceptionIfAnyNonIgnoredAsyncBlobStoreErrors() {
String store = "storeName1";
Expand Down Expand Up @@ -1045,4 +1066,57 @@ private CheckpointV2 createCheckpointV2(String stateBackendFactory, Map<String,
factoryStoreSCMs.put(stateBackendFactory, storeSCMs);
return new CheckpointV2(checkpointId, ImmutableMap.of(), factoryStoreSCMs);
}

/**
* Test {@link BlobStoreManager} to be used to assert SnapshotIndex deserialization failure
* exception message.
* We write a dummy string's bytes to the OutputStream parameter of get method instead of a SnapshotIndex
* blob. The OutputStream is used by SnapshotIndexSerde which will fail during deserialization.
* */
private static class DeserTestBlobStoreManager extends TestBlobStoreManager {
@Override
public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) {
final String randBlob = "foobar";
final byte[] byteArray = randBlob.getBytes(StandardCharsets.UTF_8);
try {
outputStream.write(byteArray);
} catch (IOException e) {
throw new RuntimeException(e);
}
return CompletableFuture.completedFuture(null);
}
}

/**
* Test BlobStoreManager for unit tests.
* */
private static class TestBlobStoreManager implements BlobStoreManager {
@Override
public void init() {
}

@Override
public CompletionStage<String> put(InputStream inputStream, Metadata metadata) {
return null;
}

@Override
public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata, boolean getDeletedBlob) {
return null;
}

@Override
public CompletionStage<Void> delete(String id, Metadata metadata) {
return null;
}

@Override
public CompletionStage<Void> removeTTL(String blobId, Metadata metadata) {
return null;
}

@Override
public void close() {
}
}
}
Loading