Skip to content

Commit

Permalink
[#24927] Ensure the SDK harness handles JCL/log4j/log4j2 messages. (#…
Browse files Browse the repository at this point in the history
…24928)

* [#24927] Ensure the SDK harness handles JCL/log4j/log4j2 messages.

This routes JCL/log4j/log4j2 messages to SLF4 which is then routed to JUL and finally over the Fn Logging API to the runner.

Fixes #24927

* Fix up overrides
  • Loading branch information
lukecwik authored Jan 8, 2023
1 parent a388568 commit 95e5391
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ class BeamModulePlugin implements Plugin<Project> {
def jsr305_version = "3.0.2"
def everit_json_version = "1.14.1"
def kafka_version = "2.4.1"
def log4j2_version = "2.17.2"
def nemo_version = "0.1"
def netty_version = "4.1.77.Final"
def postgres_version = "42.2.16"
Expand Down Expand Up @@ -573,6 +574,7 @@ class BeamModulePlugin implements Plugin<Project> {
commons_csv : "org.apache.commons:commons-csv:1.8",
commons_io : "commons-io:commons-io:2.7",
commons_lang3 : "org.apache.commons:commons-lang3:3.9",
commons_logging : "commons-logging:commons-logging:1.2",
commons_math3 : "org.apache.commons:commons-math3:3.6.1",
dbcp2 : "org.apache.commons:commons-dbcp2:$dbcp2_version",
error_prone_annotations : "com.google.errorprone:error_prone_annotations:$errorprone_version",
Expand Down Expand Up @@ -675,6 +677,7 @@ class BeamModulePlugin implements Plugin<Project> {
jamm : 'io.github.stephankoelle:jamm:0.4.1',
jaxb_api : "jakarta.xml.bind:jakarta.xml.bind-api:$jaxb_api_version",
jaxb_impl : "com.sun.xml.bind:jaxb-impl:$jaxb_api_version",
jcl_over_slf4j : "org.slf4j:jcl-over-slf4j:$slf4j_version",
jmh_core : "org.openjdk.jmh:jmh-core:$jmh_version",
joda_time : "joda-time:joda-time:2.10.10",
jsonassert : "org.skyscreamer:jsonassert:1.5.0",
Expand All @@ -684,6 +687,12 @@ class BeamModulePlugin implements Plugin<Project> {
junit : "junit:junit:4.13.1",
kafka : "org.apache.kafka:kafka_2.11:$kafka_version",
kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version",
log4j : "log4j:log4j:1.2.17",
log4j_over_slf4j : "org.slf4j:log4j-over-slf4j:$slf4j_version",
log4j2_api : "org.apache.logging.log4j:log4j-api:$log4j2_version",
log4j2_core : "org.apache.logging.log4j:log4j-core:$log4j2_version",
log4j2_to_slf4j : "org.apache.logging.log4j:log4j-to-slf4j:$log4j2_version",
log4j2_slf4j_impl : "org.apache.logging.log4j:log4j-slf4j-impl:$log4j2_version",
mockito_core : "org.mockito:mockito-core:3.7.7",
mockito_inline : "org.mockito:mockito-inline:4.5.1",
mongo_java_driver : "org.mongodb:mongo-java-driver:3.12.11",
Expand Down Expand Up @@ -711,9 +720,14 @@ class BeamModulePlugin implements Plugin<Project> {
sbe_tool : "uk.co.real-logic:sbe-tool:$sbe_tool_version",
singlestore_jdbc : "com.singlestore:singlestore-jdbc-client:$singlestore_jdbc_version",
slf4j_api : "org.slf4j:slf4j-api:$slf4j_version",
slf4j_simple : "org.slf4j:slf4j-simple:$slf4j_version",
slf4j_android : "org.slf4j:slf4j-android:$slf4j_version",
slf4j_ext : "org.slf4j:slf4j-ext:$slf4j_version",
slf4j_jdk14 : "org.slf4j:slf4j-jdk14:$slf4j_version",
slf4j_nop : "org.slf4j:slf4j-nop:$slf4j_version",
slf4j_simple : "org.slf4j:slf4j-simple:$slf4j_version",
slf4j_jul_to_slf4j : "org.slf4j:jul-to-slf4j:$slf4j_version",
slf4j_log4j12 : "org.slf4j:slf4j-log4j12:$slf4j_version",
slf4j_jcl : "org.slf4j:slf4j-jcl:$slf4j_version",
snappy_java : "org.xerial.snappy:snappy-java:1.1.8.4",
spark_core : "org.apache.spark:spark-core_2.11:$spark2_version",
spark_network_common : "org.apache.spark:spark-network-common_2.11:$spark2_version",
Expand Down
3 changes: 3 additions & 0 deletions sdks/java/container/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ ARG pull_licenses

ADD target/slf4j-api.jar /opt/apache/beam/jars/
ADD target/slf4j-jdk14.jar /opt/apache/beam/jars/
ADD target/jcl-over-slf4j.jar /opt/apache/beam/jars/
ADD target/log4j-over-slf4j.jar /opt/apache/beam/jars/
ADD target/log4j-to-slf4j.jar /opt/apache/beam/jars/
ADD target/beam-sdks-java-harness.jar /opt/apache/beam/jars/

# Required to run cross-language pipelines with KafkaIO
Expand Down
3 changes: 3 additions & 0 deletions sdks/java/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func main() {
cp := []string{
filepath.Join(jarsDir, "slf4j-api.jar"),
filepath.Join(jarsDir, "slf4j-jdk14.jar"),
filepath.Join(jarsDir, "jcl-over-slf4j.jar"),
filepath.Join(jarsDir, "log4j-over-slf4j.jar"),
filepath.Join(jarsDir, "log4j-to-slf4j.jar"),
filepath.Join(jarsDir, "beam-sdks-java-harness.jar"),
filepath.Join(jarsDir, "beam-sdks-java-io-kafka.jar"),
filepath.Join(jarsDir, "kafka-clients.jar"),
Expand Down
3 changes: 3 additions & 0 deletions sdks/java/container/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ configurations {
dependencies {
dockerDependency library.java.slf4j_api
dockerDependency library.java.slf4j_jdk14
dockerDependency library.java.jcl_over_slf4j
dockerDependency library.java.log4j_over_slf4j
dockerDependency library.java.log4j2_to_slf4j
dockerDependency project(path: ":sdks:java:harness", configuration: "shadow")
// For executing KafkaIO, e.g. as an external transform
dockerDependency project(":sdks:java:io:kafka")
Expand Down
3 changes: 3 additions & 0 deletions sdks/java/container/common.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ task copyDockerfileDependencies(type: Copy) {
from configurations.dockerDependency
rename 'slf4j-api.*', 'slf4j-api.jar'
rename 'slf4j-jdk14.*', 'slf4j-jdk14.jar'
rename 'jcl-over-slf4j.*', 'jcl-over-slf4j.jar'
rename 'log4j-over-slf4j.*', 'log4j-over-slf4j.jar'
rename 'log4j-to-slf4j.*', 'log4j-to-slf4j.jar'
if (imageJavaVersion == "11" || imageJavaVersion == "17") {
rename 'beam-sdks-java-container-agent.*.jar', 'open-module-agent.jar'
}
Expand Down
9 changes: 7 additions & 2 deletions sdks/java/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ applyJavaNature(
],
shadowClosure: {
dependencies {
include(dependency("org.apache.commons:.*"))
include(dependency(library.java.commons_compress))
include(dependency(library.java.commons_lang3))
include(dependency(library.java.antlr_runtime))
}
relocate "com.google.thirdparty", getJavaRelocatedPath("com.google.thirdparty")
relocate "org.apache.commons", getJavaRelocatedPath("org.apache.commons")
relocate "org.apache.commons.compress", getJavaRelocatedPath("org.apache.commons.compress")
relocate "org.apache.commons.lang3", getJavaRelocatedPath("org.apache.commons.lang3")
relocate "org.antlr.v4", getJavaRelocatedPath("org.antlr.v4")
},
)
Expand Down Expand Up @@ -114,6 +116,9 @@ dependencies {
shadowTest library.java.quickcheck_generators
shadowTest library.java.avro_tests
shadowTest library.java.zstd_jni
shadowTest library.java.commons_logging
shadowTest library.java.log4j
shadowTest library.java.log4j2_api
shadowTest library.java.jamm
testRuntimeOnly library.java.slf4j_jdk14
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,24 @@
*/
package org.apache.beam.sdk;

import static org.apache.beam.sdk.testing.ExpectedLogs.verifyLogged;
import static org.apache.beam.sdk.testing.ExpectedLogs.verifyNotLogged;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItemInArray;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertNotNull;

import java.security.Security;
import java.util.logging.Level;
import java.util.logging.LogManager;
import javax.net.ssl.SSLContext;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions.LogLevel;
import org.apache.beam.sdk.options.SdkHarnessOptions.SdkHarnessLogLevelOverrides;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.ExpectedLogs.LogSaver;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment;
Expand Down Expand Up @@ -110,4 +119,81 @@ public void testTlsAvailable() throws Exception {

p.run().waitUntilFinish();
}

private static class LoggingDoFn extends DoFn<String, String> {
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> output) {
LogSaver logSaver = new LogSaver();
LogManager.getLogManager().getLogger("").addHandler(logSaver);

try {
Exception fooException = new RuntimeException("a.Foo-RuntimeException");
// Test the different log levels for various named loggers.
final org.slf4j.Logger fooLogger = org.slf4j.LoggerFactory.getLogger("a.Foo");
fooLogger.trace("a.Foo-Trace");
fooLogger.debug("a.Foo-Debug");
fooLogger.info("a.Foo-Info");
fooLogger.warn("a.Foo-Warn");
fooLogger.error("a.Foo-Error", fooException);

Exception barException = new RuntimeException("a.b.Bar-RuntimeException");
final org.slf4j.Logger barLogger = org.slf4j.LoggerFactory.getLogger("a.b.Bar");
barLogger.trace("a.b.Bar-Trace");
barLogger.debug("a.b.Bar-Debug");
barLogger.info("a.b.Bar-Info");
barLogger.warn("a.b.Bar-Warn");
barLogger.error("a.b.Bar-Error", barException);

// Test the different types of loggers (e.g. slf4j, jcl, jul, log4j, log4jc)
final org.slf4j.Logger slf4jLogger = org.slf4j.LoggerFactory.getLogger("logger.slf4j");
slf4jLogger.info("SLF4J log messages work");
final org.apache.commons.logging.Log jclLogger =
org.apache.commons.logging.LogFactory.getLog("logger.jcl");
jclLogger.info("JCL log messages work");
final java.util.logging.Logger julLogger = java.util.logging.Logger.getLogger("logger.jul");
julLogger.info("JUL log messages work");
final org.apache.log4j.Logger log4jLogger =
org.apache.log4j.Logger.getLogger("logger.log4j");
log4jLogger.info("Log4j log messages work");
final org.apache.logging.log4j.Logger log4j2Logger =
org.apache.logging.log4j.LogManager.getLogger("logger.log4j2");
log4j2Logger.info("Log4j2 log messages work");

verifyNotLogged(ExpectedLogs.matcher(Level.FINEST, "a.Foo-Trace"), logSaver);
verifyLogged(ExpectedLogs.matcher(Level.FINE, "a.Foo-Debug"), logSaver);
verifyLogged(ExpectedLogs.matcher(Level.INFO, "a.Foo-Info"), logSaver);
verifyLogged(ExpectedLogs.matcher(Level.WARNING, "a.Foo-Warn"), logSaver);
verifyLogged(ExpectedLogs.matcher(Level.SEVERE, "a.Foo-Error", fooException), logSaver);

verifyNotLogged(ExpectedLogs.matcher(Level.FINEST, "a.Foo-Trace"), logSaver);
verifyNotLogged(ExpectedLogs.matcher(Level.FINE, "a.b.Bar-Debug"), logSaver);
verifyNotLogged(ExpectedLogs.matcher(Level.INFO, "a.b.Bar-Info"), logSaver);
verifyLogged(ExpectedLogs.matcher(Level.WARNING, "a.b.Bar-Warn"), logSaver);
verifyLogged(ExpectedLogs.matcher(Level.SEVERE, "a.b.Bar-Error", barException), logSaver);

verifyLogged(ExpectedLogs.matcher(Level.INFO, "SLF4J log messages work"), logSaver);
verifyLogged(ExpectedLogs.matcher(Level.INFO, "JCL log messages work"), logSaver);
verifyLogged(ExpectedLogs.matcher(Level.INFO, "JUL log messages work"), logSaver);
verifyLogged(ExpectedLogs.matcher(Level.INFO, "Log4j log messages work"), logSaver);
verifyLogged(ExpectedLogs.matcher(Level.INFO, "Log4j2 log messages work"), logSaver);
output.output(element);
} finally {
LogManager.getLogManager().getLogger("").removeHandler(logSaver);
}
}
}

@Test
@Category({ValidatesRunner.class, UsesSdkHarnessEnvironment.class})
public void testLogging() throws Exception {
p.getOptions().as(SdkHarnessOptions.class).setDefaultSdkHarnessLogLevel(LogLevel.DEBUG);
p.getOptions()
.as(SdkHarnessOptions.class)
.setSdkHarnessLogLevelOverrides(
new SdkHarnessLogLevelOverrides().addOverrideForName("a.b.Bar", LogLevel.WARN));
PCollection<String> input = p.apply(Create.of("Logging Works").withCoder(StringUtf8Coder.of()));
PCollection<String> output = input.apply(ParDo.of(new LoggingDoFn()));
PAssert.that(output).containsInAnyOrder("Logging Works");
p.run().waitUntilFinish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void verifyError(String substring, Throwable t) {
* @param substring The message to match against.
*/
public void verifyNotLogged(String substring) {
verifyNotLogged(matcher(substring));
verifyNotLogged(matcher(substring), logSaver);
}

/**
Expand All @@ -187,10 +187,10 @@ public void verifyLogRecords(Matcher<Iterable<LogRecord>> matcher) {
}

private void verify(final Level level, final String substring) {
verifyLogged(matcher(level, substring));
verifyLogged(matcher(level, substring), logSaver);
}

private TypeSafeMatcher<LogRecord> matcher(final String substring) {
public static TypeSafeMatcher<LogRecord> matcher(final String substring) {
return new TypeSafeMatcher<LogRecord>() {
@Override
public void describeTo(Description description) {
Expand All @@ -204,7 +204,7 @@ protected boolean matchesSafely(LogRecord item) {
};
}

private TypeSafeMatcher<LogRecord> matcher(final Level level, final String substring) {
public static TypeSafeMatcher<LogRecord> matcher(final Level level, final String substring) {
return new TypeSafeMatcher<LogRecord>() {
@Override
public void describeTo(Description description) {
Expand All @@ -220,14 +220,14 @@ protected boolean matchesSafely(LogRecord item) {
}

private void verify(final Level level, final String substring, final Throwable throwable) {
verifyLogged(matcher(level, substring, throwable));
verifyLogged(matcher(level, substring, throwable), logSaver);
}

private void verifyNo(final Level level, final String substring, final Throwable throwable) {
verifyNotLogged(matcher(level, substring, throwable));
verifyNotLogged(matcher(level, substring, throwable), logSaver);
}

private TypeSafeMatcher<LogRecord> matcher(
public static TypeSafeMatcher<LogRecord> matcher(
final Level level, final String substring, final Throwable throwable) {
return new TypeSafeMatcher<LogRecord>() {
@Override
Expand All @@ -249,7 +249,7 @@ protected boolean matchesSafely(LogRecord item) {
};
}

private void verifyLogged(Matcher<LogRecord> matcher) {
public static void verifyLogged(Matcher<LogRecord> matcher, LogSaver logSaver) {
for (LogRecord record : logSaver.getLogs()) {
if (matcher.matches(record)) {
return;
Expand All @@ -259,17 +259,18 @@ private void verifyLogged(Matcher<LogRecord> matcher) {
fail(String.format("Missing match for [%s]", matcher));
}

private void verifyNotLogged(Matcher<LogRecord> matcher) {
public static void verifyNotLogged(Matcher<LogRecord> matcher, LogSaver logSaver) {
// Don't use Matchers.everyItem(Matchers.not(matcher)) because it doesn't format the logRecord
for (LogRecord record : logSaver.getLogs()) {
if (matcher.matches(record)) {
fail(String.format("Unexpected match of [%s]: [%s]", matcher, logFormatter.format(record)));
fail(
String.format("Unexpected match of [%s]: [%s]", matcher, LOG_FORMATTER.format(record)));
}
}
}

@Override
protected void before() throws Throwable {
protected void before() {
previousLevel = log.getLevel();
log.setLevel(Level.ALL);
log.addHandler(logSaver);
Expand All @@ -282,9 +283,9 @@ protected void after() {
logSaver.reset();
}

private static final Formatter LOG_FORMATTER = new SimpleFormatter();
private final Logger log;
private final LogSaver logSaver;
private final Formatter logFormatter = new SimpleFormatter();
private Level previousLevel;

private ExpectedLogs(String name) {
Expand All @@ -294,7 +295,7 @@ private ExpectedLogs(String name) {

/** A JUL logging {@link Handler} that records all logging events that are passed to it. */
@ThreadSafe
private static class LogSaver extends Handler {
public static class LogSaver extends Handler {
private final Collection<LogRecord> logRecords = new ConcurrentLinkedDeque<>();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,8 @@ enableJavaPerformanceTesting()
description = "Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 5.x"
ext.summary = "Tests of ElasticsearchIO on Elasticsearch 5.x"

def log4j_version = "2.17.1"
def elastic_search_version = "5.6.3"

configurations.all {
resolutionStrategy {
// Make sure the log4j versions for api and core match instead of taking the default
// Gradle rule of using the latest.
force "org.apache.logging.log4j:log4j-core:$log4j_version"
force "org.apache.logging.log4j:log4j-api:$log4j_version"
}
}

dependencies {
testImplementation project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntimeMigration")
testImplementation library.java.testcontainers_elasticsearch
Expand All @@ -50,8 +40,8 @@ dependencies {
testImplementation library.java.hamcrest
testImplementation library.java.junit
testImplementation "org.elasticsearch.client:elasticsearch-rest-client:$elastic_search_version"
testRuntimeOnly "org.apache.logging.log4j:log4j-api:$log4j_version"
testRuntimeOnly "org.apache.logging.log4j:log4j-core:$log4j_version"
testRuntimeOnly library.java.log4j2_api
testRuntimeOnly library.java.log4j2_core
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,8 @@ enableJavaPerformanceTesting()
description = "Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 6.x"
ext.summary = "Tests of ElasticsearchIO on Elasticsearch 6.x"

def log4j_version = "2.17.1"
def elastic_search_version = "6.4.0"

configurations.all {
resolutionStrategy {
// Make sure the log4j versions for api and core match instead of taking the default
// Gradle rule of using the latest.
force "org.apache.logging.log4j:log4j-core:$log4j_version"
force "org.apache.logging.log4j:log4j-api:$log4j_version"
}
}

dependencies {
testImplementation project(path: ":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common", configuration: "testRuntimeMigration")
testImplementation library.java.testcontainers_elasticsearch
Expand All @@ -49,8 +39,8 @@ dependencies {
testImplementation library.java.hamcrest
testImplementation library.java.junit
testImplementation "org.elasticsearch.client:elasticsearch-rest-client:$elastic_search_version"
testRuntimeOnly "org.apache.logging.log4j:log4j-api:$log4j_version"
testRuntimeOnly "org.apache.logging.log4j:log4j-core:$log4j_version"
testRuntimeOnly library.java.log4j2_api
testRuntimeOnly library.java.log4j2_core
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Loading

0 comments on commit 95e5391

Please sign in to comment.