Skip to content

Commit

Permalink
[BEAM-13015] Add jamm as a java agent to the Java SDK harness container (#16412)
Browse files Browse the repository at this point in the history

This allows for accurate object sizing for caching.
  • Loading branch information
lukecwik authored Jan 4, 2022
1 parent 5a11778 commit 122ae34
Show file tree
Hide file tree
Showing 23 changed files with 343 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_PVR_Spark_Batch',
rootBuildScriptDir(commonJobProperties.checkoutDir)
tasks(':runners:spark:2:job-server:validatesPortableRunnerBatch')
tasks(':runners:spark:3:job-server:validatesPortableRunnerBatch')
tasks(':runners:spark:2:job-server:validatesPortableRunnerDocker')
tasks(':runners:spark:3:job-server:validatesPortableRunnerDocker')
commonJobProperties.setGradleSwitches(delegate)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import CommonTestProperties
import PrecommitJobBuilder

// Precommit job 'Java_PVR_Flink_Docker': runs the Flink job-server's
// validatesPortableRunnerDocker Gradle task for the Flink version reported by
// CommonTestProperties.getFlinkVersion().
def flinkDockerGradleTask =
    ":runners:flink:${CommonTestProperties.getFlinkVersion()}:job-server:validatesPortableRunnerDocker"

// Path regexes handed to PrecommitJobBuilder as its triggerPathPatterns.
def flinkDockerTriggerPaths = [
    '^sdks/java/core/src/test/java/org/apache/beam/sdk/.*$',
    '^sdks/java/container/.*$',
    '^sdks/java/harness/.*$',
    '^runners/flink/.*$',
    '^runners/java-fn-execution/.*$',
]

PrecommitJobBuilder builder = new PrecommitJobBuilder(
    scope: this,
    nameBase: 'Java_PVR_Flink_Docker',
    gradleTask: flinkDockerGradleTask,
    timeoutMins: 240,
    triggerPathPatterns: flinkDockerTriggerPaths
)

builder.build {
  // Archive every JUnit XML report found under any build/test-results directory.
  publishers {
    archiveJunit('**/build/test-results/**/*.xml')
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,10 @@ class BeamModulePlugin implements Plugin<Project> {
jackson_datatype_joda : "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jackson_version",
jackson_module_scala_2_11 : "com.fasterxml.jackson.module:jackson-module-scala_2.11:$jackson_version",
jackson_module_scala_2_12 : "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jackson_version",
// Swap to use the officially published version of 0.4.x once available
// instead of relying on a community published copy. See
// https://github.com/jbellis/jamm/issues/44 for additional details.
jamm : 'io.github.stephankoelle:jamm:0.4.1',
jaxb_api : "jakarta.xml.bind:jakarta.xml.bind-api:$jaxb_api_version",
jaxb_impl : "com.sun.xml.bind:jaxb-impl:$jaxb_api_version",
joda_time : "joda-time:joda-time:2.10.10",
Expand Down
4 changes: 4 additions & 0 deletions runners/direct-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ task needsRunnerTests(type: Test) {
testClassesDirs += files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories "org.apache.beam.sdk.testing.NeedsRunner"
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
excludeCategories "org.apache.beam.sdk.testing.LargeKeys\$Above100MB"
// MetricsPusher isn't implemented in direct runner
excludeCategories "org.apache.beam.sdk.testing.UsesMetricsPusher"
Expand Down Expand Up @@ -160,6 +162,8 @@ task validatesRunner(type: Test) {
testClassesDirs += files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories "org.apache.beam.sdk.testing.ValidatesRunner"
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
excludeCategories "org.apache.beam.sdk.testing.LargeKeys\$Above100MB"
excludeCategories 'org.apache.beam.sdk.testing.UsesMetricsPusher'
excludeCategories "org.apache.beam.sdk.testing.UsesCrossLanguageTransforms"
Expand Down
2 changes: 2 additions & 0 deletions runners/flink/flink_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ def createValidatesRunnerTask(Map m) {
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
} else {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
Expand Down
47 changes: 29 additions & 18 deletions runners/flink/job-server/flink_job_server.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ runShadow {
jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
}

def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpointing) {
def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpointing, boolean docker) {
def pipelineOptions = [
// Limit resource consumption via parallelism
"--parallelism=2",
Expand All @@ -145,15 +145,24 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi
testClasspathConfiguration: configurations.validatesPortableRunner,
numParallelTests: 1,
pipelineOpts: pipelineOptions,
environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
environment: docker ? BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.DOCKER : BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
testCategories: {
if (streaming && checkpointing) {
includeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
// TestStreamSource does not support checkpointing
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
} else {
if (docker) {
includeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
return
}

if (streaming && checkpointing) {
includeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
excludeCategories 'org.apache.beam.sdk.testing.UsesBoundedSplittableParDo'
// TestStreamSource does not support checkpointing
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
return
}

includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders'
// Larger keys are possible, but they require more memory.
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above10MB'
Expand All @@ -176,14 +185,14 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithMultipleStages'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp'
} else {
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
return
}
}

excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
},
testFilter: {
// TODO(BEAM-10016)
Expand All @@ -200,11 +209,13 @@ def portableValidatesRunnerTask(String name, Boolean streaming, Boolean checkpoi
)
}

project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", false, false)
project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", true, false)
project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", true, true)
project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", false, false, true)
project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", false, false, false)
project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", true, false, false)
project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", true, true, false)

task validatesPortableRunner() {
dependsOn validatesPortableRunnerDocker
dependsOn validatesPortableRunnerBatch
dependsOn validatesPortableRunnerStreaming
dependsOn validatesPortableRunnerStreamingCheckpoint
Expand Down
2 changes: 2 additions & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ def runnerV2PipelineOptions = [
]

def commonLegacyExcludeCategories = [
// Should be run only in a properly configured SDK harness environment
'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment',
'org.apache.beam.sdk.testing.LargeKeys$Above10MB',
'org.apache.beam.sdk.testing.UsesAttemptedMetrics',
'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms',
Expand Down
2 changes: 2 additions & 0 deletions runners/jet/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ task validatesRunnerBatch(type: Test) {
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
excludeCategories "org.apache.beam.sdk.testing.LargeKeys\$Above100MB"
excludeCategories 'org.apache.beam.sdk.testing.UsesTimerMap'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
Expand Down
2 changes: 2 additions & 0 deletions runners/portability/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ def createUlrValidatesRunnerTask = { name, environmentType, dockerImageTask = ""
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
Expand Down
2 changes: 2 additions & 0 deletions runners/samza/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ task validatesRunner(type: Test) {
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.NeedsRunner'
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesSchema'
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
Expand Down
45 changes: 31 additions & 14 deletions runners/samza/job-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,29 @@ runShadow {
args = []
}

def tempDir = File.createTempDir()
def pipelineOptions = [
"--configOverride={\"job.non-logged.store.base.dir\":\"" + tempDir + "\"}"
]
createPortableValidatesRunnerTask(
name: "validatesPortableRunner",
jobServerDriver: "org.apache.beam.runners.samza.SamzaJobServerDriver",
jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
testClasspathConfiguration: configurations.validatesPortableRunner,
numParallelTests: 1,
pipelineOpts: pipelineOptions,
environment: BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
testCategories: {
def portableValidatesRunnerTask(String name, boolean docker) {
def tempDir = File.createTempDir()
def pipelineOptions = [
"--configOverride={\"job.non-logged.store.base.dir\":\"" + tempDir + "\"}"
]
createPortableValidatesRunnerTask(
name: "validatesPortableRunner${name}",
jobServerDriver: "org.apache.beam.runners.samza.SamzaJobServerDriver",
jobServerConfig: "--job-host=localhost,--job-port=0,--artifact-port=0,--expansion-port=0",
testClasspathConfiguration: configurations.validatesPortableRunner,
numParallelTests: 1,
pipelineOpts: pipelineOptions,
environment: docker ? BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.DOCKER : BeamModulePlugin.PortableValidatesRunnerConfiguration.Environment.EMBEDDED,
testCategories: {
if (docker) {
includeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
return
}

includeCategories 'org.apache.beam.sdk.testing.NeedsRunner'
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
// TODO: BEAM-12350
excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
// TODO: BEAM-12681
Expand Down Expand Up @@ -173,7 +181,16 @@ createPortableValidatesRunnerTask(
// TODO(BEAM-13498)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew'
}
)
)
}

project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", true)
project.ext.validatesPortableRunnerEmbedded = portableValidatesRunnerTask("Embedded", false)

// Umbrella task: depends on both portable validates-runner variants
// (Docker and Embedded) that are registered on project.ext.
task validatesPortableRunner() {
  dependsOn(validatesPortableRunnerDocker, validatesPortableRunnerEmbedded)
}

def jobPort = BeamModulePlugin.getRandomPort()
def artifactPort = BeamModulePlugin.getRandomPort()
Expand Down
Loading

0 comments on commit 122ae34

Please sign in to comment.