-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updates ExpansionService to support dynamically discovering and expanding SchemaTransforms #23413
Conversation
Codecov Report
@@ Coverage Diff @@
## master #23413 +/- ##
==========================================
+ Coverage 73.08% 73.31% +0.23%
==========================================
Files 725 711 -14
Lines 96664 96195 -469
==========================================
- Hits 70647 70530 -117
+ Misses 24705 24353 -352
Partials 1312 1312
Flags with carried forward coverage won't be shown. Click here to find out more.
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
Assigning reviewers. If you would like to opt out of this review, comment R: @AnandInguva for label python. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
R: @robertwb |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
Outdated
Show resolved
Hide resolved
...main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java
Outdated
Show resolved
Hide resolved
return transformProvider; | ||
} | ||
|
||
static class RowTransform extends PTransform { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps at least a 1-line docstring as to why this class exists?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at this a bit more, inputs always come in (and leave) as dicts. Could we just preserve them in this case rather than adding another level of indirection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a docstring explaining why this is needed. May be we could get rid of this in the future if PCollectionRowTuple becomes a portable type that is understood by all SDKs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this class basically exists to undo the logic at https://github.com/apache/beam/blob/release-2.42.0/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L348 .
I think we should instead override the createInput (and possibly extractOutputs) methods in ExpansionServiceSchemaTransformProvider.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that worked! Updated. Thanks.
} | ||
|
||
@Override | ||
public POutput expand(PInput input) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For UI and uniqueness considerations, should we inherit the stage name of its underlying transform?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we can use the exact same name due to uniqueness requirement of runners but changed the name of this transform to a derivative of the underlying transform.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One can nest transforms of the same name. (In fact, nesting with distinct prefixes is how we get around users having to specify names everywhere.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RowTransform class was removed. So this is obsolete now.
} | ||
PCollectionRowTuple output = inputRowTuple.apply(this.rowTuplePTransform); | ||
|
||
if (output.getAll().size() > 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any advantage to doing this? (In particular, the name of the single output gets lost.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This just converts the PCollectionRowTuple to a PCollectionTuple. I think currently we need this conversion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant why can't we just return the PCollectionRowTuple itself? (Possibly we need to update the expansion service code to handle this type, but that should be done anyway.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RowTransform class was removed. So this is obsolete now.
@@ -104,6 +105,43 @@ def payload(self): | |||
""" | |||
return self.build().SerializeToString() | |||
|
|||
def _get_schema_proto_and_payload(self, ignored_arg_format, *args, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ignored_arg_format
param feels a bit awkward, and it'd be easy to accidentally forget it and grab the first arg as well (and I'm not sure the JavaClassLookupPayloadBuilder.IGNORED_ARG_FORMAT logic should be in this superclass as well).
Instead, could we have the java-class calling one just make a dict using JavaClassLookupPayloadBuilder.IGNORED_ARG_FORMAT for the positional args, and then call shared code to go from a dict to a schema and payload?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
self._expansion_service) | ||
|
||
@staticmethod | ||
def discover(expansion_service, regex=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For simplicity I think we can skip the regex filtering until we know we need it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
} | ||
|
||
Iterable<org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider> getAllProviders() { | ||
return schemaTransformProviders.values(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so if a provider is not (duh) provided, even if the SchemaTransform implementation exists, it would not show up in the Discover call. Is that fine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would hope that we can eventually replace writing a Provider with a decorator on the (suitably configured) PTransform class itself, but that's future work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack. Resolving.
@pabloem I don't think a SchemaTransform is useful without the provider. Is that not the case ? BTW I was held up due to some other work but hope to address comments here and push a change soon. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. PTAL.
return transformProvider; | ||
} | ||
|
||
static class RowTransform extends PTransform { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a docstring explaining why this is needed. May be we could get rid of this in the future if PCollectionRowTuple becomes a portable type that is understood by all SDKs.
} | ||
|
||
@Override | ||
public POutput expand(PInput input) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we can use the exact same name due to uniqueness requirement of runners but changed the name of this transform to a derivative of the underlying transform.
...main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java
Outdated
Show resolved
Hide resolved
@@ -104,6 +105,43 @@ def payload(self): | |||
""" | |||
return self.build().SerializeToString() | |||
|
|||
def _get_schema_proto_and_payload(self, ignored_arg_format, *args, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
self._expansion_service) | ||
|
||
@staticmethod | ||
def discover(expansion_service, regex=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
} | ||
PCollectionRowTuple output = inputRowTuple.apply(this.rowTuplePTransform); | ||
|
||
if (output.getAll().size() > 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This just converts the PCollectionRowTuple to a PCollectionTuple. I think currently we need this conversion.
Retest this please |
Run Java PreCommit |
Run Python PreCommit |
Friendly ping :) |
|
||
@Override | ||
public String getName() { | ||
return "RowTransform_of_" + this.rowTuplePTransform.getName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the "RowTransform_of_" prefix add value? Maybe simply drop it, as this will be the name the user sees before drilling down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RowTransform class was removed. So this is obsolete now.
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Override | ||
public POutput expand(PInput input) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One can nest transforms of the same name. (In fact, nesting with distinct prefixes is how we get around users having to specify names everywhere.)
} | ||
PCollectionRowTuple output = inputRowTuple.apply(this.rowTuplePTransform); | ||
|
||
if (output.getAll().size() > 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant why can't we just return the PCollectionRowTuple itself? (Possibly we need to update the expansion service code to handle this type, but that should be done anyway.)
return transformProvider; | ||
} | ||
|
||
static class RowTransform extends PTransform { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this class basically exists to undo the logic at https://github.com/apache/beam/blob/release-2.42.0/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L348 .
I think we should instead override the createInput (and possibly extractOutputs) methods in ExpansionServiceSchemaTransformProvider.
} | ||
|
||
Iterable<org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider> getAllProviders() { | ||
return schemaTransformProviders.values(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would hope that we can eventually replace writing a Provider with a decorator on the (suitably configured) PTransform class itself, but that's future work.
# Expand the transform using the expansion service and the config_row. | ||
if self._expansion_service is None: | ||
self._expansion_service = BeamJarExpansionService( | ||
':sdks:java:expansion-service:app:shadowJar', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we expect many schema transforms to live in this jar, or should we make identifying the expansion service mandatory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't expect many (or any) SchemaTransforms to live in this jar. So I believe we should make specifying an expansion service mandatory.
ComplexType = typing.NamedTuple( | ||
"ComplexType", [ | ||
("str_sub_field", str), | ||
("int_sub_field", np.int32), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we just use int here? (Does that give us an int64?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah just "int" works. Updated.
Thanks. Hope to respond by the end of the week (have been bit busy due to the release). |
1dde786
to
a36c489
Compare
a36c489
to
82a0f28
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Robert. PTAL.
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
Outdated
Show resolved
Hide resolved
return transformProvider; | ||
} | ||
|
||
static class RowTransform extends PTransform { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that worked! Updated. Thanks.
} | ||
|
||
@Override | ||
public POutput expand(PInput input) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RowTransform class was removed. So this is obsolete now.
} | ||
PCollectionRowTuple output = inputRowTuple.apply(this.rowTuplePTransform); | ||
|
||
if (output.getAll().size() > 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RowTransform class was removed. So this is obsolete now.
|
||
@Override | ||
public String getName() { | ||
return "RowTransform_of_" + this.rowTuplePTransform.getName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RowTransform class was removed. So this is obsolete now.
} | ||
|
||
Iterable<org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider> getAllProviders() { | ||
return schemaTransformProviders.values(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack. Resolving.
# Expand the transform using the expansion service and the config_row. | ||
if self._expansion_service is None: | ||
self._expansion_service = BeamJarExpansionService( | ||
':sdks:java:expansion-service:app:shadowJar', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't expect many (or any) SchemaTransforms to live in this jar. So I believe we should make specifying an expansion service mandatory.
ComplexType = typing.NamedTuple( | ||
"ComplexType", [ | ||
("str_sub_field", str), | ||
("int_sub_field", np.int32), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah just "int" works. Updated.
return PCollectionRowTuple.of( | ||
DEFAULT_INPUT_TAG, (PCollection<Row>) inputs.values().iterator().next()); | ||
} else { | ||
PCollectionRowTuple inputRowTuple = PCollectionRowTuple.empty(p); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we use this for all arities, rather than special-casing the 0 and 1 sizes above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
} else if (output.getAll().size() == 1) { | ||
return ImmutableMap.of("output", output.getAll().values().iterator().next()); | ||
} else { | ||
ImmutableMap.Builder<String, PCollection<?>> pCollectionMap = ImmutableMap.builder(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
Thanks! |
Run Java PreCommit |
Run GoPortable PreCommit |
Run Java PreCommit |
1 similar comment
Run Java PreCommit |
Run GoPortable PreCommit |
Run Java_Examples_Dataflow PreCommit |
Run GoPortable PreCommit |
Run Java_Examples_Dataflow_Java11 PreCommit |
Run Java_Examples_Dataflow_Java17 PreCommit |
…ding SchemaTransforms (apache#23413) * Updates ExpansionService to support dynamically discovering and expanding SchemaTransforms * Fixing checker framework errors. * Address reviewer comments * Addressing reviewer comments * Addressing reviewer comments
This adds:
Support for dynamically discovering and registering SchemaTransforms in the Java expansion service.
Support for dynamically discovering registered SchemaTransforms from the Python side.
Support for using SchemaTransforms in Python pipelines.
This addresses #23412
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.