Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug]: KafkaIO Should not throw errors for empty topics. #31964

Closed
2 of 17 tasks
damnMeddlingKid opened this issue Jul 24, 2024 · 1 comment
Closed
2 of 17 tasks

[Bug]: KafkaIO Should not throw errors for empty topics. #31964

damnMeddlingKid opened this issue Jul 24, 2024 · 1 comment

Comments

@damnMeddlingKid
Copy link

What happened?

We ingest a large number of Kafka topics using Apache Beam (Dataflow) and write to BigQuery. Periodically we will have traffic stop on some of the low volume topics and the retention policy will empty out the topic.

If the topic becomes empty while Beam is processing our pipelines start throwing errors like

Error Stack Trace

Error message from worker: java.io.IOException: Failed to advance source: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@392e80b1 org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:849) org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:816) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169) org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1238) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$scheduleWorkItem$11(StreamingDataflowWorker.java:974) org.apache.beam.runners.dataflow.worker.streaming.Work.run(Work.java:81) org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeMonitorHeld$0(BoundedQueueExecutor.java:232) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) java.base/java.lang.Thread.run(Thread.java:833) Caused by: java.io.IOException: Exception while reading from Kafka org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:630) org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:230) org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:844) org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:816) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169) org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1238) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$scheduleWorkItem$11(StreamingDataflowWorker.java:974) org.apache.beam.runners.dataflow.worker.streaming.Work.run(Work.java:81) org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeMonitorHeld$0(BoundedQueueExecutor.java:232) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) java.base/java.lang.Thread.run(Thread.java:833) Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [monorail_runtime_client_failure_6] java.io.IOException: Failed to advance source: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@86b8dd3 org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:849) org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:816) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169) org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1238) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$scheduleWorkItem$11(StreamingDataflowWorker.java:974) org.apache.beam.runners.dataflow.worker.streaming.Work.run(Work.java:81) org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeMonitorHeld$0(BoundedQueueExecutor.java:232) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) java.base/java.lang.Thread.run(Thread.java:833) Caused by: java.io.IOException: Exception while reading from Kafka org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:630) org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:230) org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.advance(WorkerCustomSources.java:844) org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:816) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211) org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169) org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1238) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$scheduleWorkItem$11(StreamingDataflowWorker.java:974) org.apache.beam.runners.dataflow.worker.streaming.Work.run(Work.java:81) org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeMonitorHeld$0(BoundedQueueExecutor.java:232) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) java.base/java.lang.Thread.run(Thread.java:833) Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [my_topic_name]

These errors effect the ingestion of other topics running in the same job. KafkaIO should not consider this an invalid state and should just ignore topics that are empty.

Note: Jobs will work fine after a restart so it seems like the error only occurs if a topic becomes empty after the job has already started

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@johnjcasey
Copy link
Contributor

Tracked by support ticket

@github-actions github-actions bot added this to the 2.61.0 Release milestone Oct 25, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants