Skip to content

Commit

Permalink
filter adding impulse only for python
Browse files Browse the repository at this point in the history
  • Loading branch information
riteshghorse committed Sep 28, 2022
1 parent 045d31a commit dabe9f0
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions sdks/go/pkg/beam/core/runtime/xlangx/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package xlangx
import (
"context"
"fmt"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
Expand Down Expand Up @@ -65,8 +66,11 @@ func Expand(edge *graph.MultiEdge, ext *graph.ExternalTransform) error {
extTransform = transforms[extTransformID]
}

names := strings.Split(ext.Urn, ":")
// Python external transform needs the producer of input PCollection in expansion request.
graphx.AddFakeImpulses(p)
if len(names) > 2 && names[2] == "python" {
graphx.AddFakeImpulses(p)
}

// Scoping the ExternalTransform with respect to it's unique namespace, thus
// avoiding future collisions
Expand All @@ -80,7 +84,9 @@ func Expand(edge *graph.MultiEdge, ext *graph.ExternalTransform) error {
}

// Remove fake impulses added earlier.
graphx.RemoveFakeImpulses(res.GetComponents(), res.GetTransform())
if len(names) > 2 && names[2] == "python" {
graphx.RemoveFakeImpulses(res.GetComponents(), res.GetTransform())
}

exp := &graph.ExpandedTransform{
Components: res.GetComponents(),
Expand Down

0 comments on commit dabe9f0

Please sign in to comment.