Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update grpc WriteObject response handling to provide context when a failure happens #2532

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

/**
* This exception is used to preserve the caller's stacktrace when invoking an async task in a sync
* context. It will be added as a suppressed exception when propagating the async exception. This
* allows callers to catch ApiException thrown in an async operation, while still maintaining the
* call site.
*/
public final class AsyncStorageTaskException extends RuntimeException {
// mimic of com.google.api.gax.rpc.AsyncTaskException which doesn't have a public constructor
// if that class is ever made public, make this class extend it
AsyncStorageTaskException() {
super("Asynchronous task failed");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.OutOfRangeException;
Expand Down Expand Up @@ -208,14 +209,19 @@ private void flush(
deps,
alg,
() -> {
Observer observer = new Observer(content, finalizing);
Observer observer = new Observer(content, finalizing, segments, internalContext);
ApiStreamObserver<WriteObjectRequest> write = callable.clientStreamingCall(observer);

for (WriteObjectRequest message : segments) {
write.onNext(message);
}
write.onCompleted();
observer.await();
try {
observer.await();
} catch (Throwable t) {
t.addSuppressed(new AsyncStorageTaskException());
throw t;
}
return null;
},
Decoder.identity());
Expand All @@ -230,13 +236,21 @@ class Observer implements ApiStreamObserver<WriteObjectResponse> {

private final RewindableContent content;
private final boolean finalizing;
private final List<WriteObjectRequest> segments;
private final GrpcCallContext context;

private final SettableApiFuture<Void> invocationHandle;
private volatile WriteObjectResponse last;

Observer(@Nullable RewindableContent content, boolean finalizing) {
Observer(
@Nullable RewindableContent content,
boolean finalizing,
@NonNull List<WriteObjectRequest> segments,
GrpcCallContext context) {
this.content = content;
this.finalizing = finalizing;
this.segments = segments;
this.context = context;
this.invocationHandle = SettableApiFuture.create();
}

Expand All @@ -250,10 +264,20 @@ public void onError(Throwable t) {
if (t instanceof OutOfRangeException) {
OutOfRangeException oore = (OutOfRangeException) t;
open = false;
invocationHandle.setException(
ResumableSessionFailureScenario.SCENARIO_5.toStorageException());
} else {
invocationHandle.setException(t);
StorageException storageException =
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
segments, null, context, oore);
invocationHandle.setException(storageException);
} else if (t instanceof ApiException) {
// use StorageExceptions logic to translate from ApiException to our status codes ensuring
// things fall in line with our retry handlers.
// This is suboptimal, as it will initialize a second exception, however this is the
// unusual case, and it should not cause a significant overhead given its rarity.
StorageException tmp = StorageException.asStorageException((ApiException) t);
StorageException storageException =
ResumableSessionFailureScenario.toStorageException(
tmp.getCode(), tmp.getMessage(), tmp.getReason(), segments, null, context, t);
invocationHandle.setException(storageException);
}
}

Expand All @@ -276,7 +300,8 @@ public void onCompleted() {
writeCtx.getTotalSentBytes().set(persistedSize);
writeCtx.getConfirmedBytes().set(persistedSize);
} else {
throw ResumableSessionFailureScenario.SCENARIO_7.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_7.toStorageException(
segments, last, context, null);
}
} else if (finalizing && last.hasResource()) {
long totalSentBytes = writeCtx.getTotalSentBytes().get();
Expand All @@ -285,22 +310,28 @@ public void onCompleted() {
writeCtx.getConfirmedBytes().set(finalSize);
resultFuture.set(last);
} else if (finalSize < totalSentBytes) {
throw ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException(
segments, last, context, null);
} else {
throw ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException(
segments, last, context, null);
}
} else if (!finalizing && last.hasResource()) {
throw ResumableSessionFailureScenario.SCENARIO_1.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_1.toStorageException(
segments, last, context, null);
} else if (finalizing && last.hasPersistedSize()) {
long totalSentBytes = writeCtx.getTotalSentBytes().get();
long persistedSize = last.getPersistedSize();
if (persistedSize < totalSentBytes) {
throw ResumableSessionFailureScenario.SCENARIO_3.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_3.toStorageException(
segments, last, context, null);
} else {
throw ResumableSessionFailureScenario.SCENARIO_2.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_2.toStorageException(
segments, last, context, null);
}
} else {
throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException();
throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException(
segments, last, context, null);
}
} catch (Throwable se) {
open = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,31 @@

package com.google.cloud.storage;

import static com.google.cloud.storage.Utils.ifNonNull;

import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.BaseServiceException;
import com.google.cloud.storage.StorageException.IOExceptionCallable;
import com.google.common.io.CharStreams;
import com.google.protobuf.MessageOrBuilder;
import com.google.storage.v2.ChecksummedData;
import com.google.storage.v2.ObjectChecksums;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.annotation.ParametersAreNonnullByDefault;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

@ParametersAreNonnullByDefault
Expand Down Expand Up @@ -69,6 +81,10 @@ enum ResumableSessionFailureScenario {
private static final String PREFIX_I = "\t|< ";
private static final String PREFIX_O = "\t|> ";
private static final String PREFIX_X = "\t| ";
// define some constants for tab widths that are more compressed that the literals
private static final String T1 = "\t";
private static final String T2 = "\t\t";
private static final String T3 = "\t\t\t";

private static final Predicate<String> includedHeaders =
matches("Content-Length")
Expand All @@ -78,6 +94,7 @@ enum ResumableSessionFailureScenario {
.or(matches("Range"))
.or(startsWith("X-Goog-Stored-"))
.or(matches("X-Goog-GCS-Idempotency-Token"))
.or(matches("X-Goog-request-params"))
.or(matches("X-GUploader-UploadID"));

private static final Predicate<Map.Entry<String, ?>> includeHeader =
Expand Down Expand Up @@ -116,8 +133,12 @@ StorageException toStorageException(
return toStorageException(code, message, reason, uploadId, resp, cause, contentCallable);
}

StorageException toStorageException() {
return new StorageException(code, message, reason, null);
StorageException toStorageException(
@NonNull List<@NonNull WriteObjectRequest> reqs,
@Nullable WriteObjectResponse resp,
@NonNull GrpcCallContext context,
@Nullable Throwable cause) {
return toStorageException(code, message, reason, reqs, resp, context, cause);
}

static StorageException toStorageException(
Expand All @@ -136,6 +157,102 @@ static StorageException toStorageException(
return se;
}

static StorageException toStorageException(
int code,
String message,
@Nullable String reason,
@NonNull List<@NonNull WriteObjectRequest> reqs,
@Nullable WriteObjectResponse resp,
@NonNull GrpcCallContext context,
@Nullable Throwable cause) {
final StringBuilder sb = new StringBuilder();
sb.append(message);
// request context
Map<String, List<String>> extraHeaders = context.getExtraHeaders();
recordHeadersTo(extraHeaders, PREFIX_O, sb);
int length = reqs.size();
for (int i = 0; i < length; i++) {
if (i == 0) {
sb.append("\n").append(PREFIX_O).append("[");
} else {
sb.append(",");
}
WriteObjectRequest req = reqs.get(i);
sb.append("\n").append(PREFIX_O).append(T1).append(req.getClass().getName()).append("{");
if (req.hasUploadId()) {
sb.append("\n").append(PREFIX_O).append(T2).append("upload_id: ").append(req.getUploadId());
}
long writeOffset = req.getWriteOffset();
if (req.hasChecksummedData()) {
ChecksummedData checksummedData = req.getChecksummedData();
sb.append("\n").append(PREFIX_O).append(T2);
sb.append(
String.format(
"checksummed_data: {range: [%d:%d]",
writeOffset, writeOffset + checksummedData.getContent().size()));
if (checksummedData.hasCrc32C()) {
sb.append(", crc32c: ").append(checksummedData.getCrc32C());
}
sb.append("}");
} else {
sb.append("\n").append(PREFIX_O).append(T2).append("write_offset: ").append(writeOffset);
}
if (req.getFinishWrite()) {
sb.append("\n").append(PREFIX_O).append(T2).append("finish_write: true");
}
if (req.hasObjectChecksums()) {
ObjectChecksums objectChecksums = req.getObjectChecksums();
sb.append("\n").append(PREFIX_O).append(T2).append("object_checksums: ").append("{");
fmt(objectChecksums, PREFIX_O, T3, sb);
sb.append("\n").append(PREFIX_O).append(T2).append("}");
}
sb.append("\n").append(PREFIX_O).append("\t}");
if (i == length - 1) {
sb.append("\n").append(PREFIX_O).append("]");
}
}

sb.append("\n").append(PREFIX_X);

// response context
if (resp != null) {
sb.append("\n").append(PREFIX_I).append(resp.getClass().getName()).append("{");
fmt(resp, PREFIX_I, T1, sb);
sb.append("\n").append(PREFIX_I).append("}");
sb.append("\n").append(PREFIX_X);
}

if (cause != null) {
if (cause instanceof ApiException) {
ApiException apiException = (ApiException) cause;
Throwable cause1 = apiException.getCause();
if (cause1 instanceof StatusRuntimeException) {
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) cause1;
sb.append("\n").append(PREFIX_I).append(statusRuntimeException.getStatus());
ifNonNull(
statusRuntimeException.getTrailers(),
t -> sb.append("\n").append(PREFIX_I).append(t));
} else {
sb.append("\n")
.append(PREFIX_I)
.append("code: ")
.append(apiException.getStatusCode().toString());
ifNonNull(
apiException.getReason(),
r -> sb.append("\n").append(PREFIX_I).append("reason: ").append(r));
ifNonNull(
apiException.getDomain(),
d -> sb.append("\n").append(PREFIX_I).append("domain: ").append(d));
ifNonNull(
apiException.getErrorDetails(),
e -> sb.append("\n").append(PREFIX_I).append("errorDetails: ").append(e));
}
sb.append("\n").append(PREFIX_X);
}
}
return new StorageException(code, sb.toString(), reason, cause);
}

static StorageException toStorageException(
int overrideCode,
String message,
Expand Down Expand Up @@ -213,14 +330,21 @@ private static Predicate<String> startsWith(String prefix) {
}

private static void recordHeaderTo(HttpHeaders h, String prefix, StringBuilder sb) {
h.entrySet().stream()
.filter(includeHeader)
.forEach(
e -> {
String key = e.getKey();
String value = headerValueToString(e.getValue());
sb.append("\n").append(prefix).append(key).append(": ").append(value);
});
h.entrySet().stream().filter(includeHeader).forEach(writeHeaderValue(prefix, sb));
}

private static void recordHeadersTo(
Map<String, List<String>> headers, String prefix, StringBuilder sb) {
headers.entrySet().stream().filter(includeHeader).forEach(writeHeaderValue(prefix, sb));
}

private static <V> Consumer<Map.Entry<String, V>> writeHeaderValue(
String prefix, StringBuilder sb) {
return e -> {
String key = e.getKey();
String value = headerValueToString(e.getValue());
sb.append("\n").append(prefix).append(key).append(": ").append(value);
};
}

private static String headerValueToString(Object o) {
Expand All @@ -233,4 +357,18 @@ private static String headerValueToString(Object o) {

return o.toString();
}

private static void fmt(
MessageOrBuilder msg,
@SuppressWarnings("SameParameterValue") String prefix,
String indentation,
StringBuilder sb) {
String string = msg.toString();
// drop the final new line before prefixing
string = string.replaceAll("\n$", "");
sb.append("\n")
.append(prefix)
.append(indentation)
.append(string.replaceAll("\r?\n", "\n" + prefix + indentation));
}
}
Loading
Loading