Skip to content

Commit

Permalink
fix(backend/kafka): consumer unsubscribe not awaitable
Browse files Browse the repository at this point in the history
  • Loading branch information
pwoolvett committed May 6, 2022
1 parent aaf59e8 commit 9255c29
Showing 1 changed file with 3 additions and 5 deletions.
8 changes: 3 additions & 5 deletions broadcaster/_backends/kafka.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import typing
from urllib.parse import urlparse

Expand All @@ -14,9 +13,8 @@ def __init__(self, url: str):
self._consumer_channels: typing.Set = set()

async def connect(self) -> None:
loop = asyncio.get_event_loop()
self._producer = AIOKafkaProducer(loop=loop, bootstrap_servers=self._servers)
self._consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self._servers)
self._producer = AIOKafkaProducer(bootstrap_servers=self._servers)
self._consumer = AIOKafkaConsumer(bootstrap_servers=self._servers)
await self._producer.start()
await self._consumer.start()

Expand All @@ -29,7 +27,7 @@ async def subscribe(self, channel: str) -> None:
self._consumer.subscribe(topics=self._consumer_channels)

async def unsubscribe(self, channel: str) -> None:
await self._consumer.unsubscribe()
self._consumer.unsubscribe()

async def publish(self, channel: str, message: typing.Any) -> None:
await self._producer.send_and_wait(channel, message.encode("utf8"))
Expand Down

0 comments on commit 9255c29

Please sign in to comment.