Skip to content

Commit

Permalink
[prism] Add urns package (#25405)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Feb 9, 2023
1 parent 41ea9e5 commit e074985
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 0 deletions.
128 changes: 128 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/urns/urns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package urn handles extracting urns from all the protos.
package urns

import (
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)

type protoEnum interface {
~int32
Descriptor() protoreflect.EnumDescriptor
}

// toUrn returns a function that can get the urn string from the proto.
func toUrn[Enum protoEnum]() func(Enum) string {
evd := (Enum)(0).Descriptor().Values()
return func(v Enum) string {
return proto.GetExtension(evd.ByNumber(protoreflect.EnumNumber(v)).Options(), pipepb.E_BeamUrn).(string)
}
}

// quickUrn handles one off urns instead of retaining a helper function.
// Notably useful for the windowFns due to their older design.
func quickUrn[Enum protoEnum](v Enum) string {
return toUrn[Enum]()(v)
}

var (
ptUrn = toUrn[pipepb.StandardPTransforms_Primitives]()
ctUrn = toUrn[pipepb.StandardPTransforms_Composites]()
cmbtUrn = toUrn[pipepb.StandardPTransforms_CombineComponents]()
sdfUrn = toUrn[pipepb.StandardPTransforms_SplittableParDoComponents]()
siUrn = toUrn[pipepb.StandardSideInputTypes_Enum]()
cdrUrn = toUrn[pipepb.StandardCoders_Enum]()
reqUrn = toUrn[pipepb.StandardRequirements_Enum]()
envUrn = toUrn[pipepb.StandardEnvironments_Environments]()
)

var (
// SDK transforms.
TransformParDo = ptUrn(pipepb.StandardPTransforms_PAR_DO)
TransformCombinePerKey = ctUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY)
TransformPreCombine = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_PRECOMBINE)
TransformMerge = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_MERGE_ACCUMULATORS)
TransformExtract = cmbtUrn(pipepb.StandardPTransforms_COMBINE_PER_KEY_EXTRACT_OUTPUTS)
TransformPairWithRestriction = sdfUrn(pipepb.StandardPTransforms_PAIR_WITH_RESTRICTION)
TransformSplitAndSize = sdfUrn(pipepb.StandardPTransforms_SPLIT_AND_SIZE_RESTRICTIONS)
TransformProcessSizedElements = sdfUrn(pipepb.StandardPTransforms_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS)
TransformTruncate = sdfUrn(pipepb.StandardPTransforms_TRUNCATE_SIZED_RESTRICTION)

// Window Manipulation
TransformAssignWindows = ptUrn(pipepb.StandardPTransforms_ASSIGN_WINDOWS)
TransformMapWindows = ptUrn(pipepb.StandardPTransforms_MAP_WINDOWS)
TransformMergeWindows = ptUrn(pipepb.StandardPTransforms_MERGE_WINDOWS)

// Undocumented Urns
GoDoFn = "beam:go:transform:dofn:v1" // Only used for Go DoFn.
TransformSource = "beam:runner:source:v1" // The data source reading transform.
TransformSink = "beam:runner:sink:v1" // The data sink writing transform.

// Runner transforms.
TransformImpulse = ptUrn(pipepb.StandardPTransforms_IMPULSE)
TransformGBK = ptUrn(pipepb.StandardPTransforms_GROUP_BY_KEY)
TransformFlatten = ptUrn(pipepb.StandardPTransforms_FLATTEN)

// Side Input access patterns
SideInputIterable = siUrn(pipepb.StandardSideInputTypes_ITERABLE)
SideInputMultiMap = siUrn(pipepb.StandardSideInputTypes_MULTIMAP)

// WindowsFns
WindowFnGlobal = quickUrn(pipepb.GlobalWindowsPayload_PROPERTIES)
WindowFnFixed = quickUrn(pipepb.FixedWindowsPayload_PROPERTIES)
WindowFnSliding = quickUrn(pipepb.SlidingWindowsPayload_PROPERTIES)
WindowFnSession = quickUrn(pipepb.SessionWindowsPayload_PROPERTIES)

// Coders
CoderBytes = cdrUrn(pipepb.StandardCoders_BYTES)
CoderBool = cdrUrn(pipepb.StandardCoders_BOOL)
CoderDouble = cdrUrn(pipepb.StandardCoders_DOUBLE)
CoderStringUTF8 = cdrUrn(pipepb.StandardCoders_STRING_UTF8)
CoderRow = cdrUrn(pipepb.StandardCoders_ROW)
CoderVarInt = cdrUrn(pipepb.StandardCoders_VARINT)

CoderGlobalWindow = cdrUrn(pipepb.StandardCoders_GLOBAL_WINDOW)
CoderIntervalWindow = cdrUrn(pipepb.StandardCoders_INTERVAL_WINDOW)
CoderCustomWindow = cdrUrn(pipepb.StandardCoders_CUSTOM_WINDOW)

CoderParamWindowedValue = cdrUrn(pipepb.StandardCoders_PARAM_WINDOWED_VALUE)
CoderWindowedValue = cdrUrn(pipepb.StandardCoders_WINDOWED_VALUE)
CoderTimer = cdrUrn(pipepb.StandardCoders_TIMER)

CoderKV = cdrUrn(pipepb.StandardCoders_KV)
CoderLengthPrefix = cdrUrn(pipepb.StandardCoders_LENGTH_PREFIX)
CoderNullable = cdrUrn(pipepb.StandardCoders_NULLABLE)
CoderIterable = cdrUrn(pipepb.StandardCoders_ITERABLE)
CoderStateBackedIterable = cdrUrn(pipepb.StandardCoders_STATE_BACKED_ITERABLE)
CoderShardedKey = cdrUrn(pipepb.StandardCoders_SHARDED_KEY)

// Requirements
RequirementSplittableDoFn = reqUrn(pipepb.StandardRequirements_REQUIRES_SPLITTABLE_DOFN)
RequirementBundleFinalization = reqUrn(pipepb.StandardRequirements_REQUIRES_BUNDLE_FINALIZATION)
RequirementOnWindowExpiration = reqUrn(pipepb.StandardRequirements_REQUIRES_ON_WINDOW_EXPIRATION)
RequirementStableInput = reqUrn(pipepb.StandardRequirements_REQUIRES_STABLE_INPUT)
RequirementStatefulProcessing = reqUrn(pipepb.StandardRequirements_REQUIRES_STATEFUL_PROCESSING)
RequirementTimeSortedInput = reqUrn(pipepb.StandardRequirements_REQUIRES_TIME_SORTED_INPUT)

// Environment types
EnvDocker = envUrn(pipepb.StandardEnvironments_DOCKER)
EnvProcess = envUrn(pipepb.StandardEnvironments_PROCESS)
EnvExternal = envUrn(pipepb.StandardEnvironments_EXTERNAL)
EnvDefault = envUrn(pipepb.StandardEnvironments_DEFAULT)
)
36 changes: 36 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/urns/urns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package urn handles extracting urns from all the protos.
package urns

import (
"testing"

pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
)

// Test_toUrn validates that generic urn extraction mechnanism works, which is used for
// all the urns present.
func Test_toUrn(t *testing.T) {
want := "beam:transform:pardo:v1"
if got := TransformParDo; got != want {
t.Errorf("TransformParDo = %v, want %v", got, want)
}
// Validate that quickUrn gets the same thing
if got := quickUrn(pipepb.StandardPTransforms_PAR_DO); got != want {
t.Errorf("quickUrn(\"pipepb.StandardPTransforms_PAR_DO\") = %v, want %v", got, want)
}
}

0 comments on commit e074985

Please sign in to comment.