Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Add support for setting the default log level and also custom log level overrides on the Dataflow worker.
Browse files Browse the repository at this point in the history

[]
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=87355253
  • Loading branch information
lukecwik authored and davorbonaci committed Feb 27, 2015
1 parent cd845a9 commit 0aa4d9d
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public interface DataflowPipelineOptions extends
PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
DataflowPipelineWorkerPoolOptions, BigQueryOptions,
GcsOptions, StreamingOptions, CloudDebuggerOptions {
GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions {

/**
* GCS path for temporary files.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.sdk.options;

import com.google.common.base.Preconditions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Arrays;
import java.util.Objects;

/**
* Options which are used to control logging configuration on the Dataflow worker.
*/
/**
 * Options which are used to control logging configuration on the Dataflow worker.
 */
public interface DataflowWorkerLoggingOptions extends PipelineOptions {
  /**
   * The set of log levels which can be used on the Dataflow worker.
   */
  public enum Level {
    DEBUG, ERROR, INFO, TRACE, WARN
  }

  /**
   * This option controls the default log level of all loggers without a
   * log level override.
   */
  @Default.Enum("INFO")
  Level getDefaultWorkerLogLevel();
  void setDefaultWorkerLogLevel(Level level);

  /**
   * This option controls the log levels for specifically named loggers.
   * <p>
   * Later options with equivalent names override earlier options.
   * <p>
   * See {@link WorkerLogLevelOverride} for more information on how to configure logging
   * on a per {@link Class}, {@link Package}, or name basis.
   */
  WorkerLogLevelOverride[] getWorkerLogLevelOverrides();
  void setWorkerLogLevelOverrides(WorkerLogLevelOverride[] workerLogLevelOverrides);

  /**
   * Defines a log level override for a specific class, package, or name.
   * <p>
   * {@link java.util.logging} is used on the Dataflow worker harness and supports
   * a logging hierarchy based off of names which are "." separated. It is a common
   * pattern to have the logger for a given class share the same name as the class itself.
   * Given the classes {@code a.b.c.Foo}, {@code a.b.c.Xyz}, and {@code a.b.Bar}, with
   * loggers named {@code "a.b.c.Foo"}, {@code "a.b.c.Xyz"}, and {@code "a.b.Bar"} respectively,
   * we can override the log levels:
   * <ul>
   *   <li>for {@code Foo} by specifying the name {@code "a.b.c.Foo"} or the {@link Class}
   *       representing {@code a.b.c.Foo}.
   *   <li>for {@code Foo}, {@code Xyz}, and {@code Bar} by specifying the name {@code "a.b"} or
   *       the {@link Package} representing {@code a.b}.
   *   <li>for {@code Foo} and {@code Bar} by specifying both of their names or classes.
   * </ul>
   * Note that by specifying multiple overrides, the exact name followed by the closest parent
   * takes precedence.
   */
  public static class WorkerLogLevelOverride {
    // Separates the logger name from the level in the serialized "Name#Level" form.
    private static final String SEPARATOR = "#";

    /**
     * Overrides the default log level for the passed in class.
     * <p>
     * This is equivalent to calling {@link #forName(String, Level)} and
     * passing in the {@link Class#getName() class name}.
     *
     * @param klass the class whose logger's level should be overridden, not null
     * @param level the level to use, not null
     */
    public static WorkerLogLevelOverride forClass(Class<?> klass, Level level) {
      Preconditions.checkNotNull(klass, "Expected class to be not null.");
      return forName(klass.getName(), level);
    }

    /**
     * Overrides the default log level for the passed in package.
     * <p>
     * This is equivalent to calling {@link #forName(String, Level)} and
     * passing in the {@link Package#getName() package name}.
     *
     * @param pkg the package whose loggers' levels should be overridden, not null
     * @param level the level to use, not null
     */
    public static WorkerLogLevelOverride forPackage(Package pkg, Level level) {
      Preconditions.checkNotNull(pkg, "Expected package to be not null.");
      return forName(pkg.getName(), level);
    }

    /**
     * Overrides the default log level for the passed in name.
     * <p>
     * Note that because of the hierarchical nature of logger names, this will
     * override the log level of all loggers which have the passed in name or
     * a parent logger which has the passed in name.
     *
     * @param name the logger name whose level should be overridden, not null
     * @param level the level to use, not null
     */
    public static WorkerLogLevelOverride forName(String name, Level level) {
      Preconditions.checkNotNull(name, "Expected name to be not null.");
      Preconditions.checkNotNull(level,
          "Expected level to be one of %s.", Arrays.toString(Level.values()));
      return new WorkerLogLevelOverride(name, level);
    }

    /**
     * Expects a value of the form {@code Name#Level}.
     * <p>
     * Used by Jackson to deserialize the {@link #toString() serialized form}.
     *
     * @throws IllegalArgumentException if the value does not contain the {@code "#"}
     *     separator or names an unsupported log level
     */
    @JsonCreator
    public static WorkerLogLevelOverride create(String value) {
      Preconditions.checkNotNull(value, "Expected value to be not null.");
      Preconditions.checkArgument(value.contains(SEPARATOR),
          "Expected '#' separator but none found within '%s'.", value);
      String[] parts = value.split(SEPARATOR, 2);
      Level level;
      try {
        level = Level.valueOf(parts[1]);
      } catch (IllegalArgumentException e) {
        // Preserve the original cause so the underlying valueOf failure is not lost.
        throw new IllegalArgumentException(String.format(
            "Unsupported log level '%s' requested. Must be one of %s.",
            parts[1], Arrays.toString(Level.values())), e);
      }
      return forName(parts[0], level);
    }

    private final String name;
    private final Level level;

    private WorkerLogLevelOverride(String name, Level level) {
      this.name = name;
      this.level = level;
    }

    /** Returns the logger name this override applies to. */
    public String getName() {
      return name;
    }

    /** Returns the log level this override installs. */
    public Level getLevel() {
      return level;
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (!(obj instanceof WorkerLogLevelOverride)) {
        return false;
      }
      WorkerLogLevelOverride other = (WorkerLogLevelOverride) obj;
      return name.equals(other.name) && level == other.level;
    }

    @Override
    public int hashCode() {
      return Objects.hash(name, level);
    }

    /** Serialized form {@code Name#Level}, consumed by {@link #create(String)}. */
    @JsonValue
    @Override
    public String toString() {
      return name + SEPARATOR + level;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,12 @@ public void uncaughtException(Thread t, Throwable e) {
*/
public static void main(String[] args) throws Exception {
Thread.currentThread().setUncaughtExceptionHandler(WorkerUncaughtExceptionHandler.INSTANCE);
new DataflowWorkerLoggingInitializer().initialize();
DataflowWorkerLoggingInitializer.initialize();

DataflowWorkerHarnessOptions pipelineOptions =
PipelineOptionsFactory.createFromSystemProperties();
DataflowWorkerLoggingInitializer.configure(pipelineOptions);

final DataflowWorker worker = create(pipelineOptions);
processWork(pipelineOptions, worker);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,15 @@ static MapTask parseMapTask(String input) throws IOException {
}

public static void main(String[] args) throws Exception {
new DataflowWorkerLoggingInitializer().initialize();
DataflowWorkerLoggingInitializer.initialize();
DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.createFromSystemProperties();
// TODO: Remove setting these options once we have migrated to passing
// through the pipeline options.
options.setAppName("StreamingWorkerHarness");
options.setStreaming(true);

DataflowWorkerLoggingInitializer.configure(options);
String hostport = System.getProperty("windmill.hostport");
if (hostport == null) {
throw new Exception("-Dwindmill.hostport must be set to the location of the windmill server");
Expand All @@ -112,13 +119,6 @@ public static void main(String[] args) throws Exception {
(WindmillServerStub) Class.forName(WINDMILL_SERVER_CLASS_NAME)
.getDeclaredConstructor(String.class).newInstance(hostport);

DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.createFromSystemProperties();
// TODO: Remove setting these options once we have migrated to passing
// through the pipeline options.
options.setAppName("StreamingWorkerHarness");
options.setStreaming(true);

StreamingDataflowWorker worker =
new StreamingDataflowWorker(mapTasks, windmillServer, options);
worker.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,20 @@

package com.google.cloud.dataflow.sdk.runners.worker.logging;

import static com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level.DEBUG;
import static com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level.ERROR;
import static com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level.INFO;
import static com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level.TRACE;
import static com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.Level.WARN;

import com.google.api.client.util.Lists;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverride;
import com.google.common.collect.ImmutableBiMap;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.logging.FileHandler;
import java.util.logging.Formatter;
import java.util.logging.Handler;
Expand All @@ -28,47 +38,60 @@
import java.util.logging.Logger;

/**
* Sets up java.util.Logging configuration on the Dataflow Worker Harness with a
* console and file logger. The console and file loggers use the
* {@link DataflowWorkerLoggingFormatter} format. A user can override
* the logging level and location by specifying the Java system properties
* "dataflow.worker.logging.level" and "dataflow.worker.logging.location" respectively.
* The default log level is INFO and the default location is a file named dataflow-worker.log
* within the systems temporary directory.
* Sets up {@link java.util.logging} configuration on the Dataflow worker with a
* file logger. The file logger uses the {@link DataflowWorkerLoggingFormatter} format.
* A user can override the logging level by customizing the options found within
* {@link DataflowWorkerLoggingOptions}. A user can override the location by specifying the
* Java system property "dataflow.worker.logging.location". The default log level is INFO
* and the default location is a file named dataflow-worker.log within the systems temporary
* directory.
*/
public class DataflowWorkerLoggingInitializer {
private static final String DEFAULT_LOGGING_LOCATION =
new File(System.getProperty("java.io.tmpdir"), "dataflow-worker.log").getPath();
private static final String ROOT_LOGGER_NAME = "";
public static final String DATAFLOW_WORKER_LOGGING_LEVEL = "dataflow.worker.logging.level";
public static final String DATAFLOW_WORKER_LOGGING_LOCATION = "dataflow.worker.logging.location";
public static final ImmutableBiMap<Level, String> LEVELS =
ImmutableBiMap.<Level, String>builder()
.put(Level.SEVERE, "ERROR")
.put(Level.WARNING, "WARNING")
.put(Level.INFO, "INFO")
.put(Level.FINE, "DEBUG")
.put(Level.FINEST, "TRACE")
private static final String DATAFLOW_WORKER_LOGGING_LOCATION = "dataflow.worker.logging.location";
static final ImmutableBiMap<Level, DataflowWorkerLoggingOptions.Level> LEVELS =
ImmutableBiMap.<Level, DataflowWorkerLoggingOptions.Level>builder()
.put(Level.SEVERE, ERROR)
.put(Level.WARNING, WARN)
.put(Level.INFO, INFO)
.put(Level.FINE, DEBUG)
.put(Level.FINEST, TRACE)
.build();
private static final String DEFAULT_LOG_LEVEL = LEVELS.get(Level.INFO);

public void initialize() {
initialize(LogManager.getLogManager());
}
/**
* This default log level is overridden by the log level found at
* {@code DataflowWorkerLoggingOptions#getDefaultWorkerLogLevel()}.
*/
private static final DataflowWorkerLoggingOptions.Level DEFAULT_LOG_LEVEL =
LEVELS.get(Level.INFO);

/* We need to store a reference to the configured loggers so that they are not
* garbage collected. java.util.logging only has weak references to the loggers
* so if they are garbage collection, our hierarchical configuration will be lost. */
private static List<Logger> configuredLoggers = Lists.newArrayList();
private static FileHandler fileHandler;

void initialize(LogManager logManager) {
/**
* Sets up the initial logging configuration.
*/
public static synchronized void initialize() {
if (fileHandler != null) {
return;
}
try {
Level logLevel = LEVELS.inverse().get(
System.getProperty(DATAFLOW_WORKER_LOGGING_LEVEL, DEFAULT_LOG_LEVEL));
Level logLevel = LEVELS.inverse().get(DEFAULT_LOG_LEVEL);
Formatter formatter = new DataflowWorkerLoggingFormatter();

FileHandler fileHandler = new FileHandler(
fileHandler = new FileHandler(
System.getProperty(DATAFLOW_WORKER_LOGGING_LOCATION, DEFAULT_LOGGING_LOCATION),
true /* Append so that we don't squash existing logs */);
fileHandler.setFormatter(formatter);
fileHandler.setLevel(logLevel);
fileHandler.setLevel(Level.ALL);

// Reset the global log manager, get the root logger and remove the default log handlers.
LogManager logManager = LogManager.getLogManager();
logManager.reset();
Logger rootLogger = logManager.getLogger(ROOT_LOGGER_NAME);
for (Handler handler : rootLogger.getHandlers()) {
Expand All @@ -81,4 +104,34 @@ void initialize(LogManager logManager) {
throw new ExceptionInInitializerError(e);
}
}

/**
* Reconfigures logging with the passed in options.
*/
public static synchronized void configure(DataflowWorkerLoggingOptions options) {
initialize();
if (options.getDefaultWorkerLogLevel() != null) {
LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).setLevel(
LEVELS.inverse().get(options.getDefaultWorkerLogLevel()));
}
/* We store a reference to all the custom loggers the user configured.
* To make sure that these custom levels override the default logger level,
* we break the parent chain and have the logger directly pass log records
* to the file handler. */
if (options.getWorkerLogLevelOverrides() != null) {
for (WorkerLogLevelOverride loggerOverride : options.getWorkerLogLevelOverrides()) {
Logger logger = Logger.getLogger(loggerOverride.getName());
logger.setUseParentHandlers(false);
logger.setLevel(LEVELS.inverse().get(loggerOverride.getLevel()));
logger.addHandler(fileHandler);
configuredLoggers.add(logger);
}
}
}

// Visible for testing
static void reset() {
configuredLoggers = Lists.newArrayList();
fileHandler = null;
}
}
Loading

0 comments on commit 0aa4d9d

Please sign in to comment.