diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index 82eb62b9e207..3ef4867643bd 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -230,8 +230,6 @@ def createPrismValidatesRunnerTask = { name, environmentType -> // Not yet implemented in Prism // https://github.com/apache/beam/issues/32211 excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration' - // https://github.com/apache/beam/issues/32929 - excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState' // Not supported in Portable Java SDK yet. // https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/data.go b/sdks/go/pkg/beam/runners/prism/internal/engine/data.go index 7b8689f95112..e7f396528922 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/data.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/data.go @@ -17,13 +17,17 @@ package engine import ( "bytes" + "cmp" "fmt" "log/slog" + "slices" + "sort" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + "google.golang.org/protobuf/encoding/protowire" ) // StateData is a "union" between Bag state and MultiMap state to increase common code. @@ -220,3 +224,95 @@ func (d *TentativeData) ClearMultimapKeysState(stateID LinkID, wKey, uKey []byte kmap[string(uKey)] = StateData{} slog.Debug("State() MultimapKeys.Clear", slog.Any("StateID", stateID), slog.Any("UserKey", uKey), slog.Any("WindowKey", wKey)) } + +// OrderedListState from the Java SDK is encoded as KVs with varint encoded millis +// Followed by the value. This is *not* the standard TimestampValueCoder encoding, which +// uses a big-endian long as a suffix to the value. +// +// Next, is we need to parse out the individual values from the data blob anyway. +// So we probably can't cheekily re-use the BagState like it currently is, even +// after fixing the timestamp issue. +// +// Currently assuming the Value is length prefixed, which isn't always going to +// be true for ints, bools, bytes, and other "known" coders.s + +// AppendOrderedListState appends the incoming timestamped data to the existing tentative data bundle. +// Assumes the data is TimestampedValue encoded, which has a BigEndian int64 suffixed to the data. +// This means we may always use the last 8 bytes to determine the value sorting. +// +// The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively. +func (d *TentativeData) AppendOrderedListState(stateID LinkID, wKey, uKey []byte, data []byte) { + kmap := d.appendState(stateID, wKey) + var datums [][]byte + + // Lets assume two length prefixed things in the KV. + for i := 0; i < len(data); { + _, tn := protowire.ConsumeVarint(data[i:]) + + // We need to fix that value to get the correct width + // of the bytes. + n, vn := protowire.ConsumeVarint(data[i+tn:]) + prev := i + i += tn + vn + int(n) + datums = append(datums, data[prev:i]) + } + + s := StateData{Bag: append(kmap[string(uKey)].Bag, datums...)} + sort.SliceStable(s.Bag, func(i, j int) bool { + vi := s.Bag[i] + vj := s.Bag[j] + return compareTimestampSuffixes(vi, vj) + }) + kmap[string(uKey)] = s + slog.Debug("State() OrderedList.Append", slog.Any("StateID", stateID), slog.Any("UserKey", uKey), slog.Any("Window", wKey), slog.Any("NewData", s)) +} + +func compareTimestampSuffixes(vi, vj []byte) bool { + // TODO FIX TO EXTRACT VARINTS + ims, _ := protowire.ConsumeVarint(vi) + jms, _ := protowire.ConsumeVarint(vj) + return (int64(ims)) < (int64(jms)) +} + +// GetOrderedListState available state from the tentative bundle data. +// The stateID has the Transform and Local fields populated, for the Transform and UserStateID respectively. +func (d *TentativeData) GetOrderedListState(stateID LinkID, wKey, uKey []byte, start, end int64) [][]byte { + winMap := d.state[stateID] + w := d.toWindow(wKey) + data := winMap[w][string(uKey)] + + lo, hi := findRange(data.Bag, start, end) + slog.Debug("State() OrderedList.Get", slog.Any("StateID", stateID), slog.Any("UserKey", uKey), slog.Any("Window", wKey), slog.Group("range", slog.Int64("start", start), slog.Int64("end", end)), slog.Group("outrange", slog.Int("lo", lo), slog.Int("hi", hi)), slog.Any("Data", data.Bag[lo:hi])) + return data.Bag[lo:hi] +} + +func cmpSuffix(vs [][]byte, target int64) func(i int) int { + return func(i int) int { + v := vs[i] + ims, _ := protowire.ConsumeVarint(v) + tvsbi := cmp.Compare(target, int64(ims)) + slog.Debug("cmpSuffix", "target", target, "bi", ims, "tvsbi", tvsbi) + return tvsbi + } +} + +func findRange(bag [][]byte, start, end int64) (int, int) { + lo, _ := sort.Find(len(bag), cmpSuffix(bag, start)) + hi, _ := sort.Find(len(bag), cmpSuffix(bag, end)) + return lo, hi +} + +func (d *TentativeData) ClearOrderedListState(stateID LinkID, wKey, uKey []byte, start, end int64) { + winMap := d.state[stateID] + w := d.toWindow(wKey) + kMap := winMap[w] + data := kMap[string(uKey)] + + lo, hi := findRange(data.Bag, start, end) + slog.Debug("State() OrderedList.Clear", slog.Any("StateID", stateID), slog.Any("UserKey", uKey), slog.Any("Window", wKey), slog.Group("range", slog.Int64("start", start), slog.Int64("end", end)), "lo", lo, "hi", hi, slog.Any("PreClearData", data.Bag)) + + cleared := slices.Delete(data.Bag, lo, hi) + // Zero the current entry to clear. + // Delete makes it difficult to delete the persisted stage state for the key. + kMap[string(uKey)] = StateData{Bag: cleared} +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/data_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/data_test.go new file mode 100644 index 000000000000..6e3b38d86ac6 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/data_test.go @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package engine + +import ( + "encoding/binary" + "math" + "testing" +) + +func TestCompareTimestampSuffixes(t *testing.T) { + t.Run("simple", func(t *testing.T) { + loI := int64(math.MinInt64) + hiI := int64(math.MaxInt64) + + loB := binary.BigEndian.AppendUint64(nil, uint64(loI)) + hiB := binary.BigEndian.AppendUint64(nil, uint64(hiI)) + + if compareTimestampSuffixes(loB, hiB) != (loI < hiI) { + t.Errorf("lo vs Hi%v < %v: bytes %v vs %v, %v %v", loI, hiI, loB, hiB, loI < hiI, compareTimestampSuffixes(loB, hiB)) + } + }) +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index a2840760bf7a..cce0c378cba5 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -174,7 +174,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * // Validate all the state features for _, spec := range pardo.GetStateSpecs() { isStateful = true - check("StateSpec.Protocol.Urn", spec.GetProtocol().GetUrn(), urns.UserStateBag, urns.UserStateMultiMap) + check("StateSpec.Protocol.Urn", spec.GetProtocol().GetUrn(), + urns.UserStateBag, urns.UserStateMultiMap, urns.UserStateOrderedList) } // Validate all the timer features for _, spec := range pardo.GetTimerFamilySpecs() { diff --git a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go index 5312fd799c89..12e62ef84a81 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go +++ b/sdks/go/pkg/beam/runners/prism/internal/urns/urns.go @@ -95,8 +95,9 @@ var ( SideInputMultiMap = siUrn(pipepb.StandardSideInputTypes_MULTIMAP) // UserState kinds - UserStateBag = usUrn(pipepb.StandardUserStateTypes_BAG) - UserStateMultiMap = usUrn(pipepb.StandardUserStateTypes_MULTIMAP) + UserStateBag = usUrn(pipepb.StandardUserStateTypes_BAG) + UserStateMultiMap = usUrn(pipepb.StandardUserStateTypes_MULTIMAP) + UserStateOrderedList = usUrn(pipepb.StandardUserStateTypes_ORDERED_LIST) // WindowsFns WindowFnGlobal = quickUrn(pipepb.GlobalWindowsPayload_PROPERTIES) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index c2c988aa097f..9d9058975b26 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -554,6 +554,11 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error { case *fnpb.StateKey_MultimapKeysUserState_: mmkey := key.GetMultimapKeysUserState() data = b.OutputData.GetMultimapKeysState(engine.LinkID{Transform: mmkey.GetTransformId(), Local: mmkey.GetUserStateId()}, mmkey.GetWindow(), mmkey.GetKey()) + case *fnpb.StateKey_OrderedListUserState_: + olkey := key.GetOrderedListUserState() + data = b.OutputData.GetOrderedListState( + engine.LinkID{Transform: olkey.GetTransformId(), Local: olkey.GetUserStateId()}, + olkey.GetWindow(), olkey.GetKey(), olkey.GetRange().GetStart(), olkey.GetRange().GetEnd()) default: panic(fmt.Sprintf("unsupported StateKey Get type: %T: %v", key.GetType(), prototext.Format(key))) } @@ -578,6 +583,11 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error { case *fnpb.StateKey_MultimapUserState_: mmkey := key.GetMultimapUserState() b.OutputData.AppendMultimapState(engine.LinkID{Transform: mmkey.GetTransformId(), Local: mmkey.GetUserStateId()}, mmkey.GetWindow(), mmkey.GetKey(), mmkey.GetMapKey(), req.GetAppend().GetData()) + case *fnpb.StateKey_OrderedListUserState_: + olkey := key.GetOrderedListUserState() + b.OutputData.AppendOrderedListState( + engine.LinkID{Transform: olkey.GetTransformId(), Local: olkey.GetUserStateId()}, + olkey.GetWindow(), olkey.GetKey(), req.GetAppend().GetData()) default: panic(fmt.Sprintf("unsupported StateKey Append type: %T: %v", key.GetType(), prototext.Format(key))) } @@ -601,6 +611,10 @@ func (wk *W) State(state fnpb.BeamFnState_StateServer) error { case *fnpb.StateKey_MultimapKeysUserState_: mmkey := key.GetMultimapUserState() b.OutputData.ClearMultimapKeysState(engine.LinkID{Transform: mmkey.GetTransformId(), Local: mmkey.GetUserStateId()}, mmkey.GetWindow(), mmkey.GetKey()) + case *fnpb.StateKey_OrderedListUserState_: + olkey := key.GetOrderedListUserState() + b.OutputData.ClearOrderedListState(engine.LinkID{Transform: olkey.GetTransformId(), Local: olkey.GetUserStateId()}, + olkey.GetWindow(), olkey.GetKey(), olkey.GetRange().GetStart(), olkey.GetRange().GetEnd()) default: panic(fmt.Sprintf("unsupported StateKey Clear type: %T: %v", key.GetType(), prototext.Format(key))) }