Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: automatically detect database dialect #1677

Merged
merged 3 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.Dialect getDialect()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.Dialect getDialect()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/BatchReadOnlyTransaction</className>
Expand All @@ -15,4 +20,19 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/spanner/connection/StatementParser</className>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/SpannerOptions</className>
<method>com.google.cloud.spanner.Dialect getDialect()</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/SpannerOptions$Builder</className>
<method>com.google.cloud.spanner.SpannerOptions$Builder setDialect(com.google.cloud.spanner.Dialect)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/connection/ConnectionOptions</className>
<method>com.google.cloud.spanner.Dialect getDialect()</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
*/
public interface DatabaseClient {

/**
* Returns the SQL dialect that is used by the database.
*
* @return the SQL dialect that is used by the database.
*/
default Dialect getDialect() {
throw new UnsupportedOperationException("method should be overwritten");
}

/**
* Writes the given mutations atomically to the database.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ PooledSessionFuture getSession() {
return pool.getSession();
}

@Override
public Dialect getDialect() {
return pool.getDialect();
}

@Override
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
return writeWithOptions(mutations).getCommitTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,34 @@ private void keepAlive() {
}
}

private void determineDialectAsync(final SettableFuture<Dialect> dialect) {
Preconditions.checkNotNull(dialect);
executor.submit(
() -> {
try {
dialect.set(determineDialect());
} catch (Throwable t) {
// Catch-all as we want to propagate all exceptions to anyone who might be interested
// in the database dialect, and there's nothing sensible that we can do with it here.
dialect.setException(t);
} finally {
releaseSession(this, Position.FIRST);
}
});
}

private Dialect determineDialect() {
try (ResultSet dialectResultSet =
delegate.singleUse().executeQuery(DETERMINE_DIALECT_STATEMENT)) {
if (dialectResultSet.next()) {
return Dialect.fromName(dialectResultSet.getString(0));
} else {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "No dialect found for database");
}
}
}

private void markBusy(Span span) {
this.delegate.setCurrentSpan(span);
this.state = SessionState.BUSY;
Expand Down Expand Up @@ -1724,7 +1752,27 @@ private enum Position {
RANDOM
}

/**
* This statement is (currently) used to determine the dialect of the database that is used by the
* session pool. This statement is subject to change when the INFORMATION_SCHEMA contains a table
* where the dialect of the database can be read directly, and any tests that want to detect the
* specific 'determine dialect statement' should rely on this constant instead of the actual
* value.
*/
@VisibleForTesting
static final Statement DETERMINE_DIALECT_STATEMENT =
Statement.newBuilder(
"SELECT 'POSTGRESQL' AS DIALECT\n"
+ "FROM INFORMATION_SCHEMA.SCHEMATA\n"
+ "WHERE SCHEMA_NAME='pg_catalog'\n"
+ "UNION ALL\n"
+ "SELECT 'GOOGLE_STANDARD_SQL' AS DIALECT\n"
+ "FROM INFORMATION_SCHEMA.SCHEMATA\n"
+ "WHERE SCHEMA_NAME='INFORMATION_SCHEMA' AND CATALOG_NAME=''")
.build();

private final SessionPoolOptions options;
private final SettableFuture<Dialect> dialect = SettableFuture.create();
private final SessionClient sessionClient;
private final ScheduledExecutorService executor;
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
Expand All @@ -1734,6 +1782,9 @@ private enum Position {
private final Object lock = new Object();
private final Random random = new Random();

@GuardedBy("lock")
private boolean detectDialectStarted;

@GuardedBy("lock")
private int pendingClosure;

Expand Down Expand Up @@ -1861,6 +1912,39 @@ private SessionPool(
this.initMetricsCollection(metricRegistry, labelValues);
}

/**
* @return the {@link Dialect} of the underlying database. This method will block until the
* dialect is available. It will potentially execute one or two RPCs to get the dialect if
* necessary: One to create a session if there are no sessions in the pool (yet), and one to
* query the database for the dialect that is used. It is recommended that clients that always
* need to know the dialect set {@link
* SessionPoolOptions.Builder#setAutoDetectDialect(boolean)} to true. This will ensure that
* the dialect is fetched automatically in a background task when a session pool is created.
*/
Dialect getDialect() {
boolean mustDetectDialect = false;
synchronized (lock) {
if (!detectDialectStarted) {
mustDetectDialect = true;
detectDialectStarted = true;
}
}
if (mustDetectDialect) {
try (PooledSessionFuture session = getSession()) {
dialect.set(session.get().determineDialect());
}
}
try {
return dialect.get(60L, TimeUnit.SECONDS);
} catch (ExecutionException executionException) {
throw SpannerExceptionFactory.asSpannerException(executionException);
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
} catch (TimeoutException timeoutException) {
throw SpannerExceptionFactory.propagateTimeout(timeoutException);
}
}

@VisibleForTesting
int getNumberOfSessionsInUse() {
synchronized (lock) {
Expand Down Expand Up @@ -2290,10 +2374,17 @@ public void onSessionReady(SessionImpl session) {
} else {
Preconditions.checkState(totalSessions() <= options.getMaxSessions() - 1);
allSessions.add(pooledSession);
// Release the session to a random position in the pool to prevent the case that a batch
// of sessions that are affiliated with the same channel are all placed sequentially in
// the pool.
releaseSession(pooledSession, Position.RANDOM);
if (options.isAutoDetectDialect() && !detectDialectStarted) {
// Get the dialect of the underlying database if that has not yet been done. Note that
// this method will release the session into the pool once it is done.
detectDialectStarted = true;
pooledSession.determineDialectAsync(SessionPool.this.dialect);
} else {
// Release the session to a random position in the pool to prevent the case that a batch
// of sessions that are affiliated with the same channel are all placed sequentially in
// the pool.
releaseSession(pooledSession, Position.RANDOM);
}
}
}
if (closeSession) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class SessionPoolOptions {
private final ActionOnSessionNotFound actionOnSessionNotFound;
private final ActionOnSessionLeak actionOnSessionLeak;
private final long initialWaitForSessionTimeoutMillis;
private final boolean autoDetectDialect;

private SessionPoolOptions(Builder builder) {
// minSessions > maxSessions is only possible if the user has only set a value for maxSessions.
Expand All @@ -67,6 +68,7 @@ private SessionPoolOptions(Builder builder) {
this.loopFrequency = builder.loopFrequency;
this.keepAliveIntervalMinutes = builder.keepAliveIntervalMinutes;
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
this.autoDetectDialect = builder.autoDetectDialect;
}

@Override
Expand All @@ -87,7 +89,8 @@ public boolean equals(Object o) {
this.initialWaitForSessionTimeoutMillis, other.initialWaitForSessionTimeoutMillis)
&& Objects.equals(this.loopFrequency, other.loopFrequency)
&& Objects.equals(this.keepAliveIntervalMinutes, other.keepAliveIntervalMinutes)
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter);
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter)
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect);
}

@Override
Expand All @@ -104,7 +107,8 @@ public int hashCode() {
this.initialWaitForSessionTimeoutMillis,
this.loopFrequency,
this.keepAliveIntervalMinutes,
this.removeInactiveSessionAfter);
this.removeInactiveSessionAfter,
this.autoDetectDialect);
}

public Builder toBuilder() {
Expand Down Expand Up @@ -163,6 +167,10 @@ public boolean isBlockIfPoolExhausted() {
return actionOnExhaustion == ActionOnExhaustion.BLOCK;
}

public boolean isAutoDetectDialect() {
return autoDetectDialect;
}

@VisibleForTesting
long getInitialWaitForSessionTimeoutMillis() {
return initialWaitForSessionTimeoutMillis;
Expand Down Expand Up @@ -220,6 +228,7 @@ public static class Builder {
private long loopFrequency = 10 * 1000L;
private int keepAliveIntervalMinutes = 30;
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);
private boolean autoDetectDialect = false;

public Builder() {}

Expand All @@ -237,6 +246,7 @@ private Builder(SessionPoolOptions options) {
this.loopFrequency = options.loopFrequency;
this.keepAliveIntervalMinutes = options.keepAliveIntervalMinutes;
this.removeInactiveSessionAfter = options.removeInactiveSessionAfter;
this.autoDetectDialect = options.autoDetectDialect;
}

/**
Expand Down Expand Up @@ -327,6 +337,24 @@ public Builder setBlockIfPoolExhausted() {
return this;
}

/**
* Sets whether the client should automatically execute a background query to detect the dialect
* that is used by the database or not. Set this option to true if you do not know what the
* dialect of the database will be.
*
* <p>Note that you can always call {@link DatabaseClient#getDialect()} to get the dialect of a
* database regardless of this setting, but by setting this to true, the value will be
* pre-populated and cached in the client.
*
* @param autoDetectDialect Whether the client should automatically execute a background query
* to detect the dialect of the underlying database
* @return this builder for chaining
*/
public Builder setAutoDetectDialect(boolean autoDetectDialect) {
this.autoDetectDialect = autoDetectDialect;
return this;
}

/**
* The initial number of milliseconds to wait for a session to become available when one is
* requested. The session pool will keep retrying to get a session, and the timeout will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final CallCredentialsProvider callCredentialsProvider;
private final CloseableExecutorProvider asyncExecutorProvider;
private final String compressorName;
private final Dialect dialect;

/**
* Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to
Expand Down Expand Up @@ -593,7 +592,6 @@ private SpannerOptions(Builder builder) {
callCredentialsProvider = builder.callCredentialsProvider;
asyncExecutorProvider = builder.asyncExecutorProvider;
compressorName = builder.compressorName;
dialect = builder.dialect;
}

/**
Expand Down Expand Up @@ -693,7 +691,6 @@ public static class Builder
private CloseableExecutorProvider asyncExecutorProvider;
private String compressorName;
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
private Dialect dialect = Dialect.GOOGLE_STANDARD_SQL;

private Builder() {
// Manually set retry and polling settings that work.
Expand Down Expand Up @@ -748,7 +745,6 @@ private Builder() {
this.channelProvider = options.channelProvider;
this.channelConfigurator = options.channelConfigurator;
this.interceptorProvider = options.interceptorProvider;
this.dialect = options.dialect;
}

@Override
Expand Down Expand Up @@ -779,7 +775,6 @@ protected Set<String> getAllowedClientLibTokens() {
* <li>{@link #setHost(String)}
* <li>{@link #setNumChannels(int)}
* <li>{@link #setInterceptorProvider(GrpcInterceptorProvider)}
* <li>{@link #setDialect(Dialect)}
* <li>{@link #setHeaderProvider(com.google.api.gax.rpc.HeaderProvider)}
* </ol>
*/
Expand Down Expand Up @@ -1139,16 +1134,6 @@ public Builder setEmulatorHost(String emulatorHost) {
return this;
}

/**
* Sets the {@link Dialect} to use with Cloud Spanner. The default is {@link
* Dialect#GOOGLE_STANDARD_SQL}.
*/
public Builder setDialect(Dialect dialect) {
Preconditions.checkNotNull(dialect);
this.dialect = dialect;
return this;
}

@SuppressWarnings("rawtypes")
@Override
public SpannerOptions build() {
Expand Down Expand Up @@ -1276,10 +1261,6 @@ public String getCompressorName() {
return compressorName;
}

public Dialect getDialect() {
return dialect;
}

/** Returns the default query options to use for the specific database. */
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
// Use the specific query options for the database if any have been specified. These have
Expand Down
Loading