Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update to actor list limit to fix issue 803 #814

Open
wants to merge 12 commits into
base: dev
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
from typing import Any

import ray
from ray.experimental.state.api import list_actors
from data_processing.utils import GB, UnrecoverableException
from data_processing.utils import GB, UnrecoverableException, get_logger
from ray.actor import ActorHandle
from ray.exceptions import RayError
from ray.experimental.state.api import list_actors
from ray.util.actor_pool import ActorPool


MAX_LIST = 10000 # Max number of actors returned by list
cmadam marked this conversation as resolved.
Show resolved Hide resolved


class RayUtils:
"""
Class implementing support methods for Ray execution
Expand Down Expand Up @@ -109,16 +112,38 @@ def operator() -> ActorHandle:
time.sleep(creation_delay)
return clazz.options(**actor_options).remote(params)

cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','')
logger = get_logger(__name__)
cls_name = clazz.__class__.__name__.replace("ActorClass(", "").replace(")", "")
current = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=MAX_LIST)
cmadam marked this conversation as resolved.
Show resolved Hide resolved
c_len = len(current)
actors = [operator() for _ in range(n_actors)]
for i in range(120):
time.sleep(1)
alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")])
if len(actors) == len(alive):
return actors
# failed - raise an exception
print(f"created {actors}, alive {alive}")
raise UnrecoverableException(f"out of {len(actors)} created actors only {len(alive)} alive")
overall = c_len + n_actors
if overall < MAX_LIST:
cmadam marked this conversation as resolved.
Show resolved Hide resolved
n_list = min(overall + 10, MAX_LIST)
alive = []
for i in range(120):
time.sleep(1)
alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=n_list)
if len(alive) >= n_actors + c_len:
return actors
# failed
cmadam marked this conversation as resolved.
Show resolved Hide resolved
if len(alive) >= n_actors / 2 + c_len:
# At least half of the actors were created
logger.info(f"created {n_actors}, alive {len(alive)} Running with less actors")
created_ids = [item.actor_id for item in alive if item not in current]
return [
actor
for actor in actors
if (str(actor._ray_actor_id).replace("ActorID(", "").replace(")", "") in created_ids)
]
else:
# too few actors created
raise UnrecoverableException(f"Created {n_actors}, alive {len(alive)}. Too few actors were created")
else:
raise UnrecoverableException(
f"Overall number of actors of class {cls_name} is {overall}. "
f"Too many actors to create, greater then {MAX_LIST}"
)

@staticmethod
def process_files(
Expand Down