Skip to content

Commit

Permalink
fix(recipe): No more memory leak once TreeCache was closed (#524)
Browse files Browse the repository at this point in the history
fix(recipe): Fix memory leak of TreeCache recipe.

Fix memory leaks in idle handlers and in closed TreeCache instances.
Add new memory tests for the TreeCache recipe that use
objgraph, and other tests for various handlers with TreeCache.
Let TreeCache start in a safe way. The docs now suggest
closing unused TreeCache instances.
  • Loading branch information
tonyseek authored and StephenSorriaux committed Nov 21, 2018
1 parent 03340fb commit c48f273
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 15 deletions.
4 changes: 4 additions & 0 deletions kazoo/handlers/eventlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ def _process_completion_queue(self):
except Exception:
LOG.warning("Exception in worker completion queue greenlet",
exc_info=True)
finally:
del cb # release before possible idle

def _process_callback_queue(self):
while True:
Expand All @@ -119,6 +121,8 @@ def _process_callback_queue(self):
except Exception:
LOG.warning("Exception in worker callback queue greenlet",
exc_info=True)
finally:
del cb # release before possible idle

def start(self):
if not self._started:
Expand Down
15 changes: 9 additions & 6 deletions kazoo/handlers/gevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,17 @@ def greenlet_worker():
while True:
try:
func = queue.get()
if func is _STOP:
break
func()
try:
if func is _STOP:
break
func()
except Exception as exc:
log.warning("Exception in worker greenlet")
log.exception(exc)
finally:
del func # release before possible idle
except self.queue_empty:
continue
except Exception as exc:
log.warning("Exception in worker greenlet")
log.exception(exc)
return gevent.spawn(greenlet_worker)

def start(self):
Expand Down
1 change: 1 addition & 0 deletions kazoo/handlers/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def _thread_worker(): # pragma: nocover
log.exception("Exception in worker queue thread")
finally:
queue.task_done()
del func # release before possible idle
except self.queue_empty:
continue
t = self.spawn(_thread_worker)
Expand Down
39 changes: 33 additions & 6 deletions kazoo/recipe/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os

from kazoo.exceptions import NoNodeError, KazooException
from kazoo.protocol.paths import _prefix_root
from kazoo.protocol.states import KazooState, EventType


Expand Down Expand Up @@ -57,6 +58,12 @@ def start(self):
After a cache started, all changes of subtree will be synchronized
from the ZooKeeper server. Events will be fired for those activity.
Don't forget to call :meth:`close` if a tree was started and you don't
need it anymore, or you will leak the memory of cached nodes, even if
you have released all references to the :class:`TreeCache` instance.
The leak happens because many callbacks remain registered with the
Kazoo client.
See also :meth:`~TreeCache.listen`.
.. note::
Expand All @@ -75,7 +82,10 @@ def start(self):
self._client.ensure_path(self._root._path)

if self._client.connected:
self._root.on_created()
# The on_created and other on_* methods must not be invoked outside
# the background task. This is the key to staying concurrency-safe
# without a lock.
self._in_background(self._root.on_created)

def close(self):
"""Closes the cache.
Expand All @@ -95,6 +105,10 @@ def close(self):
self._task_queue.put(self._STOP)
self._client.remove_listener(self._session_watcher)
with handle_exception(self._error_listeners):
# We must invoke on_deleted outside the background queue because:
# 1. The background task has been stopped.
# 2. on_deleted on a closed tree does not actually communicate
# with ZooKeeper.
self._root.on_deleted()

def listen(self, listener):
Expand Down Expand Up @@ -185,6 +199,9 @@ def _do_background(self):
func, args, kwargs = cb
func(*args, **kwargs)

# release before possible idle
del cb, func, args, kwargs

def _session_watcher(self, state):
if state == KazooState.SUSPENDED:
self._publish_event(TreeEvent.CONNECTION_SUSPENDED)
Expand Down Expand Up @@ -241,6 +258,7 @@ def on_deleted(self):
old_child.on_deleted()

if self._tree._state == self._tree.STATE_CLOSED:
self._reset_watchers()
return

old_state, self._state = self._state, self.STATE_DEAD
Expand All @@ -253,10 +271,18 @@ def on_deleted(self):
child = self._path[len(self._parent._path) + 1:]
if self._parent._children.get(child) is self:
del self._parent._children[child]
self._reset_watchers()

def _publish_event(self, *args, **kwargs):
return self._tree._publish_event(*args, **kwargs)

def _reset_watchers(self):
    """Remove this node's watch callback from the client's watcher tables.

    Looks up the node's chroot-prefixed path in both
    ``client._data_watchers`` and ``client._child_watchers`` and discards
    ``self._process_watch`` from the watcher set found there. Other
    callbacks registered under the same path are left in place.
    """
    client = self._tree._client
    # The prefixed path is identical for both tables, so resolve it once
    # rather than on every loop iteration.
    watched_path = _prefix_root(client.chroot, self._path)
    for table in (client._data_watchers, client._child_watchers):
        # .get() never inserts a key; the throwaway set turns discard()
        # into a no-op for paths that have no registered watchers.
        callbacks = table.get(watched_path, set())
        callbacks.discard(self._process_watch)

def _refresh(self):
self._refresh_data()
self._refresh_children()
Expand Down Expand Up @@ -391,10 +417,11 @@ def handle_exception(listeners):
yield
except Exception as e:
logger.debug('processing error: %r', e)
for listener in listeners:
try:
listener(e)
except: # pragma: no cover
logger.exception('Exception handling exception') # oops
if listeners:
for listener in listeners:
try:
listener(e)
except BaseException: # pragma: no cover
logger.exception('Exception handling exception') # oops
else:
logger.exception('No listener to process %r', e)
125 changes: 122 additions & 3 deletions kazoo/tests/test_cache.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,44 @@
import gc
import importlib
import uuid

from mock import patch, call, Mock
from nose.tools import eq_, ok_, assert_not_equal, raises
from objgraph import count as count_refs_by_type

from kazoo.testing import KazooTestCase
from kazoo.testing import KazooTestHarness
from kazoo.exceptions import KazooException
from kazoo.recipe.cache import TreeCache, TreeNode, TreeEvent


class KazooTreeCacheTests(KazooTestCase):
class KazooAdaptiveHandlerTestCase(KazooTestHarness):
    """Test harness that runs against whichever Kazoo handler is importable.

    ``HANDLERS`` lists ``(module path, class name)`` candidates in the order
    they are tried; the first one that imports successfully is instantiated.
    """

    HANDLERS = (
        ('kazoo.handlers.gevent', 'SequentialGeventHandler'),
        ('kazoo.handlers.eventlet', 'SequentialEventletHandler'),
        ('kazoo.handlers.threading', 'SequentialThreadingHandler'),
    )

    def setUp(self):
        """Pick a handler and bring up the ZooKeeper harness with it."""
        chosen = self.choose_an_installed_handler()
        self.handler = chosen
        self.setup_zookeeper(handler=chosen)

    def tearDown(self):
        """Drop the handler reference, then tear the harness down."""
        self.handler = None
        self.teardown_zookeeper()

    def choose_an_installed_handler(self):
        """Return an instance of the first handler class that can be imported.

        Raises ``ImportError`` when none of the candidates is importable.
        """
        for module_name, class_name in self.HANDLERS:
            try:
                handler_cls = getattr(
                    importlib.import_module(module_name), class_name)
            except ImportError:
                # This backend is not installed; try the next candidate.
                continue
            # Instantiate outside the try so errors raised by the
            # constructor are not mistaken for a missing backend.
            return handler_cls()
        raise ImportError('No available handler')


class KazooTreeCacheTests(KazooAdaptiveHandlerTestCase):
def setUp(self):
super(KazooTreeCacheTests, self).setUp()
self._event_queue = self.client.handler.queue_impl()
Expand All @@ -18,12 +47,15 @@ def setUp(self):
self.cache = None

def tearDown(self):
super(KazooTreeCacheTests, self).tearDown()
if not self._error_queue.empty():
try:
raise self._error_queue.get()
except FakeException:
pass
if self.cache is not None:
self.cache.close()
self.cache = None
super(KazooTreeCacheTests, self).tearDown()

def make_cache(self):
if self.cache is None:
Expand Down Expand Up @@ -51,6 +83,29 @@ def spy_client(self, method_name):
method = getattr(self.client, method_name)
return patch.object(self.client, method_name, wraps=method)

def _wait_gc(self):
# trigger switching on some coroutine handlers
self.client.handler.sleep_func(0.1)

completion_queue = getattr(self.handler, 'completion_queue', None)
if completion_queue is not None:
while not self.client.handler.completion_queue.empty():
self.client.handler.sleep_func(0.1)

for gen in range(3):
gc.collect(gen)

def count_tree_node(self):
    """Return a stable count of live ``TreeNode`` objects.

    Used to check for memory leaks. Each attempt takes five samples, with
    a ``_wait_gc()`` before every sample; the count is returned once all
    five samples of an attempt agree. Raises ``RuntimeError`` after ten
    attempts without agreement.
    """
    for _attempt in range(10):
        observed = set()
        for _sample in range(5):
            self._wait_gc()
            observed.add(count_refs_by_type('TreeNode'))
        if len(observed) == 1:
            # Exactly one distinct value was seen; unpack and return it.
            (stable_count,) = observed
            return stable_count
    raise RuntimeError('could not count refs exactly')

def test_start(self):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
Expand All @@ -74,12 +129,29 @@ def test_start_closed(self):
self.cache.start()

def test_close(self):
eq_(self.count_tree_node(), 0)

self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
self.client.create(self.path + '/foo/bar/baz', makepath=True)
for _ in range(3):
self.wait_cache(TreeEvent.NODE_ADDED)

# setup stub watchers which are outside of tree cache
stub_data_watcher = Mock(spec=lambda event: None)
stub_child_watcher = Mock(spec=lambda event: None)
self.client.get(self.path + '/foo', stub_data_watcher)
self.client.get_children(self.path + '/foo', stub_child_watcher)

# watchers inside tree cache should be here
root_path = self.client.chroot + self.path
eq_(len(self.client._data_watchers[root_path + '/foo']), 2)
eq_(len(self.client._data_watchers[root_path + '/foo/bar']), 1)
eq_(len(self.client._data_watchers[root_path + '/foo/bar/baz']), 1)
eq_(len(self.client._child_watchers[root_path + '/foo']), 2)
eq_(len(self.client._child_watchers[root_path + '/foo/bar']), 1)
eq_(len(self.client._child_watchers[root_path + '/foo/bar/baz']), 1)

self.cache.close()

# nothing should be published since tree closed
Expand All @@ -93,6 +165,53 @@ def test_close(self):
# node state should not be changed
assert_not_equal(self.cache._root._state, TreeNode.STATE_DEAD)

# watchers should be reset
eq_(len(self.client._data_watchers[root_path + '/foo']), 1)
eq_(len(self.client._data_watchers[root_path + '/foo/bar']), 0)
eq_(len(self.client._data_watchers[root_path + '/foo/bar/baz']), 0)
eq_(len(self.client._child_watchers[root_path + '/foo']), 1)
eq_(len(self.client._child_watchers[root_path + '/foo/bar']), 0)
eq_(len(self.client._child_watchers[root_path + '/foo/bar/baz']), 0)

# outside watchers should not be deleted
eq_(list(self.client._data_watchers[root_path + '/foo'])[0],
stub_data_watcher)
eq_(list(self.client._child_watchers[root_path + '/foo'])[0],
stub_child_watcher)

# should not be any leaked memory (tree node) here
self.cache = None
eq_(self.count_tree_node(), 0)

def test_delete_operation(self):
    """Deleting a subtree empties the cache, resets watchers, leaks nothing."""
    self.make_cache()
    self.wait_cache(since=TreeEvent.INITIALIZED)

    # a freshly initialized cache should hold exactly one TreeNode
    eq_(self.count_tree_node(), 1)

    self.client.create(self.path + '/foo/bar/baz', makepath=True)
    for _ in range(3):
        self.wait_cache(TreeEvent.NODE_ADDED)

    self.client.delete(self.path + '/foo', recursive=True)
    for _ in range(3):
        self.wait_cache(TreeEvent.NODE_REMOVED)

    # tree should be empty
    eq_(self.cache._root._children, {})

    # watchers should be reset: data watchers first, then child watchers,
    # each checked from the shallowest path to the deepest
    root_path = self.client.chroot + self.path
    deleted_suffixes = ('/foo', '/foo/bar', '/foo/bar/baz')
    for watcher_attr in ('_data_watchers', '_child_watchers'):
        for suffix in deleted_suffixes:
            eq_(getattr(self.client, watcher_attr)[root_path + suffix],
                set())

    # should not be any leaked memory (tree node) here
    eq_(self.count_tree_node(), 1)

def test_children_operation(self):
self.make_cache()
self.wait_cache(since=TreeEvent.INITIALIZED)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ mock==1.0.1
nose==1.3.3
pure-sasl==0.5.1
flake8==2.3.0
objgraph==3.4.0
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
'nose',
'flake8',
'pure-sasl',
'objgraph',
]

if not (PYTHON3 or PYPY):
Expand Down

0 comments on commit c48f273

Please sign in to comment.