Skip to content

Commit

Permalink
First successful run!
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Apr 26, 2024
1 parent 73d1fb7 commit c14e517
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 62 deletions.
94 changes: 59 additions & 35 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
for stageID := range advanced {
ss := em.stages[stageID]
watermark, ready := ss.bundleReady(em)
ptimeEventsReady := ss.processTimeEvents.Peek() <= emNow
ptimeEventsReady := ss.processTimeEvents.Peek() <= emNow || emNow == mtime.MaxTimestamp
if ready {
if ptimeEventsReady {
fmt.Println("XXXX Both ProcessingTime ready & Regularevents ready. stage", stageID) // Panic ?
Expand All @@ -367,17 +367,17 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
case runStageCh <- rb:
}
em.refreshCond.L.Lock()
} else if ss.processTimeEvents.Peek() <= emNow {
} else if ptimeEventsReady {
// TODO what if both regular events and processing time are ready?
fmt.Println("XXXX ProcessingTime ready stage", stageID, "watermark", watermark, ss.pending, "aggregate", ss.aggregate)
// Queue up elements!
// TODO impose processing strategy limits.
elems := ss.AdvanceProcessingTimeTo(emNow)

var toProcess []element
// TODO Sort out real holds (improve hold logic?)
minTs := mtime.MaxTimestamp
newKeys := set[string]{}

holdsInBundle := map[mtime.Time]int{}
for _, es := range elems {
for _, e := range es {
Expand All @@ -390,41 +390,46 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
}
}

bundID := nextBundID()
// Only do this if we have anything to process.
if len(toProcess) > 0 {
bundID := nextBundID()

es := elements{
es: toProcess,
minTimestamp: minTs,
}
if ss.inprogress == nil {
ss.inprogress = make(map[string]elements)
}
if ss.inprogressKeysByBundle == nil {
ss.inprogressKeysByBundle = make(map[string]set[string])
}
if ss.inprogressHoldsByBundle == nil {
ss.inprogressHoldsByBundle = make(map[string]map[mtime.Time]int)
}
ss.inprogress[bundID] = es
ss.inprogressKeysByBundle[bundID] = newKeys
ss.inprogressKeys.merge(newKeys)
ss.inprogressHoldsByBundle[bundID] = holdsInBundle
es := elements{
es: toProcess,
minTimestamp: minTs,
}
if ss.inprogress == nil {
ss.inprogress = make(map[string]elements)
}
if ss.inprogressKeysByBundle == nil {
ss.inprogressKeysByBundle = make(map[string]set[string])
}
if ss.inprogressHoldsByBundle == nil {
ss.inprogressHoldsByBundle = make(map[string]map[mtime.Time]int)
}
ss.inprogress[bundID] = es
ss.inprogressKeysByBundle[bundID] = newKeys
ss.inprogressKeys.merge(newKeys)
ss.inprogressHoldsByBundle[bundID] = holdsInBundle

fmt.Println("XXXXX QQQQQ ProcessingTimeBundle Started stage", stageID, "bundID", bundID, "watermark", watermark, ss.pending, "aggregate", ss.aggregate, "numElms", len(es.es))
fmt.Println("XXXXX QQQQQ ProcessingTimeBundle Started stage", stageID, "bundID", bundID, "watermark", watermark, ss.pending, "aggregate", ss.aggregate, "numElms", len(es.es), "holds in bundle", holdsInBundle, "stage", ss.watermarkHolds.counts, "pendingTime", ss.processTimeEvents.events)

rb := RunBundle{StageID: stageID, BundleID: bundID, Watermark: watermark}
rb := RunBundle{StageID: stageID, BundleID: bundID, Watermark: watermark}

em.inprogressBundles.insert(rb.BundleID)
em.refreshCond.L.Unlock()
em.inprogressBundles.insert(rb.BundleID)
em.refreshCond.L.Unlock()

select {
case <-ctx.Done():
return
case runStageCh <- rb:
select {
case <-ctx.Done():
return
case runStageCh <- rb:
}
em.refreshCond.L.Lock()
} else {
fmt.Println("XXXXXXX no processing time events")
}
em.refreshCond.L.Lock()
} else {
fmt.Println("XXXX BundleNotReady stage", stageID, "watermark", watermark, ss.pending, "aggregate", ss.aggregate)
fmt.Println("XXXX BundleNotReady stage", stageID, "watermark", watermark, ss.pending, "aggregate", ss.aggregate, ss.processTimeEvents.Peek(), emNow, ptimeEventsReady, ss.processTimeEvents.Peek()-emNow)
}
}
em.checkForQuiescence(advanced)
Expand Down Expand Up @@ -464,7 +469,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
// There are no further incoming watermark changes, see if there are test stream events for this job.
nextEvent := em.testStreamHandler.NextEvent()
if nextEvent != nil {
fmt.Println("XXXX TestStreamEvent!", nextEvent)
fmt.Printf("XXXX TestStreamEvent! %T %v\n", nextEvent, nextEvent)
nextEvent.Execute(em)
// Decrement pending for the event being processed.
em.addPending(-1)
Expand All @@ -475,6 +480,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
return
}
// If there are no refreshes, then there's no mechanism to make progress, so it's time to fast fail.
fmt.Println("XXXX TesStream event was a no-op!")
}

v := em.livePending.Load()
Expand All @@ -495,7 +501,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
outW := ss.OutputWatermark()
upPCol, upW := ss.UpstreamWatermark()
upS := em.pcolParents[upPCol]
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts))
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts, "ptEvents", ss.processTimeEvents.events))
}
panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
}
Expand Down Expand Up @@ -767,6 +773,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
em.triageTimers(d, inputInfo, stage)

// Return unprocessed to this stage's pending
// TODO sort out pending element watermark holds for process continuation residuals.
unprocessedElements := reElementResiduals(residuals.Data, inputInfo, rb)

// Add unprocessed back to the pending stack.
Expand Down Expand Up @@ -839,6 +846,7 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag

var pendingEventTimers []element
var pendingProcessingTimers map[mtime.Time][]element
holds := map[mtime.Time]int{}
for tentativeKey, timers := range d.timers {
keyToTimers := map[timerKey]element{}
for _, t := range timers {
Expand Down Expand Up @@ -870,12 +878,22 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag
prev := pendingProcessingTimers[newTimerFire]
prev = append(prev, elm)
pendingProcessingTimers[newTimerFire] = prev
holds[elm.holdTimestamp] += holds[elm.holdTimestamp] + 1
} else {
pendingEventTimers = append(pendingEventTimers, elm)
}
}
}

// Add holds for pending processing time timers.
if len(holds) > 0 {
stage.mu.Lock()
for h, v := range holds {
stage.watermarkHolds.Add(h, v)
}
stage.mu.Unlock()
}

if len(pendingEventTimers) > 0 {
count := stage.AddPending(pendingEventTimers)
em.addPending(count)
Expand Down Expand Up @@ -1139,6 +1157,8 @@ func (ss *stageState) AddProcessingTimePending(newPending map[mtime.Time][]eleme
defer ss.mu.Unlock()
var count int

// TODO add pending watermark for input data for watermark propagation, for non-timers.

// TODO sort out processing time watermark holds here.
// TODO sort out per key processing time uniqueness here (only "one" can be set for a given key+fam+window, just like event times.)
// - Only the last set one can work.
Expand Down Expand Up @@ -1521,17 +1541,21 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) {
// defer func() {
// fmt.Println("XXXX ProcessingTimeNow -> ", ret, em.processTimeEvents.order)
// }()
if em.testStreamHandler != nil {
if em.testStreamHandler != nil && !em.testStreamHandler.completed {
fmt.Println("XXXX ProcessingTimeNow via-testStream ", em.testStreamHandler.Now())
return em.testStreamHandler.Now()
}
// "Test" mode -> advance to next processing time event if any, to allow execution.
// if test mode...
if t, ok := em.processTimeEvents.Peek(); ok {
fmt.Println("XXXX ProcessingTimeNow via-event ", t)
return t
}

// "Production" mode, always real time now.
return mtime.Now()
now := mtime.Now()
fmt.Println("XXXX ProcessingTimeNow via-now ", now)
return now
}

// rebaseProcessingTime turns an absolute processing time to be relative to the provided local clock now.
Expand Down
13 changes: 9 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/engine/processingtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package engine

import (
"container/heap"
"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
)
Expand Down Expand Up @@ -85,7 +84,6 @@ func newStageRefreshQueue() *stageRefreshQueue {

// Schedule a stage event at the given time.
func (q *stageRefreshQueue) Schedule(t mtime.Time, stageID string) {
fmt.Println("XXXX stageRefreshQueue scheduled! ", t, stageID)
if s, ok := q.events[t]; ok {
// We already have a trigger at this time, mutate that instead.
if s.present(stageID) {
Expand Down Expand Up @@ -114,8 +112,6 @@ func (q *stageRefreshQueue) AdvanceTo(now mtime.Time) set[string] {
notify := set[string]{}
for {
if len(q.order) == 0 || q.order[0] > now {
t, ok := q.Peek()
fmt.Println("XXXX stageRefreshQueue advanced to ", now, "refreshed", notify, "next time?", ok, t, t-now)
return notify
}
// pop elements off the queue until the next time is later than now.
Expand Down Expand Up @@ -167,3 +163,12 @@ func (q *processingTimeElementQueue) AdvanceTo(now mtime.Time) [][]element {
delete(q.events, next)
}
}

// Peek reports the time of the next scheduled ProcessingTime event
// without removing it from the queue: the value at the head of q.order.
// An empty queue yields [mtime.MaxTimestamp].
func (q *processingTimeElementQueue) Peek() mtime.Time {
	if len(q.order) > 0 {
		return q.order[0]
	}
	// Nothing is scheduled, so report the largest representable timestamp.
	return mtime.MaxTimestamp
}
16 changes: 14 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,18 @@ type tsProcessingTimeEvent struct {

// Execute this ProcessingTime event by advancing the synthetic processing time.
func (ev tsProcessingTimeEvent) Execute(em *ElementManager) {
em.testStreamHandler.processingTime = em.testStreamHandler.processingTime.Add(ev.AdvanceBy)
if ev.AdvanceBy == time.Duration(mtime.MaxTimestamp) {
em.testStreamHandler.processingTime = mtime.MaxTimestamp.ToTime()
} else {
em.testStreamHandler.processingTime = em.testStreamHandler.processingTime.Add(ev.AdvanceBy)
}

// Add the refreshes now so our block prevention logic works.
emNow := em.ProcessingTimeNow()
em.watermarkRefreshes.merge(em.processTimeEvents.AdvanceTo(emNow))
toRefresh := em.processTimeEvents.AdvanceTo(emNow)
em.watermarkRefreshes.merge(toRefresh)

fmt.Println("XXXXX processing time advance by", ev.AdvanceBy, "event refreshes", emNow, toRefresh)
}

// tsFinalEvent is the "last" event we perform after all preceding events.
Expand All @@ -251,6 +258,11 @@ func (ev tsFinalEvent) Execute(em *ElementManager) {
em.testStreamHandler.UpdateHold(em, mtime.MaxTimestamp)
ss := em.stages[ev.stageID]
kickSet := ss.updateWatermarks(em)
if ss.OutputWatermark() == mtime.MaxTimestamp {
for k := range em.stages {
kickSet.insert(k)
}
}
kickSet.insert(ev.stageID)
em.watermarkRefreshes.merge(kickSet)
}
Expand Down
7 changes: 6 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,12 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
case *pipepb.TestStreamPayload_Event_WatermarkEvent:
tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.Time(ev.WatermarkEvent.GetNewWatermark()))
case *pipepb.TestStreamPayload_Event_ProcessingTimeEvent:
tsb.AddProcessingTimeEvent(time.Duration(ev.ProcessingTimeEvent.GetAdvanceDuration()) * time.Millisecond)
if ev.ProcessingTimeEvent.GetAdvanceDuration() == int64(mtime.MaxTimestamp) {
// TODO: Determine the SDK common formalism for setting processing time to infinity.
tsb.AddProcessingTimeEvent(time.Duration(mtime.MaxTimestamp))
} else {
tsb.AddProcessingTimeEvent(time.Duration(ev.ProcessingTimeEvent.GetAdvanceDuration()) * time.Millisecond)
}
default:
return fmt.Errorf("prism error building stage %v - unknown TestStream event type: %T", stage.ID, ev)
}
Expand Down
28 changes: 8 additions & 20 deletions sdks/go/test/integration/primitives/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package primitives

import (
"context"
"fmt"
"strconv"
"time"

Expand Down Expand Up @@ -54,7 +53,6 @@ type inputFn[K, V any] struct {
}

func (fn *inputFn[K, V]) ProcessElement(imp []byte, emit func(K, V)) {
fmt.Println("XXXX inputFn.ProcessElement kv:", imp)
for _, in := range fn.Inputs {
emit(in.Key, in.Value)
}
Expand Down Expand Up @@ -178,16 +176,15 @@ type processingTimeFn struct {

Offset int
TimerOutput int
Cap int
}

func (fn *processingTimeFn) ProcessElement(sp state.Provider, tp timers.Provider, key string, value int, emit func(string, int)) {
fmt.Println("XXXX processingTimeFn.ProcessElement kv:", key, value)
fn.Callback.Set(tp, time.Now().Add(10*time.Second))
fn.Callback.Set(tp, time.Now().Add(9*time.Second))
fn.MyValue.Write(sp, 0)
}

func (fn *processingTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key string, timer timers.Context, emit func(string, int)) {
fmt.Println("XXXX processingTimeFn.OnTimer k:", key, ts, timer)
switch timer.Family {
case fn.Callback.Family:
switch timer.Tag {
Expand All @@ -200,9 +197,9 @@ func (fn *processingTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp s
panic("State must be set for key: " + key)
}
emit(key, read)
if read < 3 {
if read < fn.Cap-1 {
fn.MyValue.Write(sp, read+1)
fn.Callback.Set(tp, time.Now().Add(10*time.Second))
fn.Callback.Set(tp, time.Now().Add(9*time.Second))
}
default:
panic("unexpected timer tag: " + timer.Family + " tag:" + timer.Tag + " want: \"\", for key:" + key)
Expand All @@ -215,20 +212,14 @@ func (fn *processingTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp s
}

// regroup re-emits every value supplied by the vs iterator for key,
// wrapping each one with kvfn before handing it to emit.
func regroup(key string, vs func(*int) bool, emit func(kv[string, int])) {
// Debug trace printed once per call.
fmt.Println("XXXXX GROUPING OCCURED for key ", key)
var v int
// vs is handed &v on each pass; the loop continues for as long as it returns true.
for vs(&v) {
emit(kvfn(key, v))
}
}

// dumbdumb is a debugging sink: it prints the key and value it receives
// and produces no output.
func dumbdumb(key string, v int) {
fmt.Println("XXXXX DUMBDUMB OCCURED for key ", key, v)
}

// init registers regroup and dumbdumb through the register package's
// Function3x0 and Function2x0 helpers.
func init() {
register.Function3x0(regroup)
register.Function2x0(dumbdumb)
}

func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) {
Expand All @@ -251,8 +242,6 @@ func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollec

imp := makeImp(s)

fmt.Println("XXXX inputs and outputs", inputs, wantOutputs)

keyed := beam.ParDo(s, &inputFn[string, int]{
Inputs: inputs,
}, imp)
Expand All @@ -261,12 +250,12 @@ func timersProcessingTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollec
TimerOutput: timerOutput,
Callback: timers.InProcessingTime("Callback"),
MyValue: state.MakeValueState[int]("MyKey"),
Cap: numDuplicateTimers, // Syncs the cycles to the number of duplicate keyed inputs.
}, keyed)
beam.ParDo0(s, dumbdumb, times)
// We GroupByKey here so input to passert is blocked until teststream advances time to Infinity.
// gbk := beam.GroupByKey(s, times)
// beam.ParDo(s, regroup, gbk)
//passert.EqualsList(s, regrouped, wantOutputs)
gbk := beam.GroupByKey(s, times)
regrouped := beam.ParDo(s, regroup, gbk)
passert.EqualsList(s, regrouped, wantOutputs)
}
}

Expand All @@ -279,7 +268,6 @@ func TimersProcessingTimeTestStream_Infinity(s beam.Scope) {
c.AdvanceProcessingTime(int64(mtime.FromDuration(10 * time.Second)))
c.AdvanceProcessingTime(int64(mtime.FromDuration(10 * time.Second)))
c.AdvanceProcessingTime(int64(mtime.FromDuration(10 * time.Second)))
// We should get one emission per advancement.
c.AdvanceProcessingTimeToInfinity()
return teststream.Create(s, c)
})(s)
Expand Down

0 comments on commit c14e517

Please sign in to comment.