Flink streaming flakes in ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful #19814

Closed
damccorm opened this issue Jun 4, 2022 · 10 comments

Comments

@damccorm
Contributor

damccorm commented Jun 4, 2022

Transient failure from Jenkins:


java.lang.AssertionError: Function should have been torn down after exception
Expected: is <TEARDOWN>
     but: was <START_BUNDLE>
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
	at org.apache.beam.sdk.transforms.ParDoLifecycleTest.lambda$validate$0(ParDoLifecycleTest.java:266)
	at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4707)
	at org.apache.beam.sdk.transforms.ParDoLifecycleTest.validate(ParDoLifecycleTest.java:264)
	at org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful(ParDoLifecycleTest.java:253)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:349)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:314)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:312)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:396)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
	at sun.reflect.GeneratedMethodAccessor30.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
	at java.lang.Thread.run(Thread.java:748)

Standard Error

Aug 26, 2019 2:47:16 PM org.apache.beam.runners.direct.ParDoEvaluatorFactory createParDoEvaluator
SEVERE: Exception encountered while cleaning up in ParDo evaluator construction
org.apache.beam.sdk.util.UserCodeException: java.lang.AssertionError: Expected to have a processing method throw an exception
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
	at org.apache.beam.sdk.transforms.ParDoLifecycleTest$ExceptionThrowingFn$DoFnInvoker.invokeTeardown(Unknown Source)
	at org.apache.beam.runners.direct.DoFnLifecycleManager$TeardownRemovedFnListener.onRemoval(DoFnLifecycleManager.java:113)
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.remove(LocalCache.java:3072)
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.remove(LocalCache.java:4236)
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidate(LocalCache.java:4899)
	at org.apache.beam.runners.direct.DoFnLifecycleManager.remove(DoFnLifecycleManager.java:66)
	at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createParDoEvaluator(ParDoEvaluatorFactory.java:168)
	at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator(ParDoEvaluatorFactory.java:121)
	at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:79)
	at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication(TransformEvaluatorRegistry.java:169)
	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:117)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: Expected to have a processing method throw an exception
	at org.junit.Assert.fail(Assert.java:89)
	at org.apache.beam.sdk.transforms.ParDoLifecycleTest$ExceptionThrowingFn.after(ParDoLifecycleTest.java:398)

Imported from Jira BEAM-8101. Original Jira may contain additional context.
Reported by: janl.

@TheNeuralBit
Member

@adude3141 it looks like you had the most recent updates on BEAM-8101. Do you have any thoughts on a path forward?

@kennknowles
Member

kennknowles commented Oct 18, 2022

Did not flake in 100 runs on DirectRunner:

for i in `seq 1 100` ; do (
  echo "RUN $i"
  ./gradlew :runners:direct-java:cleanTest --quiet
  ./gradlew :runners:direct-java:needsRunnerTests --quiet --tests ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful
) 2>/dev/null
done

@kennknowles
Member

Did not flake in 100 runs on Flink batch:

for i in `seq 1 100` ; do (
  echo "RUN $i"
  ./gradlew :runners:flink:1.15:cleanTest --quiet
  ./gradlew :runners:flink:1.15:validatesRunnerBatch --quiet --tests ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful
) 2>/dev/null
done

@kennknowles
Member

Notably the flakes only occur in the stateful variant.

@kennknowles kennknowles changed the title Flakes in ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful for Direct, Spark, Flink Flakes in ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful for Spark, Flink Oct 18, 2022
@kennknowles
Member

We do see recent flakes in Spark batch:

These may share a root cause in the StatefulDoFnRunner, or they may be something else entirely related to exception handling in the runner. I will fork the tickets.

@kennknowles kennknowles changed the title Flakes in ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful for Spark, Flink Flink streaming flakes in ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful Oct 18, 2022
@kennknowles
Member

Actually the Spark flake is not in a stateful variant.

@kennknowles
Member

Despite recent flakes in Jenkins, this succeeded:

for i in `seq 1 100` ; do (
  echo "RUN $i"
  ./gradlew :runners:flink:1.15:cleanTest --quiet
  ./gradlew :runners:flink:1.15:validatesRunnerStreaming --quiet --tests ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful
) 2>/dev/null
done

From the logs, it seems clear that the task chain switches to FAILED, and presumably it is taken down in a race with the call to teardown.

[CHAIN DataSource (at Create.Values/Read(CreateSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat)) -> FlatMap (FlatMap at ParDo(ExceptionThrowing)/ParMultiDo(ExceptionThrowing)) -> FlatMap (FlatMap at ParDo(ExceptionThrowing)/ParMultiDo(ExceptionThrowing).output) (1/2)#0] WARN org.apache.flink.runtime.taskmanager.Task - CHAIN DataSource (at Create.Values/Read(CreateSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat)) -> FlatMap (FlatMap at ParDo(ExceptionThrowing)/ParMultiDo(ExceptionThrowing)) -> FlatMap (FlatMap at ParDo(ExceptionThrowing)/ParMultiDo(ExceptionThrowing).output) (1/2)#0 (5f615db581224acfcac1a918f0fa2b1c) switched from RUNNING to FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException: java.lang.Exception: Hasn't yet thrown
	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
	at org.apache.beam.sdk.transforms.ParDoLifecycleTest$ExceptionThrowingFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
	at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
	at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:122)
	at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:59)
	at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
	at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:208)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.Exception: Hasn't yet thrown
	at org.apache.beam.sdk.transforms.ParDoLifecycleTest$ExceptionThrowingFn.throwIfNecessary(ParDoLifecycleTest.java:453)
	at org.apache.beam.sdk.transforms.ParDoLifecycleTest$ExceptionThrowingFn.perElement(ParDoLifecycleTest.java:433)
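
The suspected race can be illustrated with a minimal, self-contained Java sketch. This is not Beam or Flink code: `TeardownRaceSketch`, `TrackedFn`, `CallState`, and `runAndObserve` are hypothetical names, and running teardown on a separate cleanup thread is an assumption made only to model the interleaving described above. Latches force the two orderings deterministically; in a real runner the ordering would depend on timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TeardownRaceSketch {
  // Hypothetical lifecycle states, loosely mirroring the values in the assertion message.
  enum CallState { SETUP, START_BUNDLE, TEARDOWN }

  // Hypothetical stand-in for a DoFn whose most recent lifecycle call is recorded.
  static final class TrackedFn {
    final AtomicReference<CallState> state = new AtomicReference<>(CallState.SETUP);
    final CountDownLatch tornDown = new CountDownLatch(1);

    void startBundle() {
      state.set(CallState.START_BUNDLE);
      throw new IllegalStateException("Hasn't yet thrown");
    }

    void teardown() {
      state.set(CallState.TEARDOWN);
      tornDown.countDown();
    }
  }

  // Fails in startBundle on the calling thread while teardown runs on a cleanup thread.
  // Returns the state seen by the validating thread.
  static CallState runAndObserve(boolean waitForTeardown) {
    TrackedFn fn = new TrackedFn();
    CountDownLatch releaseCleanup = new CountDownLatch(1);
    Thread cleanup = new Thread(() -> {
      try {
        releaseCleanup.await(); // cleanup is held back until explicitly released
        fn.teardown();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    cleanup.start();
    try {
      try {
        fn.startBundle();
      } catch (IllegalStateException expected) {
        // The failure has surfaced; control returns to the validating thread.
      }
      CallState observed;
      if (waitForTeardown) {
        releaseCleanup.countDown();
        if (!fn.tornDown.await(10, TimeUnit.SECONDS)) {
          throw new IllegalStateException("teardown did not run in time");
        }
        observed = fn.state.get(); // read only after teardown has completed
      } else {
        observed = fn.state.get(); // read before cleanup runs: the losing interleaving
        releaseCleanup.countDown();
      }
      cleanup.join();
      return observed;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(runAndObserve(false)); // START_BUNDLE
    System.out.println(runAndObserve(true));  // TEARDOWN
  }
}
```

In this model, validating before the cleanup thread has run reproduces the `was <START_BUNDLE>` shape of the reported assertion failure, while waiting for teardown to finish yields `TEARDOWN`. Whether the real flake follows this exact interleaving is not established in this thread.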

@kennknowles
Member

Leaving this open but opportunistically moving to a bug I can repro. This one is in the unfortunate state where I know it is real, so I can't close it, but I cannot repro it.

@kennknowles kennknowles removed their assignment Oct 18, 2022
@damccorm
Contributor Author

I think this is fixed. If not, it should get auto-flagged by our tooling anyway, so this should be safe to close.

@github-actions github-actions bot added this to the 2.61.0 Release milestone Oct 31, 2024