Replies: 1 comment 3 replies
-
Other Things I've Tried
This doesn't work - the deployment does not respect the task runner override on the task_runner = create_task_runner(
task_runner_class=kwargs.pop("task_runner_class", "ConcurrentTaskRunner"),
task_runner_kwargs=kwargs.pop("task_runner_kwargs", {}),
)
flow = flow.with_options(task_runner=task_runner)
# flow.task_runner = task_runner # setting this directly doesn't work either
return Deployment.build_from_flow(flow, name, **args)
def create_task_runner():
print("setting task runner . . .")
task_runner_class = prefect.context.get_run_context().parameters.get(
"task_runner_class", "ConcurrentTaskRunner"
)
task_runner_kwargs = prefect.context.get_run_context().parameters.get(
"task_runner_kwargs", {}
)
TASK_RUNNER_MAP = {
"SequentialTaskRunner": SequentialTaskRunner(),
"ConcurrentTaskRunner": ConcurrentTaskRunner(),
"DaskTaskRunner": DaskTaskRunner(),
}
if task_runner_class not in TASK_RUNNER_MAP.keys():
runners_csv_string = ", ".join(TASK_RUNNER_MAP.keys())
raise ValueError(
f"""
Unsupported task_runner_class: {task_runner_class}.
Must be one of {runners_csv_string}
See https://docs-2.prefect.io/unreleased/concepts/task-runners/
"""
)
task_runner = TASK_RUNNER_MAP[task_runner_class]
task_runner.__dict__.update(task_runner_kwargs)
return task_runner
@flow(
log_prints=True,
task_runner=create_task_runner,
)
def my_flow(task_runner_class: str = "ConcurrentTaskRunner", task_runner_kwargs: dict = {}):
pass ^^ This approach results in the error
|
Beta Was this translation helpful? Give feedback.
3 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
I've been attempting an approach to dynamically set task runners via flow parameters, but this appears to not be well supported at the moment. For reference, I am using Prefect
2.15.0
which is a bit old, but from my research this doesn't appear to be well supported in later versions eitherRelated Slack discussion: https://prefect-community.slack.com/archives/CL09KU1K7/p1731688500092749
I have managed to achieve this for flows which are invoked as Python scripts (see below example), though I have not managed to get this working for deployments created with
Deployment.build_from_flow(flow)
; my intuition is that this is not possible because the task runner is "baked into" the flow definition at the time of deployment creation, and as such cannot be set at flow runtime. If this is not the case I'd love to hear of any more dynamic approaches the community is aware ofWorking Example Using Python Script
^^ This doesn't work for deployments built with
Deployment.build_from_flow(flow)
for 2 reasons:dynamic_task_runner
decorator function returns afunction
, not aFlow
object, which is required forDeployment.build_from_flow(flow)
. This is easily remedied by tweaking the decorator function definitionBeta Was this translation helpful? Give feedback.
All reactions