Skip to content

Commit

Permalink
Log user code exceptions in Java (#28514)
Browse files Browse the repository at this point in the history
* Fix transform id not set during exceptions

* spotless

* spotless

---------

Co-authored-by: Sam Rohde <[email protected]>
  • Loading branch information
rohdesamuel and Sam Rohde authored Sep 19, 2023
1 parent 272da41 commit 6261a00
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,6 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
executionStateSampler,
processWideCache,
dataSampler);
logging.setProcessBundleHandler(processBundleHandler);

BeamFnStatusClient beamFnStatusClient = null;
if (statusApiServiceDescriptor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ public void onNext(BeamFnApi.InstructionRequest request) {
sendErrorResponse(e);
throw e;
} finally {
BeamFnLoggingMDC.setInstructionId(null);
BeamFnLoggingMDC.reset();
}
});
} finally {
BeamFnLoggingMDC.setInstructionId(null);
BeamFnLoggingMDC.reset();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
Expand Down Expand Up @@ -120,6 +121,14 @@ public interface ExecutionState {
* <p>Must only be invoked by the bundle processing thread.
*/
void deactivate();

/**
 * Sets the error state to the currently executing state. Returns true if this was the first
 * time the error was set. Returns false otherwise.
 *
 * <p>This can only be set once: after the first call every later call returns {@code false}
 * until the owning tracker is reset, which lets callers react to a failure (for example by
 * logging it) exactly one time.
 *
 * @return {@code true} if this call put the tracker into the error state, {@code false} if the
 *     error state had already been set
 */
boolean error();
}

/** Stops the execution of the state sampler. */
Expand Down Expand Up @@ -250,6 +259,8 @@ public class ExecutionStateTracker implements BundleProgressReporter {
private @Nullable ExecutionStateImpl currentState;
// Read by multiple threads, written by the bundle processing thread lazily.
private final AtomicReference<@Nullable ExecutionStateImpl> currentStateLazy;
// Set to true the first time error() is called for a bundle; cleared again by reset().
private boolean inErrorState = false;
// Read and written by the ExecutionStateSampler thread
private long transitionsAtLastSample;

Expand Down Expand Up @@ -465,6 +476,15 @@ public void deactivate() {
numTransitions += 1;
numTransitionsLazy.lazySet(numTransitions);
}

/**
 * Flags the tracker as being in the error state.
 *
 * @return {@code true} only for the call that flips the flag; {@code false} when the flag was
 *     already set
 */
@Override
public boolean error() {
  // Already flagged: report that this call was not the first one.
  if (inErrorState) {
    return false;
  }
  inErrorState = true;
  return true;
}
}

/**
Expand All @@ -473,6 +493,7 @@ public void deactivate() {
* <p>Only invoked by the bundle processing thread.
*/
public void start(String processBundleId) {
BeamFnLoggingMDC.setStateTracker(this);
this.processBundleId.lazySet(processBundleId);
this.lastTransitionTime.lazySet(clock.getMillis());
this.trackedThread.lazySet(Thread.currentThread());
Expand Down Expand Up @@ -514,6 +535,8 @@ public void reset() {
this.numTransitionsLazy.lazySet(0);
this.lastTransitionTime.lazySet(0);
this.metricsContainerRegistry.reset();
this.inErrorState = false;
BeamFnLoggingMDC.setStateTracker(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The {@code PCollectionConsumerRegistry} is used to maintain a collection of consuming
Expand Down Expand Up @@ -97,6 +99,7 @@ public static ConsumerAndMetadata forConsumer(
private final ProcessBundleDescriptor processBundleDescriptor;
private final RehydratedComponents rehydratedComponents;
private final @Nullable DataSampler dataSampler;
private static final Logger LOG = LoggerFactory.getLogger(PCollectionConsumerRegistry.class);

public PCollectionConsumerRegistry(
ExecutionStateTracker stateTracker,
Expand Down Expand Up @@ -242,6 +245,26 @@ public FnDataReceiver<WindowedValue<?>> getMultiplexingConsumer(String pCollecti
});
}

/**
 * Records a failure raised while a consumer processed an element and then rethrows it.
 *
 * <p>The exception is handed to the output sampler when one is present, and is logged only if
 * {@code executionState.error()} reports that this is the first time the error was set.
 *
 * @param e the exception thrown by the downstream consumer; always rethrown
 * @param executionState the state on which the error is recorded
 * @param executionStateTracker the tracker queried for the current process bundle id
 * @param ptransformId the id of the PTransform passed on to the output sampler
 * @param outputSampler the sampler to notify, or {@code null} when none is configured
 * @param elementSample the sample associated with the failing element, may be {@code null}
 * @throws Exception always; the same instance that was passed in as {@code e}
 */
private static <T> void logAndRethrow(
    Exception e,
    ExecutionState executionState,
    ExecutionStateTracker executionStateTracker,
    String ptransformId,
    @Nullable OutputSampler<T> outputSampler,
    @Nullable ElementSample<T> elementSample)
    throws Exception {
  // The tracker may have no status to report; the bundle id is then left as null.
  ExecutionStateSampler.ExecutionStateTrackerStatus trackerStatus =
      executionStateTracker.getStatus();
  String bundleId = null;
  if (trackerStatus != null) {
    bundleId = trackerStatus.getProcessBundleId();
  }

  if (outputSampler != null) {
    outputSampler.exception(elementSample, e, ptransformId, bundleId);
  }

  // error() is true only for the first caller, so the exception is logged once even though
  // every enclosing consumer on the call stack passes through here as it propagates.
  boolean firstReport = executionState.error();
  if (firstReport) {
    LOG.error("Failed to process element for bundle \"{}\"", bundleId, e);
  }
  throw e;
}

/**
* A wrapping {@code FnDataReceiver<WindowedValue<T>>} which counts the number of elements
* consumed by the original {@code FnDataReceiver<WindowedValue<T>> consumer} and sets up metrics
Expand Down Expand Up @@ -324,13 +347,8 @@ public void accept(WindowedValue<T> input) throws Exception {
try {
this.delegate.accept(input);
} catch (Exception e) {
if (outputSampler != null) {
ExecutionStateSampler.ExecutionStateTrackerStatus status =
executionStateTracker.getStatus();
String processBundleId = status == null ? null : status.getProcessBundleId();
outputSampler.exception(elementSample, e, ptransformId, processBundleId);
}
throw e;
logAndRethrow(
e, executionState, executionStateTracker, ptransformId, outputSampler, elementSample);
} finally {
executionState.deactivate();
}
Expand Down Expand Up @@ -419,14 +437,13 @@ public void accept(WindowedValue<T> input) throws Exception {
try {
consumerAndMetadata.getConsumer().accept(input);
} catch (Exception e) {
if (outputSampler != null) {
ExecutionStateSampler.ExecutionStateTrackerStatus status =
consumerAndMetadata.getExecutionStateTracker().getStatus();
String processBundleId = status == null ? null : status.getProcessBundleId();
outputSampler.exception(
elementSample, e, consumerAndMetadata.getPTransformId(), processBundleId);
}
throw e;
logAndRethrow(
e,
state,
consumerAndMetadata.getExecutionStateTracker(),
consumerAndMetadata.getPTransformId(),
outputSampler,
elementSample);
} finally {
state.deactivate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.control.ProcessBundleHandler.BundleProcessor;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
Expand Down Expand Up @@ -105,8 +104,6 @@ public class BeamFnLoggingClient implements AutoCloseable {
* so if they are garbage collected, our hierarchical configuration will be lost. */
private final Collection<Logger> configuredLoggers = new ArrayList<>();

private @Nullable ProcessBundleHandler processBundleHandler;

private final BlockingQueue<LogEntry> bufferedLogEntries =
new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT);

Expand Down Expand Up @@ -347,10 +344,6 @@ public void close() throws Exception {
}
}

/**
 * Stores the given handler in this client's {@code processBundleHandler} field, replacing any
 * previously stored value.
 *
 * @param processBundleHandler the handler to remember
 */
public void setProcessBundleHandler(ProcessBundleHandler processBundleHandler) {
this.processBundleHandler = processBundleHandler;
}

// Reset the logging configuration to what it is at startup.
@RequiresNonNull("configuredLoggers")
@RequiresNonNull("logRecordHandler")
Expand Down Expand Up @@ -440,14 +433,12 @@ public void publish(LogRecord record) {
if (loggerName != null) {
builder.setLogLocation(loggerName);
}
if (instructionId != null && processBundleHandler != null) {
BundleProcessor bundleProcessor =
processBundleHandler.getBundleProcessorCache().find(instructionId);
if (bundleProcessor != null) {
String transformId = bundleProcessor.getStateTracker().getCurrentThreadsPTransformId();
if (transformId != null) {
builder.setTransformId(transformId);
}

ExecutionStateSampler.ExecutionStateTracker stateTracker = BeamFnLoggingMDC.getStateTracker();
if (stateTracker != null) {
String transformId = stateTracker.getCurrentThreadsPTransformId();
if (transformId != null) {
builder.setTransformId(transformId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
*/
package org.apache.beam.fn.harness.logging;

import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Mapped diagnostic context to be consumed and set on LogEntry protos in BeamFnLoggingClient. */
public class BeamFnLoggingMDC {
private static final ThreadLocal<@Nullable String> instructionId = new ThreadLocal<>();

private static final ThreadLocal<@Nullable ExecutionStateTracker> stateTracker =
new ThreadLocal<>();

/**
 * Sets the Instruction ID of the current thread. The value is stored in a plain {@link
 * ThreadLocal}, so it is not inherited by child threads.
 */
public static void setInstructionId(@Nullable String newInstructionId) {
instructionId.set(newInstructionId);
Expand All @@ -32,4 +36,20 @@ public static void setInstructionId(@Nullable String newInstructionId) {
public static @Nullable String getInstructionId() {
// Reads the value recorded for the calling thread only; null when none is currently set.
return instructionId.get();
}

/**
 * Sets the State Tracker of the current thread. The value is stored in a plain {@link
 * ThreadLocal}, so it is visible only to the calling thread and is not inherited by child
 * threads. Passing {@code null} clears it.
 */
public static void setStateTracker(@Nullable ExecutionStateTracker newStateTracker) {
stateTracker.set(newStateTracker);
}

/**
 * Gets the State Tracker of the current thread.
 *
 * @return the tracker last set on the calling thread, or {@code null} if none is set or it has
 *     been cleared
 */
public static @Nullable ExecutionStateTracker getStateTracker() {
return stateTracker.get();
}

/** Clears both the State Tracker and the Instruction ID recorded for the current thread. */
public static void reset() {
  stateTracker.set(null);
  instructionId.set(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -648,4 +649,28 @@ public Long answer(InvocationOnMock invocation) throws Throwable {
sampler.stop();
expectedLogs.verifyWarn("Operation ongoing in bundle bundleId for PTransform");
}

/**
 * Verifies that {@code error()} returns {@code true} only for the first call on a tracker, keeps
 * returning {@code false} afterwards (even once the state is deactivated), and returns {@code
 * true} again after the tracker has been reset.
 */
@Test
public void testErrorState() throws Exception {
  MillisProvider clock = mock(MillisProvider.class);
  ExecutionStateSampler sampler =
      new ExecutionStateSampler(
          PipelineOptionsFactory.fromArgs("--experiments=state_sampling_period_millis=10")
              .create(),
          clock);
  try {
    ExecutionStateTracker tracker = sampler.create();
    ExecutionState state1 =
        tracker.create("shortId1", "ptransformId1", "ptransformIdName1", "process");
    ExecutionState state2 =
        tracker.create("shortId2", "ptransformId2", "ptransformIdName2", "process");

    state1.activate();
    state2.activate();
    // The first report wins; repeated reports are rejected.
    assertTrue(state2.error());
    assertFalse(state2.error());
    // Deactivating the state must not clear the error flag.
    state2.deactivate();
    assertFalse(state2.error());
    // Resetting the tracker makes the error state settable again, from any of its states.
    tracker.reset();
    assertTrue(state1.error());
  } finally {
    // Stop the sampler even when an assertion fails so it is not left running after the test,
    // matching the other tests in this class.
    sampler.stop();
  }
}
}
Loading

0 comments on commit 6261a00

Please sign in to comment.