From b40c276178ec553a28f3af025514e105969d03ee Mon Sep 17 00:00:00 2001 From: Ankur Goenka Date: Fri, 5 Oct 2018 14:06:24 -0700 Subject: [PATCH 1/2] [BEAM-4176] Enable Post Commit JAVA PVR tests for Flink --- ..._Java_PortableValidatesRunner_Flink.groovy | 43 +++++++++++++++++++ .../beam/gradle/BeamModulePlugin.groovy | 1 + ...ingFlinkExecutableStageContextFactory.java | 1 + 3 files changed, 45 insertions(+) create mode 100644 .test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Flink.groovy diff --git a/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Flink.groovy b/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Flink.groovy new file mode 100644 index 000000000000..ad09a0ab53d8 --- /dev/null +++ b/.test-infra/jenkins/job_PostCommit_Java_PortableValidatesRunner_Flink.groovy @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import CommonJobProperties as commonJobProperties +import PostcommitJobBuilder + +// This job runs the suite of ValidatesRunner tests against the Flink runner. +PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_PVR_Flink', + 'Run Java Flink PortableValidatesRunner', 'Java Flink PortableValidatesRunner Tests', this) { + description('Runs the Java PortableValidatesRunner suite on the Flink runner.') + + // Set common parameters. + commonJobProperties.setTopLevelMainJobProperties(delegate) + + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + + // Gradle goals for this job. + steps { + gradle { + rootBuildScriptDir(commonJobProperties.checkoutDir) + tasks(':beam-runners-flink_2.11-job-server:validatesPortableRunner') + commonJobProperties.setGradleSwitches(delegate) + } + } +} diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index d1a80dc5e29f..28548c09ddd1 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1499,6 +1499,7 @@ artifactId=${project.name} testClassesDirs = project.files(project.project(":beam-sdks-java-core").sourceSets.test.output.classesDirs, project.project(":beam-runners-core-java").sourceSets.test.output.classesDirs) maxParallelForks config.parallelism useJUnit(config.testCategories) + dependsOn ':beam-sdks-java-container:docker' } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java index bb2b9dcbe168..90d291ea28ae 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java @@ -115,6 +115,7 @@ private void scheduleRelease(JobInfo jobInfo) { int environmentCacheTTLMillis = pipelineOptions.as(PortablePipelineOptions.class).getEnvironmentCacheMillis(); if (environmentCacheTTLMillis > 0) { + // Do immediate cleanup if this class is not loaded on Flink parent classloader. if (this.getClass().getClassLoader() != ExecutionEnvironment.class.getClassLoader()) { LOG.warn( "{} is not loaded on parent Flink classloader. " From ae34c9312dd0647e9333a55f2d4203ac52e65666 Mon Sep 17 00:00:00 2001 From: Ankur Goenka Date: Mon, 15 Oct 2018 17:32:18 -0700 Subject: [PATCH 2/2] [BEAM-4176] Adding start method back to the FlinkJobserverDriver --- .../org/apache/beam/runners/flink/FlinkJobServerDriver.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java index 679c7cc4bb90..93dc6f0121ca 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java @@ -170,6 +170,11 @@ public void run() { } } + public String start() throws IOException { + jobServer = createJobServer(); + return jobServer.getApiServiceDescriptor().getUrl(); + } + public void stop() { if (jobServer != null) { try {