Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do not merge] Disable keepalive pings. #33415

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/beam_PostCommit_Java_InfluxDbIO_IT.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ jobs:
id: install_influxdb
run: |
kubectl apply -f ${{ github.workspace }}/.test-infra/kubernetes/influxdb/influxdb.yml
kubectl wait svc/influxdb-load-balancer-service --for=jsonpath='{.status.loadBalancer.ingress[0].ip}'
kubectl wait --timeout 2m svc/influxdb-load-balancer-service --for=jsonpath='{.status.loadBalancer.ingress[0].ip}'
loadbalancer_IP=$(kubectl get svc influxdb-load-balancer-service -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo influxdb_IP=$loadbalancer_IP >> $GITHUB_OUTPUT
- name: Run Java InfluxDbIO_IT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
new ConcurrentHashMap<>();

private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
private static final long BUNDLE_LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(10);
private static final AtomicIntegerFieldUpdater<ExecutionStateTracker> SAMPLING_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, "sampling");

Expand Down Expand Up @@ -140,17 +139,8 @@ public String getDescription() {
*/
private volatile long millisSinceLastTransition = 0;

/**
* The number of milliseconds since the {@link ExecutionStateTracker} initial state.
*
* <p>This variable is updated by the Sampling thread, and read by the Progress Reporting thread,
* thus it being marked volatile.
*/
private volatile long millisSinceBundleStart = 0;

private long transitionsAtLastSample = 0;
private long nextLullReportMs = LULL_REPORT_MS;
private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;

public ExecutionStateTracker(ExecutionStateSampler sampler) {
this.sampler = sampler;
Expand All @@ -165,10 +155,8 @@ public synchronized void reset() {
currentState = null;
numTransitions = 0;
millisSinceLastTransition = 0;
millisSinceBundleStart = 0;
transitionsAtLastSample = 0;
nextLullReportMs = LULL_REPORT_MS;
nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
}

@VisibleForTesting
Expand Down Expand Up @@ -347,19 +335,6 @@ protected void takeSampleOnce(long millisSinceLastSample) {
transitionsAtLastSample = transitionsAtThisSample;
}
updateMillisSinceLastTransition(millisSinceLastSample, state);
updateMillisSinceBundleStart(millisSinceLastSample);
}

// Override this to implement bundle level lull reporting.
protected void reportBundleLull(long millisSinceBundleStart) {}

@SuppressWarnings("NonAtomicVolatileUpdate")
private void updateMillisSinceBundleStart(long millisSinceLastSample) {
millisSinceBundleStart += millisSinceLastSample;
if (millisSinceBundleStart > nextBundleLullReportMs) {
reportBundleLull(millisSinceBundleStart);
nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
}
}

@SuppressWarnings("NonAtomicVolatileUpdate")
Expand Down
4 changes: 2 additions & 2 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ evaluationDependsOn(":sdks:java:container:java11")

ext.dataflowLegacyEnvironmentMajorVersion = '8'
ext.dataflowFnapiEnvironmentMajorVersion = '8'
ext.dataflowLegacyContainerVersion = 'beam-master-20240125'
ext.dataflowFnapiContainerVersion = 'beam-master-20240125'
ext.dataflowLegacyContainerVersion = '2.55.1'
ext.dataflowFnapiContainerVersion = '2.55.1'
ext.dataflowContainerBaseRepository = 'gcr.io/cloud-dataflow/v1beta3'

processResources {
Expand Down
6 changes: 2 additions & 4 deletions runners/google-cloud-dataflow-java/examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,8 @@ def commonConfig = { Map args ->
"--region=${gcpRegion}",
"--tempRoot=${actualGcsTempRoot}",
"--runner=TestDataflowRunner",
"--dataflowWorkerJar=${actualDataflowWorkerJar}"]
if (actualWorkerHarnessContainerImage) {
preCommitBeamTestPipelineOptions += "--workerHarnessContainerImage=${actualWorkerHarnessContainerImage}"
}
"--dataflowWorkerJar=${actualDataflowWorkerJar}",
"--workerHarnessContainerImage=${actualWorkerHarnessContainerImage}"]
preCommitBeamTestPipelineOptions += additionalOptions
systemProperty "beamTestPipelineOptions", JsonOutput.toJson(preCommitBeamTestPipelineOptions)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.api.client.util.Clock;
import com.google.api.services.dataflow.model.SideInputInfo;
import java.io.Closeable;
import java.io.IOException;
Expand All @@ -30,8 +29,6 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.NullSideInputReader;
Expand All @@ -40,30 +37,22 @@
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.DateTimeUtils.MillisProvider;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Execution context for the Dataflow worker. */
@SuppressWarnings({
Expand Down Expand Up @@ -271,59 +260,23 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker
@Nullable
private ActiveMessageMetadata activeMessageMetadata = null;

/** Clock used to either provide real system time or mocked to virtualize time for testing. */
private final Clock clock;
private final MillisProvider clock = System::currentTimeMillis;

@GuardedBy("this")
private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>();

/** Last milliseconds since epoch when a full thread dump was performed. */
private long lastFullThreadDumpMillis = 0;

/** The minimum lull duration in milliseconds to perform a full thread dump. */
private static final long LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 1000;

private static final Logger LOG = LoggerFactory.getLogger(DataflowExecutionStateTracker.class);

private static final PeriodFormatter DURATION_FORMATTER =
new PeriodFormatterBuilder()
.appendDays()
.appendSuffix("d")
.minimumPrintedDigits(2)
.appendHours()
.appendSuffix("h")
.printZeroAlways()
.appendMinutes()
.appendSuffix("m")
.appendSeconds()
.appendSuffix("s")
.toFormatter();

public DataflowExecutionStateTracker(
ExecutionStateSampler sampler,
DataflowOperationContext.DataflowExecutionState otherState,
CounterFactory counterFactory,
PipelineOptions options,
String workItemId) {
this(sampler, otherState, counterFactory, options, workItemId, Clock.SYSTEM);
}

@VisibleForTesting
public DataflowExecutionStateTracker(
ExecutionStateSampler sampler,
DataflowOperationContext.DataflowExecutionState otherState,
CounterFactory counterFactory,
PipelineOptions options,
String workItemId,
Clock clock) {
super(sampler);
this.elementExecutionTracker =
DataflowElementExecutionTracker.create(counterFactory, options);
this.otherState = otherState;
this.workItemId = workItemId;
this.contextActivationObserverRegistry = ContextActivationObserverRegistry.createDefault();
this.clock = clock;
DataflowWorkerLoggingInitializer.initialize();
}

@Override
Expand All @@ -348,76 +301,12 @@ public Closeable activate() {
}
}

private boolean shouldLogFullThreadDumpForBundle(Duration lullDuration) {
if (lullDuration.getMillis() < LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS) {
return false;
}
long now = clock.currentTimeMillis();
if (lastFullThreadDumpMillis + LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS < now) {
lastFullThreadDumpMillis = now;
return true;
}
return false;
}

private String getBundleLullMessage(Duration lullDuration) {
StringBuilder message = new StringBuilder();
message
.append("Operation ongoing in bundle for at least ")
.append(DURATION_FORMATTER.print(lullDuration.toPeriod()))
.append(" without completing")
.append("\n");
synchronized (this) {
if (this.activeMessageMetadata != null) {
message.append(
"Current user step name: " + getActiveMessageMetadata().get().userStepName() + "\n");
message.append(
"Time spent in this step(millis): "
+ (clock.currentTimeMillis() - getActiveMessageMetadata().get().startTime())
+ "\n");
}
message.append("Processing times in each step(millis)\n");
for (Map.Entry<String, IntSummaryStatistics> entry :
this.processingTimesByStep.entrySet()) {
message.append("Step name: " + entry.getKey() + "\n");
message.append("Time spent in this step: " + entry.getValue().toString() + "\n");
}
}

return message.toString();
}

@Override
protected void takeSampleOnce(long millisSinceLastSample) {
elementExecutionTracker.takeSample(millisSinceLastSample);
super.takeSampleOnce(millisSinceLastSample);
}

@Override
protected void reportBundleLull(long millisElapsedSinceBundleStart) {
// If we're not logging warnings, nothing to report.
if (!LOG.isWarnEnabled()) {
return;
}

Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart);

// Since the lull reporting executes in the sampler thread, it won't automatically inherit the
// context of the current step. To ensure things are logged correctly, we get the currently
// registered DataflowWorkerLoggingHandler and log directly in the desired context.
LogRecord logRecord = new LogRecord(Level.WARNING, getBundleLullMessage(lullDuration));
logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName());

// Publish directly in the context of this specific ExecutionState.
DataflowWorkerLoggingHandler dataflowLoggingHandler =
DataflowWorkerLoggingInitializer.getLoggingHandler();
dataflowLoggingHandler.publish(logRecord);

if (shouldLogFullThreadDumpForBundle(lullDuration)) {
StackTraceUtil.logAllStackTraces();
}
}

/**
* Enter a new state on the tracker. If the new state is a Dataflow processing state, tracks the
* activeMessageMetadata with the start time of the new state.
Expand All @@ -434,7 +323,7 @@ public Closeable enterState(ExecutionState newState) {
synchronized (this) {
this.activeMessageMetadata =
ActiveMessageMetadata.create(
newDFState.getStepName().userName(), clock.currentTimeMillis());
newDFState.getStepName().userName(), clock.getMillis());
}
}
elementExecutionTracker.enter(newDFState.getStepName());
Expand Down
Loading
Loading