Skip to content

Commit

Permalink
feat: automatically detect database dialect (#1677)
Browse files Browse the repository at this point in the history
The dialect of the database that a `DatabaseClient` is connected to can be automatically detected:
1. `DatabaseClient#getDialect()` has been added. This method always returns the dialect of the underlying database. It will do so by executing a query that detects the dialect. This query can also be executed and cached automatically in the background during startup (see below).
2. `SessionPoolOptions#setAutoDetectDialect(true)` will cause the dialect detection query to be executed in the background automatically when a new client is created. This is disabled by default, except for when a Connection API connection (or anything that depends on that, such as JDBC) is opened. The reason for this default behavior is that a normal Spanner instance does normally not need to know what the dialect of the underlying database is, while the Connection API does. This reduces the number of times the detection query will be executed during production use.
  • Loading branch information
olavloite authored Feb 15, 2022
1 parent 7095f94 commit 9eccfc4
Show file tree
Hide file tree
Showing 22 changed files with 350 additions and 149 deletions.
20 changes: 20 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.Dialect getDialect()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.Dialect getDialect()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/BatchReadOnlyTransaction</className>
Expand All @@ -15,4 +20,19 @@
<differenceType>8001</differenceType>
<className>com/google/cloud/spanner/connection/StatementParser</className>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/SpannerOptions</className>
<method>com.google.cloud.spanner.Dialect getDialect()</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/SpannerOptions$Builder</className>
<method>com.google.cloud.spanner.SpannerOptions$Builder setDialect(com.google.cloud.spanner.Dialect)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/connection/ConnectionOptions</className>
<method>com.google.cloud.spanner.Dialect getDialect()</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
*/
public interface DatabaseClient {

/**
* Returns the SQL dialect that is used by the database.
*
* @return the SQL dialect that is used by the database.
*/
default Dialect getDialect() {
throw new UnsupportedOperationException("method should be overwritten");
}

/**
* Writes the given mutations atomically to the database.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ PooledSessionFuture getSession() {
return pool.getSession();
}

@Override
public Dialect getDialect() {
return pool.getDialect();
}

@Override
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
return writeWithOptions(mutations).getCommitTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,34 @@ private void keepAlive() {
}
}

private void determineDialectAsync(final SettableFuture<Dialect> dialect) {
Preconditions.checkNotNull(dialect);
executor.submit(
() -> {
try {
dialect.set(determineDialect());
} catch (Throwable t) {
// Catch-all as we want to propagate all exceptions to anyone who might be interested
// in the database dialect, and there's nothing sensible that we can do with it here.
dialect.setException(t);
} finally {
releaseSession(this, Position.FIRST);
}
});
}

private Dialect determineDialect() {
try (ResultSet dialectResultSet =
delegate.singleUse().executeQuery(DETERMINE_DIALECT_STATEMENT)) {
if (dialectResultSet.next()) {
return Dialect.fromName(dialectResultSet.getString(0));
} else {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.NOT_FOUND, "No dialect found for database");
}
}
}

private void markBusy(Span span) {
this.delegate.setCurrentSpan(span);
this.state = SessionState.BUSY;
Expand Down Expand Up @@ -1724,7 +1752,27 @@ private enum Position {
RANDOM
}

/**
* This statement is (currently) used to determine the dialect of the database that is used by the
* session pool. This statement is subject to change when the INFORMATION_SCHEMA contains a table
* where the dialect of the database can be read directly, and any tests that want to detect the
* specific 'determine dialect statement' should rely on this constant instead of the actual
* value.
*/
@VisibleForTesting
static final Statement DETERMINE_DIALECT_STATEMENT =
Statement.newBuilder(
"SELECT 'POSTGRESQL' AS DIALECT\n"
+ "FROM INFORMATION_SCHEMA.SCHEMATA\n"
+ "WHERE SCHEMA_NAME='pg_catalog'\n"
+ "UNION ALL\n"
+ "SELECT 'GOOGLE_STANDARD_SQL' AS DIALECT\n"
+ "FROM INFORMATION_SCHEMA.SCHEMATA\n"
+ "WHERE SCHEMA_NAME='INFORMATION_SCHEMA' AND CATALOG_NAME=''")
.build();

private final SessionPoolOptions options;
private final SettableFuture<Dialect> dialect = SettableFuture.create();
private final SessionClient sessionClient;
private final ScheduledExecutorService executor;
private final ExecutorFactory<ScheduledExecutorService> executorFactory;
Expand All @@ -1734,6 +1782,9 @@ private enum Position {
private final Object lock = new Object();
private final Random random = new Random();

@GuardedBy("lock")
private boolean detectDialectStarted;

@GuardedBy("lock")
private int pendingClosure;

Expand Down Expand Up @@ -1861,6 +1912,39 @@ private SessionPool(
this.initMetricsCollection(metricRegistry, labelValues);
}

/**
* @return the {@link Dialect} of the underlying database. This method will block until the
* dialect is available. It will potentially execute one or two RPCs to get the dialect if
* necessary: One to create a session if there are no sessions in the pool (yet), and one to
* query the database for the dialect that is used. It is recommended that clients that always
* need to know the dialect set {@link
* SessionPoolOptions.Builder#setAutoDetectDialect(boolean)} to true. This will ensure that
* the dialect is fetched automatically in a background task when a session pool is created.
*/
Dialect getDialect() {
boolean mustDetectDialect = false;
synchronized (lock) {
if (!detectDialectStarted) {
mustDetectDialect = true;
detectDialectStarted = true;
}
}
if (mustDetectDialect) {
try (PooledSessionFuture session = getSession()) {
dialect.set(session.get().determineDialect());
}
}
try {
return dialect.get(60L, TimeUnit.SECONDS);
} catch (ExecutionException executionException) {
throw SpannerExceptionFactory.asSpannerException(executionException);
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
} catch (TimeoutException timeoutException) {
throw SpannerExceptionFactory.propagateTimeout(timeoutException);
}
}

@VisibleForTesting
int getNumberOfSessionsInUse() {
synchronized (lock) {
Expand Down Expand Up @@ -2290,10 +2374,17 @@ public void onSessionReady(SessionImpl session) {
} else {
Preconditions.checkState(totalSessions() <= options.getMaxSessions() - 1);
allSessions.add(pooledSession);
// Release the session to a random position in the pool to prevent the case that a batch
// of sessions that are affiliated with the same channel are all placed sequentially in
// the pool.
releaseSession(pooledSession, Position.RANDOM);
if (options.isAutoDetectDialect() && !detectDialectStarted) {
// Get the dialect of the underlying database if that has not yet been done. Note that
// this method will release the session into the pool once it is done.
detectDialectStarted = true;
pooledSession.determineDialectAsync(SessionPool.this.dialect);
} else {
// Release the session to a random position in the pool to prevent the case that a batch
// of sessions that are affiliated with the same channel are all placed sequentially in
// the pool.
releaseSession(pooledSession, Position.RANDOM);
}
}
}
if (closeSession) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class SessionPoolOptions {
private final ActionOnSessionNotFound actionOnSessionNotFound;
private final ActionOnSessionLeak actionOnSessionLeak;
private final long initialWaitForSessionTimeoutMillis;
private final boolean autoDetectDialect;

private SessionPoolOptions(Builder builder) {
// minSessions > maxSessions is only possible if the user has only set a value for maxSessions.
Expand All @@ -67,6 +68,7 @@ private SessionPoolOptions(Builder builder) {
this.loopFrequency = builder.loopFrequency;
this.keepAliveIntervalMinutes = builder.keepAliveIntervalMinutes;
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
this.autoDetectDialect = builder.autoDetectDialect;
}

@Override
Expand All @@ -87,7 +89,8 @@ public boolean equals(Object o) {
this.initialWaitForSessionTimeoutMillis, other.initialWaitForSessionTimeoutMillis)
&& Objects.equals(this.loopFrequency, other.loopFrequency)
&& Objects.equals(this.keepAliveIntervalMinutes, other.keepAliveIntervalMinutes)
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter);
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter)
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect);
}

@Override
Expand All @@ -104,7 +107,8 @@ public int hashCode() {
this.initialWaitForSessionTimeoutMillis,
this.loopFrequency,
this.keepAliveIntervalMinutes,
this.removeInactiveSessionAfter);
this.removeInactiveSessionAfter,
this.autoDetectDialect);
}

public Builder toBuilder() {
Expand Down Expand Up @@ -163,6 +167,10 @@ public boolean isBlockIfPoolExhausted() {
return actionOnExhaustion == ActionOnExhaustion.BLOCK;
}

public boolean isAutoDetectDialect() {
return autoDetectDialect;
}

@VisibleForTesting
long getInitialWaitForSessionTimeoutMillis() {
return initialWaitForSessionTimeoutMillis;
Expand Down Expand Up @@ -220,6 +228,7 @@ public static class Builder {
private long loopFrequency = 10 * 1000L;
private int keepAliveIntervalMinutes = 30;
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);
private boolean autoDetectDialect = false;

public Builder() {}

Expand All @@ -237,6 +246,7 @@ private Builder(SessionPoolOptions options) {
this.loopFrequency = options.loopFrequency;
this.keepAliveIntervalMinutes = options.keepAliveIntervalMinutes;
this.removeInactiveSessionAfter = options.removeInactiveSessionAfter;
this.autoDetectDialect = options.autoDetectDialect;
}

/**
Expand Down Expand Up @@ -327,6 +337,24 @@ public Builder setBlockIfPoolExhausted() {
return this;
}

/**
* Sets whether the client should automatically execute a background query to detect the dialect
* that is used by the database or not. Set this option to true if you do not know what the
* dialect of the database will be.
*
* <p>Note that you can always call {@link DatabaseClient#getDialect()} to get the dialect of a
* database regardless of this setting, but by setting this to true, the value will be
* pre-populated and cached in the client.
*
* @param autoDetectDialect Whether the client should automatically execute a background query
* to detect the dialect of the underlying database
* @return this builder for chaining
*/
public Builder setAutoDetectDialect(boolean autoDetectDialect) {
this.autoDetectDialect = autoDetectDialect;
return this;
}

/**
* The initial number of milliseconds to wait for a session to become available when one is
* requested. The session pool will keep retrying to get a session, and the timeout will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final CallCredentialsProvider callCredentialsProvider;
private final CloseableExecutorProvider asyncExecutorProvider;
private final String compressorName;
private final Dialect dialect;

/**
* Interface that can be used to provide {@link CallCredentials} instead of {@link Credentials} to
Expand Down Expand Up @@ -593,7 +592,6 @@ private SpannerOptions(Builder builder) {
callCredentialsProvider = builder.callCredentialsProvider;
asyncExecutorProvider = builder.asyncExecutorProvider;
compressorName = builder.compressorName;
dialect = builder.dialect;
}

/**
Expand Down Expand Up @@ -693,7 +691,6 @@ public static class Builder
private CloseableExecutorProvider asyncExecutorProvider;
private String compressorName;
private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST");
private Dialect dialect = Dialect.GOOGLE_STANDARD_SQL;

private Builder() {
// Manually set retry and polling settings that work.
Expand Down Expand Up @@ -748,7 +745,6 @@ private Builder() {
this.channelProvider = options.channelProvider;
this.channelConfigurator = options.channelConfigurator;
this.interceptorProvider = options.interceptorProvider;
this.dialect = options.dialect;
}

@Override
Expand Down Expand Up @@ -779,7 +775,6 @@ protected Set<String> getAllowedClientLibTokens() {
* <li>{@link #setHost(String)}
* <li>{@link #setNumChannels(int)}
* <li>{@link #setInterceptorProvider(GrpcInterceptorProvider)}
* <li>{@link #setDialect(Dialect)}
* <li>{@link #setHeaderProvider(com.google.api.gax.rpc.HeaderProvider)}
* </ol>
*/
Expand Down Expand Up @@ -1139,16 +1134,6 @@ public Builder setEmulatorHost(String emulatorHost) {
return this;
}

/**
* Sets the {@link Dialect} to use with Cloud Spanner. The default is {@link
* Dialect#GOOGLE_STANDARD_SQL}.
*/
public Builder setDialect(Dialect dialect) {
Preconditions.checkNotNull(dialect);
this.dialect = dialect;
return this;
}

@SuppressWarnings("rawtypes")
@Override
public SpannerOptions build() {
Expand Down Expand Up @@ -1276,10 +1261,6 @@ public String getCompressorName() {
return compressorName;
}

public Dialect getDialect() {
return dialect;
}

/** Returns the default query options to use for the specific database. */
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
// Use the specific query options for the database if any have been specified. These have
Expand Down
Loading

0 comments on commit 9eccfc4

Please sign in to comment.