feat(sdk): Use google.protobuf.Value in v2 for passing parameters. #6804

Merged
26 commits merged on Oct 28, 2021

Commits
4966b0b
Use google.protobuf.Value in v2 for passing parameters.
neuromage Oct 26, 2021
8f14fb1
retest samples.
neuromage Oct 26, 2021
728f922
Fix tests.
neuromage Oct 27, 2021
5b13589
Merge branch 'master' into protobuf-value.
neuromage Oct 27, 2021
cf3407d
Update release, more cleanup.
neuromage Oct 27, 2021
7e6d18f
Use github.com/kubeflow/pipelines/api from same repo.
neuromage Oct 27, 2021
bde385f
Run go mod tidy
neuromage Oct 27, 2021
ac5f6e3
chore: go mod tidy
Bobgy Oct 27, 2021
fd8ee77
fix v2 compile error and clean up unused code
Bobgy Oct 27, 2021
216944a
pr comments.
neuromage Oct 27, 2021
87319b8
Merge branch 'protobuf-value' of github.com:neuromage/pipelines into …
neuromage Oct 27, 2021
107a492
Merge branch 'master' into protobuf-value
neuromage Oct 27, 2021
df770b8
Merge branch 'master' into protobuf-value
neuromage Oct 27, 2021
9b5e012
update goldens
neuromage Oct 27, 2021
c8d49a2
Fix metadata recording.
neuromage Oct 27, 2021
de1f943
Update kfp mlmd client.
neuromage Oct 27, 2021
bfc380b
fix test again
neuromage Oct 27, 2021
ad3e7f9
another try.
neuromage Oct 27, 2021
917d979
chore: migrate v2 DAG driver input parameters to protobuf.Value + sma…
Bobgy Oct 28, 2021
b0718ec
fix v2 launcher + clean up
Bobgy Oct 28, 2021
1df554a
fix a compile error
Bobgy Oct 28, 2021
24360f2
fix a few more tests
Bobgy Oct 28, 2021
7b9a681
fix number parsing
Bobgy Oct 28, 2021
547747e
clean up
Bobgy Oct 28, 2021
f2426b0
disable cache_v2 test.
neuromage Oct 28, 2021
35f8448
Merge branch 'protobuf-value' of github.com:neuromage/pipelines into …
neuromage Oct 28, 2021
4 changes: 3 additions & 1 deletion api/v2alpha1/cache_key.proto
@@ -17,7 +17,8 @@ syntax = "proto3";
option go_package = "github.com/kubeflow/pipelines/api/v2alpha1/go/cachekey";
package ml_pipelines;

import "google/protobuf/any.proto";
// import "google/protobuf/any.proto";
import "google/protobuf/struct.proto";
import "pipeline_spec.proto";

message CacheKey {
@@ -26,6 +27,7 @@ message CacheKey {
map<string, RuntimeArtifact> outputArtifactsSpec = 3;
map<string, string> outputParametersSpec=4;
ContainerSpec containerSpec=5;
map<string, google.protobuf.Value> input_parameter_values = 6;
Contributor:

nit: the other fields use camel case

Contributor Author:

Thanks, but the proto style guide explicitly prescribes underscore_separated_names for fields. This is what's used everywhere else as well:
https://developers.google.com/protocol-buffers/docs/style#message_and_field_names

My suggestion is to adopt the right style here for long-term health, and consider replacing the camel-case ones in a future PR.

}

message ContainerSpec {
189 changes: 107 additions & 82 deletions api/v2alpha1/go/cachekey/cache_key.pb.go

Large diffs are not rendered by default.
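The new `input_parameter_values` field maps each input name to a `google.protobuf.Value`, which can carry a null, number, string, bool, struct, or list without a KFP-specific wrapper type. A minimal sketch of populating such values in Python (the plain dict stands in for the proto map field):

```python
from google.protobuf import struct_pb2

# Each entry mirrors one slot of map<string, google.protobuf.Value>.
params = {}
params['threshold'] = struct_pb2.Value(number_value=0.5)
params['model_name'] = struct_pb2.Value(string_value='model-a')
params['debug'] = struct_pb2.Value(bool_value=True)

# Which kind a Value holds is visible via its oneof:
for name, v in params.items():
    print(name, v.WhichOneof('kind'))
```

The parameter names here are made up for illustration; any JSON-compatible value fits in the same field, which is what lets the cache key drop the old per-type representation.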

2 changes: 2 additions & 0 deletions api/v2alpha1/python/kfp/pipeline_spec/__init__.py
@@ -11,3 +11,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

__path__ = __import__("pkgutil").extend_path(__path__, __name__)
Member:
I think this is probably unnecessary because kfp-pipeline-spec is a native namespace package when we set it up:

packages=setuptools.find_namespace_packages(include=['kfp.*']),

ref: https://packaging.python.org/guides/packaging-namespace-packages/#native-namespace-packages

In comparison, to make the existing kfp package a namespace package, we do have to include this in its init file:

# `kfp` is a namespace package.
# https://packaging.python.org/guides/packaging-namespace-packages/#pkgutil-style-namespace-packages
__path__ = __import__("pkgutil").extend_path(__path__, __name__)

Also this change is not released, so it can't be needed for this PR, right?

Contributor Author:

You're right; this was left over from trying to get both namespace packages to work nicely when installed locally. Removed.
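The `pkgutil.extend_path` line discussed above makes a package merge its `__path__` across every matching directory on `sys.path`, which is what lets `kfp` and `kfp-pipeline-spec` coexist as one namespace. A self-contained demonstration of the pkgutil style (the package name `nspkg_demo` is invented for the demo):

```python
import os
import sys
import tempfile

root = tempfile.mkdtemp()
for dist in ('dist_a', 'dist_b'):
    pkg_dir = os.path.join(root, dist, 'nspkg_demo')
    os.makedirs(pkg_dir)
    with open(os.path.join(pkg_dir, '__init__.py'), 'w') as f:
        # The same line the diff adds: merge all same-named dirs on sys.path.
        f.write("__path__ = __import__('pkgutil').extend_path(__path__, __name__)\n")
    sys.path.insert(0, os.path.join(root, dist))

import nspkg_demo  # noqa: E402

# Both "distribution" directories contribute to the package search path.
print(len(list(nspkg_demo.__path__)))  # 2
```

Native namespace packages reach the same result with no `__init__.py` at all, which is why the reviewer considered the added line unnecessary for `kfp-pipeline-spec`.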

8 changes: 4 additions & 4 deletions sdk/python/kfp/compiler/v2_compat.py
@@ -143,8 +143,8 @@ def update_op(op: dsl.ContainerOp,
component_spec.input_definitions.parameters.items()):
parameter_info = {
"type":
pipeline_spec_pb2.PrimitiveType.PrimitiveTypeEnum.Name(spec.type
),
pipeline_spec_pb2.ParameterType.ParameterTypeEnum.Name(
spec.parameter_type),
}
op.command += [f"{parameter}={op._parameter_arguments[parameter]}"]
runtime_info["inputParameters"][parameter] = parameter_info
@@ -164,8 +164,8 @@ def update_op(op: dsl.ContainerOp,
component_spec.output_definitions.parameters.items()):
parameter_info = {
"type":
pipeline_spec_pb2.PrimitiveType.PrimitiveTypeEnum.Name(spec.type
),
pipeline_spec_pb2.ParameterType.ParameterTypeEnum.Name(
spec.parameter_type),
"path":
op.file_outputs[parameter],
}
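The hunk above swaps `PrimitiveType.PrimitiveTypeEnum` for the new `ParameterType.ParameterTypeEnum` when resolving a type's display name. Generated protobuf enums all expose the same `Name()`/`Value()` lookups, which is why the call shape is unchanged; a minimal illustration using a well-known enum from `struct.proto` rather than the kfp-pipeline-spec module:

```python
from google.protobuf import struct_pb2

# Every generated protobuf enum wrapper supports name/number lookups.
print(struct_pb2.NullValue.Name(0))             # NULL_VALUE
print(struct_pb2.NullValue.Value('NULL_VALUE')) # 0
```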
37 changes: 25 additions & 12 deletions sdk/python/kfp/dsl/_component_bridge.py
@@ -20,7 +20,7 @@
from typing import Any, Mapping, Optional

import kfp
from kfp.components import _structures
from kfp.components import _structures, _data_passing
from kfp.components import _components
from kfp.components import _naming
from kfp import dsl
@@ -195,10 +195,18 @@ def _create_container_op_from_component_and_arguments(
for input_spec in component_spec.inputs or []:
if input_spec.name not in arguments and input_spec.default is not None:
default_value = input_spec.default
print('Value: ', default_value, ' Type: ', type(default_value))
if input_spec.type == 'Integer':
default_value = int(default_value)
elif input_spec.type == 'Float':
default_value = float(default_value)
elif (type_utils.is_parameter_type(input_spec.type) and
kfp.COMPILING_FOR_V2):
parameter_type = type_utils.get_parameter_type(input_spec.type)
default_value = type_utils.deserialize_parameter_value(
value=default_value, parameter_type=parameter_type)

print('DESERIALIZED to: ', default_value)
arguments[input_spec.name] = default_value

# Check types of the reference arguments and serialize PipelineParams
@@ -573,24 +581,29 @@ def _resolve_ir_placeholders_v2(
input_type = component_spec._inputs_dict[input_name].type
if type_utils.is_parameter_type(input_type):
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.string_value = (
argument_value)
input_name].runtime_value.constant.string_value = argument_value
elif isinstance(argument_value, int):
argument_type = 'Integer'
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.int_value = (
argument_value)
input_name].runtime_value.constant.number_value = argument_value
elif isinstance(argument_value, float):
argument_type = 'Float'
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.double_value = (
input_name].runtime_value.constant.number_value = argument_value
elif isinstance(argument_value, bool):
argument_type = 'Bool'
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant.bool_value = argument_value
elif isinstance(argument_value, list):
argument_type = 'List'
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant.list_value.extend(
argument_value)
elif isinstance(argument_value,
(dict, list, bool)) and kfp.COMPILING_FOR_V2:
argument_type = type(argument_value).__name__
elif isinstance(argument_value, dict):
argument_type = 'Dict'
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.string_value = (
json.dumps(argument_value))
input_name].runtime_value.constant.struct_value.update(
argument_value)
elif isinstance(argument_value, _container_op.ContainerOp):
raise TypeError(
f'ContainerOp object {input_name} was passed to component as an '
Expand Down Expand Up @@ -625,7 +638,7 @@ def _resolve_ir_placeholders_v2(
if argument_is_parameter_type else 'Artifact',
input_name=input_name,
input_type=input_type,
input_category='Paramter'
input_category='Parameter'
if input_is_parameter_type else 'Artifact',
))

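The branches above map each Python constant onto a field of the runtime constant, which is now a single `google.protobuf.Value` instead of per-type fields. The same mapping can be sketched standalone (`to_value` is a hypothetical helper, not a kfp API; note that `bool` must be tested before `int`, since `bool` subclasses `int`):

```python
from google.protobuf import struct_pb2

def to_value(v):
    pb = struct_pb2.Value()
    if isinstance(v, str):
        pb.string_value = v
    elif isinstance(v, bool):          # must precede the int/float check
        pb.bool_value = v
    elif isinstance(v, (int, float)):
        pb.number_value = v            # Value stores every number as a double
    elif isinstance(v, list):
        pb.list_value.extend(v)        # ListValue accepts plain Python items
    elif isinstance(v, dict):
        pb.struct_value.update(v)      # Struct accepts a plain dict
    else:
        raise TypeError(type(v))
    return pb
```

Because `Value` has only `number_value`, integers round-trip as doubles; that is why other parts of this PR wrap integer operands in `int(...)` before comparing them in CEL.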
9 changes: 5 additions & 4 deletions sdk/python/kfp/dsl/component_spec.py
@@ -98,7 +98,7 @@ def build_component_spec_from_structure(
continue
if type_utils.is_parameter_type(input_spec.type):
result.input_definitions.parameters[
input_spec.name].type = type_utils.get_parameter_type(
input_spec.name].parameter_type = type_utils.get_parameter_type(
input_spec.type)
else:
result.input_definitions.artifacts[
@@ -108,7 +108,8 @@
for output_spec in component_spec.outputs or []:
if type_utils.is_parameter_type(output_spec.type):
result.output_definitions.parameters[
output_spec.name].type = type_utils.get_parameter_type(
output_spec
.name].parameter_type = type_utils.get_parameter_type(
output_spec.type)
else:
result.output_definitions.artifacts[
@@ -141,7 +142,7 @@ def build_component_inputs_spec(

if type_utils.is_parameter_type(param.param_type):
component_spec.input_definitions.parameters[
input_name].type = type_utils.get_parameter_type(
input_name].parameter_type = type_utils.get_parameter_type(
param.param_type)
elif input_name not in getattr(component_spec.input_definitions,
'parameters', []):
@@ -164,7 +165,7 @@ def build_component_outputs_spec(
output_name = param.full_name
if type_utils.is_parameter_type(param.param_type):
component_spec.output_definitions.parameters[
output_name].type = type_utils.get_parameter_type(
output_name].parameter_type = type_utils.get_parameter_type(
param.param_type)
elif output_name not in getattr(component_spec.output_definitions,
'parameters', []):
153 changes: 123 additions & 30 deletions sdk/python/kfp/v2/compiler/compiler.py
@@ -549,37 +549,120 @@ def _resolve_condition_operands(
operand2: Union[str, dsl.PipelineParam]) -> Tuple[str, str]:
"""Resolves values and PipelineParams for condition operands."""

# Pre-scan the operand to get the type of constant value if there's any.
# The value_type can be used to backfill missing PipelineParam.param_type.
value_type = None
for value_or_reference in [operand1, operand2]:
if not isinstance(value_or_reference,
(dsl.PipelineParam, int, float, bool, str)):
raise ValueError('Conditional requires scalar constant values'
' for comparison. Found "{}" of type {}'
' in pipeline definition instead.'.format(
value_or_reference,
type(value_or_reference)))

# Check specified type of PipelineParam is a scalar as well.
if isinstance(value_or_reference, dsl.PipelineParam):
continue
if isinstance(value_or_reference, float):
value_type = 'Float'
elif isinstance(value_or_reference, int):
value_type = 'Integer'
parameter_type = type_utils.get_parameter_type(
value_or_reference.param_type)

if parameter_type in [
pipeline_spec_pb2.ParameterType.STRUCT,
pipeline_spec_pb2.ParameterType.LIST,
pipeline_spec_pb2.ParameterType
.PARAMETER_TYPE_ENUM_UNSPECIFIED,
]:
input_name = dsl_component_spec.additional_input_name_for_pipelineparam(
value_or_reference)
raise ValueError(
'Conditional requires scalar parameter values'
' for comparison. Found input "{}" of type {}'
' in pipeline definition instead.'.format(
input_name, value_or_reference.param_type))

parameter_types = set()
for value_or_reference in [operand1, operand2]:
if isinstance(value_or_reference, dsl.PipelineParam):
parameter_type = type_utils.get_parameter_type(
value_or_reference.param_type)
else:
value_type = 'String'
parameter_type = type_utils.get_parameter_type(
type(value_or_reference).__name__)
print('{} GOT TYPE: {}, typename {}, isinstance_dict {}'.format(
value_or_reference, parameter_type,
str(type(value_or_reference)),
isinstance(value_or_reference, dict)))

parameter_types.add(parameter_type)

if len(parameter_types) == 2:
# Two different types being compared. The only possible types are
# String, Boolean, Double and Integer. We'll promote the other type
# using the following precedence:
# String > Boolean > Double > Integer
Member:
I wasn't aware that CEL can work this way.
The rest of the change to support Protobuf.Value in condition operand is also very nice. I'm going to steal your code in my experimental fork :)

if pipeline_spec_pb2.ParameterType.STRING in parameter_types:
canonical_parameter_type = pipeline_spec_pb2.ParameterType.STRING
elif pipeline_spec_pb2.ParameterType.BOOLEAN in parameter_types:
canonical_parameter_type = pipeline_spec_pb2.ParameterType.BOOLEAN
else:
# Must be a double and int, promote to double.
assert pipeline_spec_pb2.ParameterType.NUMBER_DOUBLE in parameter_types, 'Types: {} [{} {}]'.format(
parameter_types, operand1, operand2)
assert pipeline_spec_pb2.ParameterType.NUMBER_INTEGER in parameter_types, 'Types: {} [{} {}]'.format(
parameter_types, operand1, operand2)
canonical_parameter_type = pipeline_spec_pb2.ParameterType.NUMBER_DOUBLE
elif len(parameter_types) == 1: # Both operands are the same type.
canonical_parameter_type = parameter_types.pop()
else:
# Probably shouldn't happen.
raise ValueError('Unable to determine operand types for'
' "{}" and "{}"'.format(operand1, operand2))

operand_values = []
for value_or_reference in [operand1, operand2]:
if isinstance(value_or_reference, dsl.PipelineParam):
input_name = dsl_component_spec.additional_input_name_for_pipelineparam(
value_or_reference)
# Condition operand is always parameters for now.
value_or_reference.param_type = (
value_or_reference.param_type or value_type)
operand_values.append(
"inputs.parameters['{input_name}'].{value_field}".format(
input_name=input_name,
value_field=type_utils.get_parameter_type_field_name(
value_or_reference.param_type)))
operand_value = "inputs.parameter_values['{input_name}']".format(
input_name=input_name)
parameter_type = type_utils.get_parameter_type(
value_or_reference.param_type)
if parameter_type == pipeline_spec_pb2.ParameterType.NUMBER_INTEGER:
operand_value = 'int({})'.format(operand_value)
elif isinstance(value_or_reference, str):
operand_value = "'{}'".format(value_or_reference)
parameter_type = pipeline_spec_pb2.ParameterType.STRING
elif isinstance(value_or_reference, bool):
# Booleans need to be compared as 'true' or 'false' in CEL.
operand_value = str(value_or_reference).lower()
parameter_type = pipeline_spec_pb2.ParameterType.BOOLEAN
elif isinstance(value_or_reference, int):
operand_value = str(value_or_reference)
parameter_type = pipeline_spec_pb2.ParameterType.NUMBER_INTEGER
else:
if isinstance(value_or_reference, str):
operand_values.append("'{}'".format(value_or_reference))
assert isinstance(value_or_reference, float), value_or_reference
operand_value = str(value_or_reference)
parameter_type = pipeline_spec_pb2.ParameterType.NUMBER_DOUBLE

if parameter_type != canonical_parameter_type:
# Type-cast to so CEL does not complain.
if canonical_parameter_type == pipeline_spec_pb2.ParameterType.STRING:
assert parameter_type in [
pipeline_spec_pb2.ParameterType.BOOLEAN,
pipeline_spec_pb2.ParameterType.NUMBER_INTEGER,
pipeline_spec_pb2.ParameterType.NUMBER_DOUBLE,
]
operand_value = "'{}'".format(operand_value)
elif canonical_parameter_type == pipeline_spec_pb2.ParameterType.BOOLEAN:
assert parameter_type in [
pipeline_spec_pb2.ParameterType.NUMBER_INTEGER,
pipeline_spec_pb2.ParameterType.NUMBER_DOUBLE,
]
operand_value = 'true' if int(
operand_value) == 0 else 'false'
else:
operand_values.append(str(value_or_reference))
assert canonical_parameter_type == pipeline_spec_pb2.ParameterType.NUMBER_DOUBLE
assert parameter_type == pipeline_spec_pb2.ParameterType.NUMBER_INTEGER
operand_value = 'double({})'.format(operand_value)

operand_values.append(operand_value)

return tuple(operand_values)
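The promotion and casting rules implemented above can be sketched independently of the compiler. The helper names below are illustrative, not actual kfp functions; the precedence and the quote/`double()` casts mirror what the diff emits for CEL:

```python
# Promotion precedence used when the two operand types differ:
# String > Boolean > Double > Integer.
PRECEDENCE = ['STRING', 'BOOLEAN', 'NUMBER_DOUBLE', 'NUMBER_INTEGER']

def canonical_type(types):
    for t in PRECEDENCE:
        if t in types:
            return t
    raise ValueError('no scalar type found: {}'.format(types))

def cast_operand(expr, from_type, to_type):
    # Quote for String, wrap in double() for Double, otherwise leave as-is,
    # so CEL compares operands of one canonical type.
    if from_type == to_type:
        return expr
    if to_type == 'STRING':
        return "'{}'".format(expr)
    if to_type == 'NUMBER_DOUBLE':
        return 'double({})'.format(expr)
    return expr
```

For example, comparing an integer parameter with the float constant `0.5` promotes the pair to Double and wraps the integer side as `double(...)`.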

@@ -822,7 +905,7 @@ def _group_to_dag_spec(
_for_loop.LoopArguments.LOOP_ITEM_NAME_BASE)

subgroup_component_spec.input_definitions.parameters[
loop_arguments_item].type = pipeline_spec_pb2.PrimitiveType.STRING
loop_arguments_item].parameter_type = pipeline_spec_pb2.ParameterType.STRING
subgroup_task_spec.parameter_iterator.items.input_parameter = (
input_parameter_name)
subgroup_task_spec.parameter_iterator.item_input = (
@@ -986,8 +1069,8 @@

pipeline_spec.pipeline_info.name = pipeline.name
pipeline_spec.sdk_version = 'kfp-{}'.format(kfp.__version__)
# Schema version 2.0.0 is required for kfp-pipeline-spec>0.1.3.1
pipeline_spec.schema_version = '2.0.0'
# Schema version 2.1.0 is required for kfp-pipeline-spec>0.1.3.1
pipeline_spec.schema_version = '2.1.0'

dsl_component_spec.build_component_inputs_spec(
component_spec=pipeline_spec.root,
@@ -1170,6 +1253,7 @@ def _create_pipeline_v2(
pipeline_func)
pipeline_name = pipeline_name or pipeline_meta.name

print('PIPELINE META: ', pipeline_meta)
pipeline_root = getattr(pipeline_func, 'pipeline_root', None)

args_list = []
@@ -1204,13 +1288,22 @@
# Fill in the default values.
args_list_with_defaults = []
if pipeline_meta.inputs:
args_list_with_defaults = [
dsl.PipelineParam(
sanitize_k8s_name(input_spec.name, True),
param_type=input_spec.type,
value=input_spec.default)
for input_spec in pipeline_meta.inputs
]
args_list_with_defaults = []
for input_spec in pipeline_meta.inputs:
default_value = input_spec.default

if input_spec.default is not None:
parameter_type = type_utils.get_parameter_type(
input_spec.type)
default_value = type_utils.deserialize_parameter_value(
value=input_spec.default, parameter_type=parameter_type)

print('DEFAULT: {}: {}'.format(input_spec.name, default_value))
args_list_with_defaults.append(
dsl.PipelineParam(
sanitize_k8s_name(input_spec.name, True),
param_type=input_spec.type,
value=default_value))
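The hunk above stops passing serialized defaults straight into `PipelineParam` and first deserializes them by parameter type. The actual helper lives in kfp's `type_utils`; a hedged sketch of what such deserialization might look like, assuming list/struct defaults arrive as JSON strings and numbers as their string forms (function name and branches are illustrative):

```python
import json

def deserialize_default(value, parameter_type):
    # Hypothetical stand-in for type_utils.deserialize_parameter_value.
    if parameter_type == 'NUMBER_INTEGER':
        return int(value)
    if parameter_type == 'NUMBER_DOUBLE':
        return float(value)
    if parameter_type in ('LIST', 'STRUCT'):
        return json.loads(value)   # JSON text -> Python list/dict
    if parameter_type == 'BOOLEAN':
        return value if isinstance(value, bool) else value.lower() == 'true'
    return value                   # STRING passes through unchanged
```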

# Making the pipeline group name unique to prevent name clashes with templates
pipeline_group = dsl_pipeline.groups[0]