Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(spanner): Avoid blocking thread in AsyncResultSet #3446

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -768,9 +768,14 @@ ResultSet executeQueryInternalWithOptions(
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
GrpcStreamIterator stream =
new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed);
if (streamListener != null) {
stream.registerListener(streamListener);
}
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
Expand All @@ -791,8 +796,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
getTransactionChannelHint(),
isRouteToLeader());
session.markUsed(clock.instant());
call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
call.request(prefetchChunks);
return stream;
}

Expand Down Expand Up @@ -959,9 +964,14 @@ ResultSet readInternalWithOptions(
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
GrpcStreamIterator stream =
new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed);
if (streamListener != null) {
stream.registerListener(streamListener);
}
TransactionSelector selector = null;
if (resumeToken != null) {
builder.setResumeToken(resumeToken);
Expand All @@ -980,8 +990,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
getTransactionChannelHint(),
isRouteToLeader());
session.markUsed(clock.instant());
call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
call.request(prefetchChunks);
return stream;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ interface CloseableIterator<T> extends Iterator<T> {
void close(@Nullable String message);

boolean isWithBeginTransaction();

/**
* @param streamMessageListener A class object which implements StreamMessageListener
* @return true if streaming is supported by the iterator, otherwise false
*/
default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
olavloite marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
}

static double valueProtoToFloat64(com.google.protobuf.Value proto) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.api.core.ApiFuture;
import com.google.common.base.Function;
import com.google.spanner.v1.PartialResultSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -223,4 +224,12 @@ interface ReadyCallback {
* @param transformer function which will be used to transform the row. It should not return null.
*/
<T> List<T> toList(Function<StructReader, T> transformer) throws SpannerException;

/**
* An interface to register the listener for streaming gRPC request. It will be called when a
* chunk is received from gRPC streaming call.
*/
interface StreamMessageListener {
void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ListenableFutureToApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet;
Expand All @@ -29,13 +28,13 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
Expand All @@ -45,12 +44,14 @@
import java.util.logging.Logger;

/** Default implementation for {@link AsyncResultSet}. */
class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet {
class AsyncResultSetImpl extends ForwardingStructReader
implements ListenableAsyncResultSet, AsyncResultSet.StreamMessageListener {
private static final Logger log = Logger.getLogger(AsyncResultSetImpl.class.getName());

/** State of an {@link AsyncResultSetImpl}. */
private enum State {
INITIALIZED,
STREAMING_INITIALIZED,
/** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
SYNC,
CONSUMING,
Expand Down Expand Up @@ -115,12 +116,15 @@ private enum State {

private State state = State.INITIALIZED;

/** This variable indicates that produce rows thread is initiated */
private volatile boolean produceRowsInitiated;

/**
* This variable indicates whether all the results from the underlying result set have been read.
*/
private volatile boolean finished;

private volatile ApiFuture<Void> result;
private volatile SettableApiFuture<Void> result;

/**
* This variable indicates whether {@link #tryNext()} has returned {@link CursorState#DONE} or a
Expand Down Expand Up @@ -329,12 +333,12 @@ public void run() {
private final CallbackRunnable callbackRunnable = new CallbackRunnable();

/**
* {@link ProduceRowsCallable} reads data from the underlying {@link ResultSet}, places these in
* {@link ProduceRowsRunnable} reads data from the underlying {@link ResultSet}, places these in
* the buffer and dispatches the {@link CallbackRunnable} when data is ready to be consumed.
*/
private class ProduceRowsCallable implements Callable<Void> {
private class ProduceRowsRunnable implements Runnable {
@Override
public Void call() throws Exception {
public void run() {
boolean stop = false;
boolean hasNext = false;
try {
Expand Down Expand Up @@ -393,12 +397,17 @@ public Void call() throws Exception {
}
// Call the callback if there are still rows in the buffer that need to be processed.
while (!stop) {
waitIfPaused();
startCallbackIfNecessary();
// Make sure we wait until the callback runner has actually finished.
consumingLatch.await();
synchronized (monitor) {
stop = cursorReturnedDoneOrException;
try {
waitIfPaused();
startCallbackIfNecessary();
// Make sure we wait until the callback runner has actually finished.
consumingLatch.await();
synchronized (monitor) {
stop = cursorReturnedDoneOrException;
}
} catch (Throwable e) {
result.setException(e);
return;
}
}
} finally {
Expand All @@ -410,14 +419,14 @@ public Void call() throws Exception {
}
synchronized (monitor) {
if (executionException != null) {
throw executionException;
}
if (state == State.CANCELLED) {
throw CANCELLED_EXCEPTION;
result.setException(executionException);
} else if (state == State.CANCELLED) {
result.setException(CANCELLED_EXCEPTION);
} else {
result.set(null);
}
}
}
return null;
}

private void waitIfPaused() throws InterruptedException {
Expand Down Expand Up @@ -449,6 +458,26 @@ private void startCallbackWithBufferLatchIfNecessary(int bufferLatch) {
}
}

private class InitiateStreamingRunnable implements Runnable {

@Override
public void run() {
try {
// This method returns true if the underlying result set is a streaming result set (e.g. a
// GrpcResultSet).
// Those result sets will trigger initiateProduceRows() when the first results are received.
// Non-streaming result sets do not trigger this callback, and for those result sets, we
// need to eagerly start the ProduceRowsRunnable.
if (!initiateStreaming(AsyncResultSetImpl.this)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that it would be good to add a comment here, something along the lines like:

// This method returns true if the underlying result set is a streaming result set (e.g. a GrpcResultSet).
// Those result sets will trigger initiateProduceRows() when the first results are received.
// Non-streaming result sets do not trigger this callback, and for those result sets, we need to eagerly
// start the ProduceRowsRunnable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

initiateProduceRows();
}
} catch (Throwable exception) {
executionException = SpannerExceptionFactory.asSpannerException(exception);
initiateProduceRows();
}
}
}

/** Sets the callback for this {@link AsyncResultSet}. */
@Override
public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
Expand All @@ -458,16 +487,24 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
this.state == State.INITIALIZED, "callback may not be set multiple times");

// Start to fetch data and buffer these.
this.result =
new ListenableFutureToApiFuture<>(this.service.submit(new ProduceRowsCallable()));
this.result = SettableApiFuture.create();
this.state = State.STREAMING_INITIALIZED;
this.service.execute(new InitiateStreamingRunnable());
this.executor = MoreExecutors.newSequentialExecutor(Preconditions.checkNotNull(exec));
this.callback = Preconditions.checkNotNull(cb);
this.state = State.RUNNING;
pausedLatch.countDown();
return result;
}
}

private void initiateProduceRows() {
if (this.state == State.STREAMING_INITIALIZED) {
this.state = State.RUNNING;
}
produceRowsInitiated = true;
this.service.execute(new ProduceRowsRunnable());
}

Future<Void> getResult() {
return result;
}
Expand Down Expand Up @@ -578,6 +615,10 @@ public ResultSetMetadata getMetadata() {
return delegateResultSet.get().getMetadata();
}

boolean initiateStreaming(StreamMessageListener streamMessageListener) {
return StreamingUtil.initiateStreaming(delegateResultSet.get(), streamMessageListener);
}

@Override
protected void checkValidState() {
synchronized (monitor) {
Expand All @@ -593,4 +634,22 @@ public Struct getCurrentRowAsStruct() {
checkValidState();
return currentRow;
}

@Override
public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull) {
synchronized (monitor) {
if (produceRowsInitiated) {
return;
}
// if PartialResultSet contains a resume token or buffer size is full, or
// we have reached the end of the stream, we can start the thread.
boolean startJobThread =
!partialResultSet.getResumeToken().isEmpty()
|| bufferIsFull
|| partialResultSet == GrpcStreamIterator.END_OF_STREAM;
if (startJobThread || state != State.STREAMING_INITIALIZED) {
initiateProduceRows();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package com.google.cloud.spanner;

import com.google.api.core.InternalApi;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.ResultSetStats;

/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
public class ForwardingResultSet extends ForwardingStructReader implements ProtobufResultSet {
public class ForwardingResultSet extends ForwardingStructReader
implements ProtobufResultSet, StreamingResultSet {

private Supplier<? extends ResultSet> delegate;

Expand Down Expand Up @@ -102,4 +104,10 @@ public ResultSetStats getStats() {
public ResultSetMetadata getMetadata() {
return delegate.get().getMetadata();
}

@Override
@InternalApi
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
sakthivelmanii marked this conversation as resolved.
Show resolved Hide resolved
return StreamingUtil.initiateStreaming(delegate.get(), streamMessageListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.core.InternalApi;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Value;
import com.google.spanner.v1.PartialResultSet;
Expand All @@ -30,7 +31,8 @@
import javax.annotation.Nullable;

@VisibleForTesting
class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufResultSet {
class GrpcResultSet extends AbstractResultSet<List<Object>>
implements ProtobufResultSet, StreamingResultSet {
private final GrpcValueIterator iterator;
private final Listener listener;
private final DecodeMode decodeMode;
Expand Down Expand Up @@ -123,6 +125,12 @@ public ResultSetMetadata getMetadata() {
return metadata;
}

@Override
@InternalApi
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
sakthivelmanii marked this conversation as resolved.
Show resolved Hide resolved
return iterator.initiateStreaming(streamMessageListener);
}

@Override
public void close() {
synchronized (this) {
Expand Down
Loading
Loading