Skip to content

Commit

Permalink
Iron out flaky kafka test (GSI-793) (#112)
Browse files Browse the repository at this point in the history
* Don't exit loop early

* Remove suppress statement

* Save someone a headache

* Make loop more idiomatic

* Handle multiple records returned as well as both topics in same loop

* Break once prefetched is empty

---------

Co-authored-by: TheByronHimes <[email protected]>
  • Loading branch information
TheByronHimes and TheByronHimes authored Jun 7, 2024
1 parent 1f4b186 commit 76914c8
Showing 1 changed file with 5 additions and 8 deletions.
13 changes: 5 additions & 8 deletions tests/integration/test_akafka_testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async def test_clear_topics_specific(kafka: KafkaFixture):
await kafka.clear_topics(topics="clear_topic")

# make sure the keep_topic still has its event but clear_topic is empty
prefetched = await consumer.getmany(timeout_ms=5000)
prefetched = await consumer.getmany(timeout_ms=500)
assert len(prefetched) == 1
records = next(iter(prefetched.values()))
assert len(records) == 1
Expand All @@ -136,15 +136,12 @@ async def test_clear_topics_specific(kafka: KafkaFixture):

# make sure messages are consumed again
records = []
while True:
prefetched = await consumer.getmany(timeout_ms=5000)
if not prefetched:
break
assert len(prefetched) <= 2
records.extend(next(iter(prefetched.values())))
while prefetched := await consumer.getmany(timeout_ms=500):
for records_for_topic in prefetched.values():
records.extend(records_for_topic)

records.sort(key=lambda record: record.topic)
assert len(records) == 2
records.sort(key=lambda record: record.topic)
assert records[0].topic == "clear_topic"
assert records[0].value
assert records[0].value.decode("utf-8") == json.dumps(
Expand Down

0 comments on commit 76914c8

Please sign in to comment.