What happened?
When running a streaming job (with DirectRunner locally and with DataflowRunner on GCP) that uses the apache_beam.io.kafka.ReadFromKafka connector without max_num_records, the job does not process any records; instead it gets trapped in an infinite loop of creating consumers that subscribe and get assigned a partition and offset, but never consume anything. We are forcing auto.offset.reset = earliest.
We verified that when setting max_num_records the job runs and processes the records correctly, both locally and on Dataflow. All of this leads us to conclude that this is not a GCP issue but rather a Beam one.
We noticed the infinite loop in the logs, and we also noticed that Lenses never reports any active members of the consumer group.
We have tried the default Kafka configuration as well as custom ones; I'm sharing only the latest:
(
    pipeline
    | "ReadFromStream" >> apache_beam.io.kafka.ReadFromKafka(
        consumer_config={  # Also tested with a single broker
            "bootstrap.servers": "kafka-1782273228-1-1908664276.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-4.prod.walmart.com:9092,kafka-1782274279-1-1908664354.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-5.prod.walmart.com:9092,kafka-1782274320-1-1908664432.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-6.prod.walmart.com:9092",
            "auto.offset.reset": "earliest",
            "fetch.max.bytes": "52428800",
            "fetch.min.bytes": "1",
            "fetch.max.wait.ms": "1000",
            "max.poll.interval.ms": "20000",
            "max.poll.records": "10",
            "request.timeout.ms": "30000",
            "session.timeout.ms": "45000",
            "timeout.ms": "10000",
            "group.id": "test-group-id",
            "heartbeat.interval.ms": "200",
            "reconnect.backoff.ms": "100",
            "reconnect.backoff.max.ms": "10000",
        },
        topics=["some-topic-i-cannot-share"],
        with_metadata=True,
        # max_num_records=1000,  # For testing only
    )
)
This does not seem to be a problem with our Kafka topic, since custom Python clients (using kafka-python) run successfully with the exact same Kafka configuration; the sketch below shows roughly what that sanity check looks like.
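For comparison, this is a minimal sketch of that standalone kafka-python consumer; the broker addresses are placeholders, while the topic, group id, and offset-reset behaviour mirror the Beam config above:

# Standalone kafka-python consumer used to confirm the topic is readable with
# the same group id and offset-reset behaviour. Broker addresses are placeholders.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "some-topic-i-cannot-share",
    bootstrap_servers=["broker-1:9092", "broker-2:9092"],  # placeholders
    group_id="test-group-id",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # stop iterating if no records arrive for 10s
)

for record in consumer:
    print(record.topic, record.partition, record.offset, record.value)

consumer.close()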
Update: in the support ticket, we learned that the original reporter was not setting enable.auto.commit to false, which was the actual cause of this reported issue; the max_num_records parameter was a red herring.
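For anyone hitting the same symptom, a minimal sketch of what that resolution implies is shown below. The broker addresses and topic are placeholders, and the commit_offset_in_finalize flag is my assumption for letting the connector commit offsets at checkpoint finalization instead of relying on the client's auto-commit; it is not something confirmed in this issue.

# Sketch only: disable the Kafka client's auto-commit and let the Beam
# connector commit offsets when a checkpoint is finalized. Brokers and topic
# are placeholders; commit_offset_in_finalize is an assumption.
import apache_beam
import apache_beam.io.kafka  # provides apache_beam.io.kafka.ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)

with apache_beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | "ReadFromStream" >> apache_beam.io.kafka.ReadFromKafka(
            consumer_config={
                "bootstrap.servers": "broker-1:9092,broker-2:9092",  # placeholder
                "group.id": "test-group-id",
                "auto.offset.reset": "earliest",
                "enable.auto.commit": "false",  # the setting identified as the actual fix
            },
            topics=["some-topic-i-cannot-share"],
            with_metadata=True,
            commit_offset_in_finalize=True,  # connector-managed offset commits (assumption)
        )
        | "Log" >> apache_beam.Map(print)
    )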
Beam SDK language: Python
Beam SDK version: 2.52.0
Any feedback is greatly appreciated.
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components