Skip to content

Commit

Permalink
Delegate working with AutoLockRenewer to receiver instance.
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantin Lapkovsky committed Sep 13, 2023
1 parent f1e06c2 commit 59fe35e
Showing 1 changed file with 6 additions and 9 deletions.
15 changes: 6 additions & 9 deletions kombu/transport/azureservicebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,14 @@ def _get_asb_receiver(
cache_key = queue_cache_key or queue
queue_obj = self._queue_cache.get(cache_key, None)
if queue_obj is None or queue_obj.receiver is None:
auto_lock_renewer = None
if self.use_lock_renewal:
auto_lock_renewer = AutoLockRenewer(max_lock_renewal_duration=self.max_lock_renewal_duration)

receiver = self.queue_service.get_queue_receiver(
queue_name=queue, receive_mode=recv_mode,
keep_alive=self.uamqp_keep_alive_interval)
keep_alive=self.uamqp_keep_alive_interval,
auto_lock_renewer=auto_lock_renewer)
queue_obj = self._add_queue_to_cache(cache_key, receiver=receiver)
return queue_obj

Expand Down Expand Up @@ -277,14 +282,6 @@ def _get(
# message.body is either byte or generator[bytes]
message = messages[0]

if self.use_lock_renewal:
with self.queue_service.get_queue_receiver(
queue_name=queue,
receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
keep_alive=self.uamqp_keep_alive_interval
) as receiver, AutoLockRenewer() as lock_renewer:
lock_renewer.register(receiver, message, max_lock_renewal_duration=self.max_lock_renewal_duration)

if not isinstance(message.body, bytes):
body = b''.join(message.body)
else:
Expand Down

0 comments on commit 59fe35e

Please sign in to comment.