diff --git a/tests/integration/test_akafka_testutils.py b/tests/integration/test_akafka_testutils.py index b0fb6d15..5e6ff769 100644 --- a/tests/integration/test_akafka_testutils.py +++ b/tests/integration/test_akafka_testutils.py @@ -116,7 +116,7 @@ async def test_clear_topics_specific(kafka: KafkaFixture): await kafka.clear_topics(topics="clear_topic") # make sure the keep_topic still has its event but clear_topic is empty - prefetched = await consumer.getmany(timeout_ms=5000) + prefetched = await consumer.getmany(timeout_ms=500) assert len(prefetched) == 1 records = next(iter(prefetched.values())) assert len(records) == 1 @@ -136,15 +136,12 @@ async def test_clear_topics_specific(kafka: KafkaFixture): # make sure messages are consumed again records = [] - while True: - prefetched = await consumer.getmany(timeout_ms=5000) - if not prefetched: - break - assert len(prefetched) <= 2 - records.extend(next(iter(prefetched.values()))) + while prefetched := await consumer.getmany(timeout_ms=500): + for records_for_topic in prefetched.values(): + records.extend(records_for_topic) - records.sort(key=lambda record: record.topic) assert len(records) == 2 + records.sort(key=lambda record: record.topic) assert records[0].topic == "clear_topic" assert records[0].value assert records[0].value.decode("utf-8") == json.dumps(