diff --git a/locust/runners.py b/locust/runners.py index c4ab2f870c..a2980408d6 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -9,14 +9,18 @@ from time import time import gevent +import greenlet import psutil from gevent.pool import Group +from . import User from .log import greenlet_exception_logger from .rpc import Message, rpc from .stats import RequestStats, setup_distributed_stats_event_listeners from .exception import RPCError +from .user.task import LOCUST_STATE_STOPPING + logger = logging.getLogger(__name__) @@ -224,25 +228,28 @@ def stop_users(self, user_count, stop_rate=None): sleep_time = 1.0 / stop_rate logger.info("Stopping %i users at rate of %g users/s" % (user_count, stop_rate)) - if self.environment.stop_timeout: - stop_group = Group() + async_calls_to_stop = Group() + stop_group = Group() while True: - user_to_stop = to_stop.pop(random.randint(0, len(to_stop) - 1)) + user_to_stop: User = to_stop.pop(random.randint(0, len(to_stop) - 1)) logger.debug("Stopping %s" % user_to_stop._greenlet.name) - if self.environment.stop_timeout: - if not user_to_stop.stop(self.user_greenlets, force=False): - # User.stop() returns False if the greenlet was not stopped, so we'll need - # to add it's greenlet to our stopping Group so we can wait for it to finish it's task - stop_group.add(user_to_stop._greenlet) + if user_to_stop._greenlet is greenlet.getcurrent(): + # User called runner.quit(), so dont block waiting for killing to finish" + user_to_stop._group.killone(user_to_stop._greenlet, block=False) + elif self.environment.stop_timeout: + async_calls_to_stop.add(gevent.spawn_later(0, User.stop, user_to_stop, force=False)) + stop_group.add(user_to_stop._greenlet) else: - user_to_stop.stop(self.user_greenlets, force=True) + async_calls_to_stop.add(gevent.spawn_later(0, User.stop, user_to_stop, force=True)) if to_stop: gevent.sleep(sleep_time) else: break - if self.environment.stop_timeout and not stop_group.join(timeout=self.environment.stop_timeout): + async_calls_to_stop.join() + + if not stop_group.join(timeout=self.environment.stop_timeout): logger.info( "Not all users finished their tasks & terminated in %s seconds. Stopping them..." % self.environment.stop_timeout diff --git a/locust/test/test_locust_class.py b/locust/test/test_locust_class.py index 2bb79cfe6d..6122cafffe 100644 --- a/locust/test/test_locust_class.py +++ b/locust/test/test_locust_class.py @@ -225,7 +225,7 @@ class MyUser(User): user = MyUser(self.environment) user.start(group) sleep(0.05) - user.stop(group) + user.stop() sleep(0) self.assertTrue(user.t2_executed) @@ -526,7 +526,7 @@ def t(self): self.assertEqual(1, user.test_state) # stop User gracefully - user.stop(group, force=False) + user.stop(force=False) sleep(0) # make sure instance is not killed right away self.assertIn(greenlet, group) @@ -555,7 +555,7 @@ def t(self): self.assertEqual(1, user.test_state) # stop User gracefully - user.stop(group, force=True) + user.stop(force=True) sleep(0) # make sure instance is killed right away, and that the task did NOT get to finish self.assertEqual(0, len(group)) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index a3ce70bdc5..b28c43acc4 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -4,6 +4,7 @@ import gevent from gevent import sleep from gevent.queue import Queue +import greenlet import locust from locust import runners, between, constant, LoadTestShape @@ -360,14 +361,18 @@ def test_runner_reference_on_environment(self): self.assertEqual(env, runner.environment) self.assertEqual(runner, env.runner) - def test_users_can_call_runner_quit(self): + def test_users_can_call_runner_quit_without_deadlocking(self): class BaseUser(User): wait_time = constant(0) + stop_triggered = False @task def trigger(self): self.environment.runner.quit() + def on_stop(self): + BaseUser.stop_triggered = True + runner = Environment(user_classes=[BaseUser]).create_local_runner() runner.spawn_users(1, 1, wait=False) timeout = gevent.Timeout(0.5) @@ -379,6 +384,34 @@ def trigger(self): finally: timeout.cancel() + self.assertTrue(BaseUser.stop_triggered) + + def test_runner_quit_can_run_on_stop_for_multiple_users_concurrently(self): + class BaseUser(User): + wait_time = constant(0) + stop_count = 0 + + @task + def trigger(self): + pass + + def on_stop(self): + gevent.sleep(0.1) + BaseUser.stop_count += 1 + + runner = Environment(user_classes=[BaseUser]).create_local_runner() + runner.spawn_users(10, 10, wait=False) + timeout = gevent.Timeout(0.3) + timeout.start() + try: + runner.quit() + except gevent.Timeout: + self.fail("Got Timeout exception, runner must have hung somehow.") + finally: + timeout.cancel() + + self.assertEqual(10, BaseUser.stop_count) # verify that all users executed on_stop + def test_stop_users_with_spawn_rate(self): class MyUser(User): wait_time = constant(1) diff --git a/locust/user/task.py b/locust/user/task.py index 2c79780518..9dfb68a840 100644 --- a/locust/user/task.py +++ b/locust/user/task.py @@ -280,7 +280,8 @@ def run(self): self.schedule_task(self.get_next_task()) try: - self._check_stop_condition() + if self.user._state == LOCUST_STATE_STOPPING: + raise StopUser() self.execute_next_task() except RescheduleTaskImmediately: pass @@ -372,19 +373,17 @@ def wait(self): set a stop_timeout. If this behaviour is not desired you should make the user wait using gevent.sleep() instead. """ - self._check_stop_condition() + if self.user._state == LOCUST_STATE_STOPPING: + raise StopUser() self.user._state = LOCUST_STATE_WAITING self._sleep(self.wait_time()) - self._check_stop_condition() + if self.user._state == LOCUST_STATE_STOPPING: + raise StopUser() self.user._state = LOCUST_STATE_RUNNING def _sleep(self, seconds): gevent.sleep(seconds) - def _check_stop_condition(self): - if self.user._state == LOCUST_STATE_STOPPING: - raise StopUser() - def interrupt(self, reschedule=True): """ Interrupt the TaskSet and hand over execution control back to the parent TaskSet. diff --git a/locust/user/users.py b/locust/user/users.py index b85c1642a6..356cf40c41 100644 --- a/locust/user/users.py +++ b/locust/user/users.py @@ -1,5 +1,5 @@ from gevent import GreenletExit, greenlet - +from gevent.pool import Group from locust.clients import HttpSession from locust.exception import LocustError, StopUser from locust.util import deprecation @@ -111,7 +111,8 @@ class ForumPage(TaskSet): client = NoClientWarningRaiser() _state = None - _greenlet = None + _greenlet: greenlet.Greenlet = None + _group: Group _taskset_instance = None def __init__(self, environment): @@ -154,11 +155,11 @@ def wait(self): """ self._taskset_instance.wait() - def start(self, gevent_group): + def start(self, group: Group): """ Start a greenlet that runs this User instance. - :param gevent_group: Group instance where the greenlet will be spawned. + :param group: Group instance where the greenlet will be spawned. :type gevent_group: gevent.pool.Group :returns: The spawned greenlet. """ @@ -171,26 +172,21 @@ def run_user(user): """ user.run() - self._greenlet = gevent_group.spawn(run_user, self) + self._greenlet = group.spawn(run_user, self) + self._group = group return self._greenlet - def stop(self, gevent_group, force=False): + def stop(self, force=False): """ - Stop the user greenlet that exists in the gevent_group. + Stop the user greenlet. - :param gevent_group: Group instance where the greenlet will be spawned. - :type gevent_group: gevent.pool.Group :param force: If False (the default) the stopping is done gracefully by setting the state to LOCUST_STATE_STOPPING which will make the User instance stop once any currently running task is complete and on_stop methods are called. If force is True the greenlet will be killed immediately. :returns: True if the greenlet was killed immediately, otherwise False """ - if self._greenlet is greenlet.getcurrent(): - # the user is stopping itself (from within a task), so blocking would deadlock - gevent_group.killone(self._greenlet, block=False) - return True - elif force or self._state == LOCUST_STATE_WAITING: - gevent_group.killone(self._greenlet) + if force or self._state == LOCUST_STATE_WAITING: + self._group.killone(self._greenlet) return True elif self._state == LOCUST_STATE_RUNNING: self._state = LOCUST_STATE_STOPPING