Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sql): use a connection pool named "read" for some read operations in SqlExecutionRepository #4803

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ import java.util.Optional
import org.jooq.DSLContext
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.test.context.junit.jupiter.SpringExtension
import javax.sql.DataSource

@Configuration
class SqlTestConfig {
Expand Down Expand Up @@ -121,7 +123,8 @@ class SqlTestConfig {
registry: Registry,
properties: SqlProperties,
orcaSqlProperties: OrcaSqlProperties,
compressionProperties: ExecutionCompressionProperties
compressionProperties: ExecutionCompressionProperties,
dataSource: DataSource
) = SqlExecutionRepository(
orcaSqlProperties.partitionName,
dsl,
Expand All @@ -131,7 +134,8 @@ class SqlTestConfig {
orcaSqlProperties.stageReadSize,
interlink = null,
compressionProperties = compressionProperties,
pipelineRefEnabled = false
pipelineRefEnabled = false,
dataSource = dataSource
)

@Bean
Expand Down Expand Up @@ -192,4 +196,7 @@ class SqlTestConfig {
"spring.application.name=orcaTest"
]
)
class SqlQueueIntegrationTest : QueueIntegrationTest()
class SqlQueueIntegrationTest : QueueIntegrationTest() {
@MockBean
var dataSource: DataSource? = null
}
2 changes: 2 additions & 0 deletions orca-sql/orca-sql.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ dependencies {
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("dev.minutest:minutest")
testImplementation("com.nhaarman:mockito-kotlin")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.testcontainers:mysql")
testImplementation("org.testcontainers:postgresql")

testRuntimeOnly("com.mysql:mysql-connector-j")
testRuntimeOnly("org.postgresql:postgresql")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
testRuntimeOnly(project(":keiko-sql")) // so SpringLiquibaseProxy has changelog-keiko.yml
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ import com.netflix.spinnaker.orca.sql.SqlHealthcheckActivator
import com.netflix.spinnaker.orca.sql.pipeline.persistence.ExecutionStatisticsRepository
import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository
import com.netflix.spinnaker.orca.sql.telemetry.SqlActiveExecutionsMonitor
import java.time.Clock
import java.util.Optional
import javax.sql.DataSource
import liquibase.integration.spring.SpringLiquibase
import net.javacrumbs.shedlock.core.LockProvider
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider
Expand All @@ -49,10 +52,11 @@ import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.*
import java.time.Clock
import java.util.*
import javax.sql.DataSource
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.context.annotation.Primary

@Configuration
@ConditionalOnProperty("sql.enabled")
Expand All @@ -78,7 +82,8 @@ class SqlConfiguration {
interlink: Optional<Interlink>,
executionRepositoryListeners: Collection<ExecutionRepositoryListener>,
compressionProperties: ExecutionCompressionProperties,
pipelineRefProperties: PipelineRefProperties
pipelineRefProperties: PipelineRefProperties,
dataSource: DataSource
) =
SqlExecutionRepository(
orcaSqlProperties.partitionName,
Expand All @@ -90,7 +95,8 @@ class SqlConfiguration {
interlink = interlink.orElse(null),
executionRepositoryListeners = executionRepositoryListeners,
compressionProperties = compressionProperties,
pipelineRefEnabled = pipelineRefProperties.enabled
pipelineRefEnabled = pipelineRefProperties.enabled,
dataSource = dataSource
).let {
InstrumentedProxy.proxy(registry, it, "sql.executions", mapOf(Pair("repository", "primary"))) as ExecutionRepository
}
Expand All @@ -105,7 +111,8 @@ class SqlConfiguration {
orcaSqlProperties: OrcaSqlProperties,
@Value("\${execution-repository.sql.secondary.pool-name}") poolName: String,
compressionProperties: ExecutionCompressionProperties,
pipelineRefProperties: PipelineRefProperties
pipelineRefProperties: PipelineRefProperties,
dataSource: DataSource
) =
SqlExecutionRepository(
orcaSqlProperties.partitionName,
Expand All @@ -116,7 +123,8 @@ class SqlConfiguration {
orcaSqlProperties.stageReadSize,
poolName,
compressionProperties = compressionProperties,
pipelineRefEnabled = pipelineRefProperties.enabled
pipelineRefEnabled = pipelineRefProperties.enabled,
dataSource = dataSource
).let {
InstrumentedProxy.proxy(registry, it, "sql.executions", mapOf(Pair("repository", "secondary"))) as ExecutionRepository
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ import org.jooq.impl.DSL.table
import org.jooq.impl.DSL.timestampSub
import org.jooq.impl.DSL.value
import org.slf4j.LoggerFactory
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource
import rx.Observable
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import javax.sql.DataSource

/**
* A generic SQL [ExecutionRepository].
Expand All @@ -96,10 +98,12 @@ class SqlExecutionRepository(
private val batchReadSize: Int = 10,
private val stageReadSize: Int = 200,
private val poolName: String = "default",
internal var readPoolName: String = "read", /* internal for testing */
private val interlink: Interlink? = null,
private val executionRepositoryListeners: Collection<ExecutionRepositoryListener> = emptyList(),
private val compressionProperties: ExecutionCompressionProperties,
private val pipelineRefEnabled: Boolean
private val pipelineRefEnabled: Boolean,
private val dataSource: DataSource
) : ExecutionRepository, ExecutionStatisticsRepository {
companion object {
val ulid = SpinULID(SecureRandom())
Expand All @@ -109,7 +113,13 @@ class SqlExecutionRepository(
private val log = LoggerFactory.getLogger(javaClass)

init {
log.info("Creating SqlExecutionRepository with partition=$partitionName and pool=$poolName")
// If there's no read pool configured, fall back to the default pool
if ((dataSource !is AbstractRoutingDataSource)
|| (dataSource.resolvedDataSources[readPoolName] == null)) {
readPoolName = poolName
}

log.info("Creating SqlExecutionRepository with partition=$partitionName, pool=$poolName, readPool=$readPoolName")

try {
withPool(poolName) {
Expand Down Expand Up @@ -158,10 +168,8 @@ class SqlExecutionRepository(
validateHandledPartitionOrThrow(execution)

withPool(poolName) {
jooq.transactional {
it.delete(execution.type.stagesTableName)
.where(stageId.toWhereCondition()).execute()
}
jooq.delete(execution.type.stagesTableName)
.where(stageId.toWhereCondition()).execute()
}
}

Expand Down Expand Up @@ -253,7 +261,7 @@ class SqlExecutionRepository(
}

override fun isCanceled(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(readPoolName) {
return jooq.fetchExists(
jooq.selectFrom(type.tableName)
.where(id.toWhereCondition())
Expand Down Expand Up @@ -388,7 +396,7 @@ class SqlExecutionRepository(
}

private fun retrieve(type: ExecutionType, criteria: ExecutionCriteria, partition: String?): Observable<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.selectExecutions(
type,
fields = selectExecutionFields(compressionProperties) + field("status"),
Expand Down Expand Up @@ -417,7 +425,7 @@ class SqlExecutionRepository(
}

override fun retrievePipelinesForApplication(application: String): Observable<PipelineExecution> =
withPool(poolName) {
withPool(readPoolName) {
Observable.from(
fetchExecutions { pageSize, cursor ->
selectExecutions(PIPELINE, pageSize, cursor) {
Expand All @@ -434,7 +442,7 @@ class SqlExecutionRepository(
// When not filtering by status, provide an index hint to ensure use of `pipeline_config_id_idx` which
// fully satisfies the where clause and order by. Without, some lookups by config_id matching thousands
// of executions triggered costly full table scans.
withPool(poolName) {
withPool(readPoolName) {
val select = if (criteria.statuses.isEmpty() || criteria.statuses.size == ExecutionStatus.values().size) {
jooq.selectExecutions(
PIPELINE,
Expand Down Expand Up @@ -478,7 +486,7 @@ class SqlExecutionRepository(
criteria: ExecutionCriteria,
sorter: ExecutionComparator?
): MutableList<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
return jooq.selectExecutions(
ORCHESTRATION,
conditions = {
Expand Down Expand Up @@ -516,7 +524,7 @@ class SqlExecutionRepository(
}

override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution {
withPool(poolName) {
withPool(readPoolName) {
val execution = jooq.selectExecution(ORCHESTRATION, compressionProperties)
.where(
field("id").eq(
Expand All @@ -530,21 +538,27 @@ class SqlExecutionRepository(
)
.fetchExecution()

if (execution != null) {
if (!execution.status.isComplete) {
return execution
}
jooq.transactional {
it.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}
if (execution == null) {
throw ExecutionNotFoundException("No Orchestration found for correlation ID $correlationId")
}

throw ExecutionNotFoundException("No Orchestration found for correlation ID $correlationId")
if (!execution.status.isComplete) {
return execution
}
}

// If we get here, there's an execution with the given correlation id, but
// it's complete, so clean up the correlation_ids table.
withPool(poolName) {
jooq.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}

// Treat a completed execution similar to not finding one at all.
throw ExecutionNotFoundException("Complete Orchestration found for correlation ID $correlationId")
}

override fun retrievePipelineForCorrelationId(correlationId: String): PipelineExecution {
withPool(poolName) {
withPool(readPoolName) {
val execution = jooq.selectExecution(PIPELINE, compressionProperties)
.where(
field("id").eq(
Expand All @@ -558,17 +572,22 @@ class SqlExecutionRepository(
)
.fetchExecution()

if (execution != null) {
if (!execution.status.isComplete) {
return execution
}
jooq.transactional {
it.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}
if (execution == null) {
throw ExecutionNotFoundException("No Pipeline found for correlation ID $correlationId")
}

throw ExecutionNotFoundException("No Pipeline found for correlation ID $correlationId")
if (!execution.status.isComplete) {
return execution
}
}

// If we get here, there's an execution with the given correlation id, but
// it's complete, so clean up the correlation_ids table.
withPool(poolName) {
jooq.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}

throw ExecutionNotFoundException("Complete Pipeline found for correlation ID $correlationId")
}

override fun retrieveBufferedExecutions(): MutableList<PipelineExecution> =
Expand All @@ -583,7 +602,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllApplicationNames(type: ExecutionType?): List<String> {
withPool(poolName) {
withPool(readPoolName) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
Expand All @@ -606,7 +625,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllApplicationNames(type: ExecutionType?, minExecutions: Int): List<String> {
withPool(poolName) {
withPool(readPoolName) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
Expand All @@ -632,7 +651,7 @@ class SqlExecutionRepository(
}

override fun countActiveExecutions(): ActiveExecutionsReport {
withPool(poolName) {
withPool(readPoolName) {
val partitionPredicate = if (partitionName != null) field(name("partition")).eq(partitionName) else value(1).eq(value(1))

val orchestrationsQuery = jooq.selectCount()
Expand Down Expand Up @@ -661,7 +680,7 @@ class SqlExecutionRepository(
buildTimeEndBoundary: Long,
executionCriteria: ExecutionCriteria
): List<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.select(selectExecutionFields(compressionProperties))
.from(PIPELINE.tableName)
.join(
Expand Down Expand Up @@ -741,7 +760,7 @@ class SqlExecutionRepository(
}

override fun hasExecution(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(readPoolName) {
return jooq.selectCount()
.from(type.tableName)
.where(id.toWhereCondition())
Expand All @@ -750,7 +769,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllExecutionIds(type: ExecutionType): MutableList<String> {
withPool(poolName) {
withPool(readPoolName) {
return jooq.select(field("id")).from(type.tableName).fetch("id", String::class.java)
}
}
Expand All @@ -770,7 +789,7 @@ class SqlExecutionRepository(
): Pair<String, String?> {
if (isULID(id)) return Pair(id, null)

withPool(poolName) {
withPool(readPoolName) {
val ts = (timestamp ?: System.currentTimeMillis())
val row = ctx.select(field("id"))
.from(table)
Expand Down Expand Up @@ -1109,14 +1128,10 @@ class SqlExecutionRepository(
private fun selectExecution(
ctx: DSLContext,
type: ExecutionType,
id: String,
forUpdate: Boolean = false
id: String
): PipelineExecution? {
withPool(poolName) {
val select = ctx.selectExecution(type, compressionProperties).where(id.toWhereCondition())
if (forUpdate) {
select.forUpdate()
}
return select.fetchExecution()
}
}
Expand All @@ -1127,7 +1142,7 @@ class SqlExecutionRepository(
cursor: String?,
where: ((SelectJoinStep<Record>) -> SelectConditionStep<Record>)? = null
): Collection<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.selectExecutions(
type,
conditions = {
Expand Down
Loading