Skip to content

Commit

Permalink
refactor: Replace usage of six for their py3 implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
a-ungurianu committed Nov 16, 2022
1 parent 4b13135 commit e05b50a
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 107 deletions.
53 changes: 24 additions & 29 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import re
import warnings

import six

from kazoo.exceptions import (
AuthFailedError,
ConfigurationError,
Expand Down Expand Up @@ -66,9 +64,6 @@
from kazoo.recipe.watchers import ChildrenWatch, DataWatch


string_types = six.string_types
bytes_types = (six.binary_type,)

CLOSED_STATES = (
KeeperState.EXPIRED_SESSION,
KeeperState.AUTH_FAILED,
Expand Down Expand Up @@ -415,10 +410,10 @@ def _reset(self):

def _reset_watchers(self):
watchers = []
for child_watchers in six.itervalues(self._child_watchers):
for child_watchers in self._child_watchers.values():
watchers.extend(child_watchers)

for data_watchers in six.itervalues(self._data_watchers):
for data_watchers in self._data_watchers.values():
watchers.extend(data_watchers)

self._child_watchers = defaultdict(set)
Expand Down Expand Up @@ -821,7 +816,7 @@ def _is_valid(version):
version = _try_fetch()
if _is_valid(version):
return version
for _i in six.moves.range(0, retries):
for _i in range(0, retries):
version = _try_fetch()
if _is_valid(version):
return version
Expand Down Expand Up @@ -854,9 +849,9 @@ def add_auth_async(self, scheme, credential):
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
"""
if not isinstance(scheme, string_types):
if not isinstance(scheme, str):
raise TypeError("Invalid type for 'scheme' (string expected)")
if not isinstance(credential, string_types):
if not isinstance(credential, str):
raise TypeError("Invalid type for 'credential' (string expected)")

# we need this auth data to re-authenticate on reconnect
Expand Down Expand Up @@ -1034,15 +1029,15 @@ def create_async(
if acl is None and self.default_acl:
acl = self.default_acl

if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if acl and (
isinstance(acl, ACL) or not isinstance(acl, (tuple, list))
):
raise TypeError(
"Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
)
if value is not None and not isinstance(value, bytes_types):
if value is not None and not isinstance(value, bytes):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(ephemeral, bool):
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
Expand Down Expand Up @@ -1205,7 +1200,7 @@ def exists_async(self, path, watch=None):
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
"""
if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if watch and not callable(watch):
raise TypeError("Invalid type for 'watch' (must be a callable)")
Expand Down Expand Up @@ -1248,7 +1243,7 @@ def get_async(self, path, watch=None):
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
"""
if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if watch and not callable(watch):
raise TypeError("Invalid type for 'watch' (must be a callable)")
Expand Down Expand Up @@ -1304,7 +1299,7 @@ def get_children_async(self, path, watch=None, include_data=False):
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
"""
if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if watch and not callable(watch):
raise TypeError("Invalid type for 'watch' (must be a callable)")
Expand Down Expand Up @@ -1346,7 +1341,7 @@ def get_acls_async(self, path):
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
"""
if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")

async_result = self.handler.async_result()
Expand Down Expand Up @@ -1389,7 +1384,7 @@ def set_acls_async(self, path, acls, version=-1):
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
"""
if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if isinstance(acls, ACL) or not isinstance(acls, (tuple, list)):
raise TypeError(
Expand Down Expand Up @@ -1447,9 +1442,9 @@ def set_async(self, path, value, version=-1):
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
"""
if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if value is not None and not isinstance(value, bytes_types):
if value is not None and not isinstance(value, bytes):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(version, int):
raise TypeError("Invalid type for 'version' (int expected)")
Expand Down Expand Up @@ -1523,7 +1518,7 @@ def delete_async(self, path, version=-1):
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
"""
if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if not isinstance(version, int):
raise TypeError("Invalid type for 'version' (int expected)")
Expand Down Expand Up @@ -1632,11 +1627,11 @@ def reconfig_async(self, joining, leaving, new_members, from_config):
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
"""
if joining and not isinstance(joining, string_types):
if joining and not isinstance(joining, str):
raise TypeError("Invalid type for 'joining' (string expected)")
if leaving and not isinstance(leaving, string_types):
if leaving and not isinstance(leaving, str):
raise TypeError("Invalid type for 'leaving' (string expected)")
if new_members and not isinstance(new_members, string_types):
if new_members and not isinstance(new_members, str):
raise TypeError(
"Invalid type for 'new_members' (string " "expected)"
)
Expand Down Expand Up @@ -1690,13 +1685,13 @@ def create(
if acl is None and self.client.default_acl:
acl = self.client.default_acl

if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if acl and not isinstance(acl, (tuple, list)):
raise TypeError(
"Invalid type for 'acl' (acl must be a tuple/list" " of ACL's"
)
if not isinstance(value, bytes_types):
if not isinstance(value, bytes):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(ephemeral, bool):
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
Expand All @@ -1722,7 +1717,7 @@ def delete(self, path, version=-1):
`recursive`.
"""
if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if not isinstance(version, int):
raise TypeError("Invalid type for 'version' (int expected)")
Expand All @@ -1733,9 +1728,9 @@ def set_data(self, path, value, version=-1):
arguments as :meth:`KazooClient.set`.
"""
if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if not isinstance(value, bytes_types):
if not isinstance(value, bytes):
raise TypeError("Invalid type for 'value' (must be a byte string)")
if not isinstance(version, int):
raise TypeError("Invalid type for 'version' (int expected)")
Expand All @@ -1750,7 +1745,7 @@ def check(self, path, version):
does not match the specified version.
"""
if not isinstance(path, string_types):
if not isinstance(path, str):
raise TypeError("Invalid type for 'path' (string expected)")
if not isinstance(version, int):
raise TypeError("Invalid type for 'version' (int expected)")
Expand Down
29 changes: 12 additions & 17 deletions kazoo/handlers/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,15 @@
from __future__ import absolute_import

import logging
import queue
import socket
import threading
import time

import six

import kazoo.python2atexit as python2atexit
from kazoo.handlers import utils
from kazoo.handlers.utils import selector_select

try:
import Queue
except ImportError: # pragma: nocover
import queue as Queue

# sentinel objects
_STOP = object()
Expand All @@ -35,11 +30,11 @@


def _to_fileno(obj):
if isinstance(obj, six.integer_types):
if isinstance(obj, int):
fd = int(obj)
elif hasattr(obj, "fileno"):
fd = obj.fileno()
if not isinstance(fd, six.integer_types):
if not isinstance(fd, int):
raise TypeError("fileno() returned a non-integer")
fd = int(fd)
else:
Expand Down Expand Up @@ -98,8 +93,8 @@ class SequentialThreadingHandler(object):
name = "sequential_threading_handler"
timeout_exception = KazooTimeoutError
sleep_func = staticmethod(time.sleep)
queue_impl = Queue.Queue
queue_empty = Queue.Empty
queue_impl = queue.Queue
queue_empty = queue.Empty

def __init__(self):
"""Create a :class:`SequentialThreadingHandler` instance"""
Expand All @@ -113,19 +108,19 @@ def __init__(self):
def running(self):
return self._running

def _create_thread_worker(self, queue):
def _create_thread_worker(self, work_queue):
def _thread_worker(): # pragma: nocover
while True:
try:
func = queue.get()
func = work_queue.get()
try:
if func is _STOP:
break
func()
except Exception:
log.exception("Exception in worker queue thread")
finally:
queue.task_done()
work_queue.task_done()
del func # release before possible idle
except self.queue_empty:
continue
Expand All @@ -142,8 +137,8 @@ def start(self):
# Spawn our worker threads, we have
# - A callback worker for watch events to be called
# - A completion worker for completion events to be called
for queue in (self.completion_queue, self.callback_queue):
w = self._create_thread_worker(queue)
for work_queue in (self.completion_queue, self.callback_queue):
w = self._create_thread_worker(work_queue)
self._workers.append(w)
self._running = True
python2atexit.register(self.stop)
Expand All @@ -156,8 +151,8 @@ def stop(self):

self._running = False

for queue in (self.completion_queue, self.callback_queue):
queue.put(_STOP)
for work_queue in (self.completion_queue, self.callback_queue):
work_queue.put(_STOP)

self._workers.reverse()
while self._workers:
Expand Down
22 changes: 5 additions & 17 deletions kazoo/handlers/utils.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
"""Kazoo handler helpers"""

from collections import defaultdict
import errno
import functools
import os
import select
import selectors
import ssl
import socket
import time

from collections import defaultdict

import six

if six.PY34:
import selectors
else:
import selectors2 as selectors

HAS_FNCTL = True
try:
import fcntl
Expand Down Expand Up @@ -363,14 +356,9 @@ def selector_select(
):
"""Selector-based drop-in replacement for select to overcome select
limitation on a maximum filehandle value.
Need backport selectors2 package in python 2.
"""
if timeout is not None:
if not (
isinstance(timeout, six.integer_types)
or isinstance(timeout, float)
):
if not isinstance(timeout, (int, float)):
raise TypeError("timeout must be a number")
if timeout < 0:
raise ValueError("timeout must be non-negative")
Expand Down Expand Up @@ -400,9 +388,9 @@ def selector_select(

for info in ready:
k, events = info
if events & selectors.EVENT_READ:
if events & selectors_module.EVENT_READ:
revents.extend(fd_fileobjs[k.fd])
elif events & selectors.EVENT_WRITE:
elif events & selectors_module.EVENT_WRITE:
wevents.extend(fd_fileobjs[k.fd])

return revents, wevents, xevents
4 changes: 2 additions & 2 deletions kazoo/hosts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from six.moves import urllib_parse
import urllib.parse


def collect_hosts(hosts):
Expand All @@ -20,7 +20,7 @@ def collect_hosts(hosts):
for host_port in host_ports:
# put all complexity of dealing with
# IPv4 & IPv6 address:port on the urlsplit
res = urllib_parse.urlsplit("xxx://" + host_port)
res = urllib.parse.urlsplit("xxx://" + host_port)
host = res.hostname
if host is None:
raise ValueError("bad hostname")
Expand Down
Loading

0 comments on commit e05b50a

Please sign in to comment.