Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and fix delayed user stopping in combination with on_stop #1560

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@
from time import time

import gevent
import greenlet
import psutil
from gevent.pool import Group

from . import User
from .log import greenlet_exception_logger
from .rpc import Message, rpc
from .stats import RequestStats, setup_distributed_stats_event_listeners

from .exception import RPCError
from .user.task import LOCUST_STATE_STOPPING


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -224,25 +228,28 @@ def stop_users(self, user_count, stop_rate=None):
sleep_time = 1.0 / stop_rate
logger.info("Stopping %i users at rate of %g users/s" % (user_count, stop_rate))

if self.environment.stop_timeout:
stop_group = Group()
async_calls_to_stop = Group()
stop_group = Group()

while True:
user_to_stop = to_stop.pop(random.randint(0, len(to_stop) - 1))
user_to_stop: User = to_stop.pop(random.randint(0, len(to_stop) - 1))
logger.debug("Stopping %s" % user_to_stop._greenlet.name)
if self.environment.stop_timeout:
if not user_to_stop.stop(self.user_greenlets, force=False):
# User.stop() returns False if the greenlet was not stopped, so we'll need
# to add it's greenlet to our stopping Group so we can wait for it to finish it's task
stop_group.add(user_to_stop._greenlet)
if user_to_stop._greenlet is greenlet.getcurrent():
# User called runner.quit(), so dont block waiting for killing to finish"
user_to_stop._group.killone(user_to_stop._greenlet, block=False)
elif self.environment.stop_timeout:
async_calls_to_stop.add(gevent.spawn_later(0, User.stop, user_to_stop, force=False))
stop_group.add(user_to_stop._greenlet)
else:
user_to_stop.stop(self.user_greenlets, force=True)
async_calls_to_stop.add(gevent.spawn_later(0, User.stop, user_to_stop, force=True))
if to_stop:
gevent.sleep(sleep_time)
else:
break

if self.environment.stop_timeout and not stop_group.join(timeout=self.environment.stop_timeout):
async_calls_to_stop.join()

if not stop_group.join(timeout=self.environment.stop_timeout):
logger.info(
"Not all users finished their tasks & terminated in %s seconds. Stopping them..."
% self.environment.stop_timeout
Expand Down
6 changes: 3 additions & 3 deletions locust/test/test_locust_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class MyUser(User):
user = MyUser(self.environment)
user.start(group)
sleep(0.05)
user.stop(group)
user.stop()
sleep(0)

self.assertTrue(user.t2_executed)
Expand Down Expand Up @@ -526,7 +526,7 @@ def t(self):
self.assertEqual(1, user.test_state)

# stop User gracefully
user.stop(group, force=False)
user.stop(force=False)
sleep(0)
# make sure instance is not killed right away
self.assertIn(greenlet, group)
Expand Down Expand Up @@ -555,7 +555,7 @@ def t(self):
self.assertEqual(1, user.test_state)

# stop User gracefully
user.stop(group, force=True)
user.stop(force=True)
sleep(0)
# make sure instance is killed right away, and that the task did NOT get to finish
self.assertEqual(0, len(group))
Expand Down
35 changes: 34 additions & 1 deletion locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import gevent
from gevent import sleep
from gevent.queue import Queue
import greenlet

import locust
from locust import runners, between, constant, LoadTestShape
Expand Down Expand Up @@ -360,14 +361,18 @@ def test_runner_reference_on_environment(self):
self.assertEqual(env, runner.environment)
self.assertEqual(runner, env.runner)

def test_users_can_call_runner_quit(self):
def test_users_can_call_runner_quit_without_deadlocking(self):
class BaseUser(User):
wait_time = constant(0)
stop_triggered = False

@task
def trigger(self):
self.environment.runner.quit()

def on_stop(self):
BaseUser.stop_triggered = True

runner = Environment(user_classes=[BaseUser]).create_local_runner()
runner.spawn_users(1, 1, wait=False)
timeout = gevent.Timeout(0.5)
Expand All @@ -379,6 +384,34 @@ def trigger(self):
finally:
timeout.cancel()

self.assertTrue(BaseUser.stop_triggered)

def test_runner_quit_can_run_on_stop_for_multiple_users_concurrently(self):
class BaseUser(User):
wait_time = constant(0)
stop_count = 0

@task
def trigger(self):
pass

def on_stop(self):
gevent.sleep(0.1)
BaseUser.stop_count += 1

runner = Environment(user_classes=[BaseUser]).create_local_runner()
runner.spawn_users(10, 10, wait=False)
timeout = gevent.Timeout(0.3)
timeout.start()
try:
runner.quit()
except gevent.Timeout:
self.fail("Got Timeout exception, runner must have hung somehow.")
finally:
timeout.cancel()

self.assertEqual(10, BaseUser.stop_count) # verify that all users executed on_stop

def test_stop_users_with_spawn_rate(self):
class MyUser(User):
wait_time = constant(1)
Expand Down
13 changes: 6 additions & 7 deletions locust/user/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ def run(self):
self.schedule_task(self.get_next_task())

try:
self._check_stop_condition()
if self.user._state == LOCUST_STATE_STOPPING:
raise StopUser()
self.execute_next_task()
except RescheduleTaskImmediately:
pass
Expand Down Expand Up @@ -372,19 +373,17 @@ def wait(self):
set a stop_timeout. If this behaviour is not desired you should make the user wait using
gevent.sleep() instead.
"""
self._check_stop_condition()
if self.user._state == LOCUST_STATE_STOPPING:
raise StopUser()
self.user._state = LOCUST_STATE_WAITING
self._sleep(self.wait_time())
self._check_stop_condition()
if self.user._state == LOCUST_STATE_STOPPING:
raise StopUser()
self.user._state = LOCUST_STATE_RUNNING

def _sleep(self, seconds):
gevent.sleep(seconds)

def _check_stop_condition(self):
if self.user._state == LOCUST_STATE_STOPPING:
raise StopUser()

def interrupt(self, reschedule=True):
"""
Interrupt the TaskSet and hand over execution control back to the parent TaskSet.
Expand Down
26 changes: 11 additions & 15 deletions locust/user/users.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from gevent import GreenletExit, greenlet

from gevent.pool import Group
from locust.clients import HttpSession
from locust.exception import LocustError, StopUser
from locust.util import deprecation
Expand Down Expand Up @@ -111,7 +111,8 @@ class ForumPage(TaskSet):

client = NoClientWarningRaiser()
_state = None
_greenlet = None
_greenlet: greenlet.Greenlet = None
_group: Group
_taskset_instance = None

def __init__(self, environment):
Expand Down Expand Up @@ -154,11 +155,11 @@ def wait(self):
"""
self._taskset_instance.wait()

def start(self, gevent_group):
def start(self, group: Group):
"""
Start a greenlet that runs this User instance.

:param gevent_group: Group instance where the greenlet will be spawned.
:param group: Group instance where the greenlet will be spawned.
:type gevent_group: gevent.pool.Group
:returns: The spawned greenlet.
"""
Expand All @@ -171,26 +172,21 @@ def run_user(user):
"""
user.run()

self._greenlet = gevent_group.spawn(run_user, self)
self._greenlet = group.spawn(run_user, self)
self._group = group
return self._greenlet

def stop(self, gevent_group, force=False):
def stop(self, force=False):
"""
Stop the user greenlet that exists in the gevent_group.
Stop the user greenlet.

:param gevent_group: Group instance where the greenlet will be spawned.
:type gevent_group: gevent.pool.Group
:param force: If False (the default) the stopping is done gracefully by setting the state to LOCUST_STATE_STOPPING
which will make the User instance stop once any currently running task is complete and on_stop
methods are called. If force is True the greenlet will be killed immediately.
:returns: True if the greenlet was killed immediately, otherwise False
"""
if self._greenlet is greenlet.getcurrent():
# the user is stopping itself (from within a task), so blocking would deadlock
gevent_group.killone(self._greenlet, block=False)
return True
elif force or self._state == LOCUST_STATE_WAITING:
gevent_group.killone(self._greenlet)
if force or self._state == LOCUST_STATE_WAITING:
self._group.killone(self._greenlet)
return True
elif self._state == LOCUST_STATE_RUNNING:
self._state = LOCUST_STATE_STOPPING
Expand Down