Skip to content

Commit

Permalink
Merge pull request opendatahub-io#197 from leseb/use-kubernetes-python-lib
Browse files Browse the repository at this point in the history

feat: stop using kubectl to apply pytorchjob
  • Loading branch information
tumido authored Nov 22, 2024
2 parents 7df7f5a + 79f4786 commit c5133e2
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 364 deletions.
48 changes: 9 additions & 39 deletions pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ def pipeline_wrapper(mock: List[Literal[MOCKED_STAGES]]):
)
from utils.faked import (
huggingface_importer_op,
kubectl_apply_op,
kubectl_wait_for_op,
pvc_to_model_op,
pvc_to_mt_bench_op,
)
Expand All @@ -94,8 +92,6 @@ def pipeline_wrapper(mock: List[Literal[MOCKED_STAGES]]):
)
from utils import (
huggingface_importer_op,
kubectl_apply_op,
kubectl_wait_for_op,
pvc_to_model_op,
pvc_to_mt_bench_op,
)
Expand Down Expand Up @@ -305,9 +301,10 @@ def pipeline(
storage_class_name=k8s_storage_class_name,
)

# Training 1
# Using pvc_create_task.output as PyTorchJob name since dsl.PIPELINE_* global variables do not template/work in KFP v2
# https://github.com/kubeflow/pipelines/issues/10453
pytorchjob_manifest_task = pytorchjob_manifest_op(
training_phase_1 = pytorchjob_manifest_op(
model_pvc_name=model_pvc_task.output,
input_pvc_name=sdg_input_pvc_task.output,
name_suffix=sdg_input_pvc_task.output,
Expand All @@ -323,25 +320,11 @@ def pipeline(
max_batch_len=train_max_batch_len,
seed=train_seed,
)
pytorchjob_manifest_task.set_caching_options(False)

kubectl_apply_task = kubectl_apply_op(
manifest=pytorchjob_manifest_task.outputs["manifest"]
)
kubectl_apply_task.after(data_processing_task, model_to_pvc_task)
kubectl_apply_task.set_caching_options(False)

kubectl_wait_task = kubectl_wait_for_op(
condition="condition=Succeeded",
kind="pytorchjobs",
name=pytorchjob_manifest_task.outputs["name"],
)
kubectl_wait_task.after(kubectl_apply_task)
kubectl_wait_task.set_caching_options(False)
training_phase_1.after(data_processing_task, model_to_pvc_task)
training_phase_1.set_caching_options(False)

#### Train 2

pytorchjob_manifest_2_task = pytorchjob_manifest_op(
training_phase_2 = pytorchjob_manifest_op(
model_pvc_name=model_pvc_task.output,
input_pvc_name=sdg_input_pvc_task.output,
name_suffix=sdg_input_pvc_task.output,
Expand All @@ -358,33 +341,20 @@ def pipeline(
seed=train_seed,
)

pytorchjob_manifest_2_task.set_caching_options(False)
pytorchjob_manifest_2_task.after(kubectl_wait_task)
training_phase_2.set_caching_options(False)
training_phase_2.after(training_phase_1)

mount_pvc(
task=pytorchjob_manifest_2_task,
task=training_phase_2,
pvc_name=output_pvc_task.output,
mount_path="/output",
)

kubectl_apply_2_task = kubectl_apply_op(
manifest=pytorchjob_manifest_2_task.outputs["manifest"]
)
kubectl_apply_2_task.set_caching_options(False)

kubectl_wait_2_task = kubectl_wait_for_op(
condition="condition=Succeeded",
kind="pytorchjobs",
name=pytorchjob_manifest_2_task.outputs["name"],
)
kubectl_wait_2_task.after(kubectl_apply_2_task)
kubectl_wait_2_task.set_caching_options(False)

models_list_2_task = list_models_in_directory_op(
models_folder="/output/phase_2/model/hf_format",
)
models_list_2_task.set_caching_options(False)
models_list_2_task.after(kubectl_wait_2_task)
models_list_2_task.after(training_phase_2)
mount_pvc(
task=models_list_2_task,
pvc_name=output_pvc_task.output,
Expand Down
Loading

0 comments on commit c5133e2

Please sign in to comment.