Skip to content

Commit

Permalink
Merge pull request apache#29322: Adds a list of requirements to the e…
Browse files Browse the repository at this point in the history
…xpansion request
  • Loading branch information
chamikaramj authored Nov 8, 2023
2 parents f929d08 + bb864ab commit 2ed876f
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ message ExpansionRequest {
// coders for the output PCollections. Note that the request
// may not be fulfilled.
map<string, string> output_coder_requests = 4;

// A set of requirements that must be used by the expansion service to
// interpret the components provided with this request.
repeated string requirements = 5;
}

message ExpansionResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ RunnerApi.Pipeline updateTransformViaTransformService(
.setComponents(runnerAPIpipeline.getComponents())
.setTransform(ptransformBuilder.build())
.setNamespace(UPGRADE_NAMESPACE)
.addAllRequirements(runnerAPIpipeline.getRequirementsList())
.build();

ExpansionApi.ExpansionResponse response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
// Needed to find which transform was new...
SdkComponents sdkComponents =
rehydratedComponents
.getSdkComponents(Collections.emptyList())
.getSdkComponents(request.getRequirementsList())
.withNewIdPrefix(request.getNamespace());
sdkComponents.registerEnvironment(
Environments.createOrGetDefaultEnvironment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def with_pipeline(component, pcoll_id=None):
context = pipeline_context.PipelineContext(
request.components,
default_environment=self._default_environment,
namespace=request.namespace)
namespace=request.namespace,
requirements=request.requirements)
producers = {
pcoll_id: (context.transforms.get_by_id(t_id), pcoll_tag)
for t_id,
Expand Down

0 comments on commit 2ed876f

Please sign in to comment.