Skip to content

Commit

Permalink
Garbage collect side inputs. (apache#29423)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Nov 15, 2023
1 parent 427faae commit 1f4bc68
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 176 deletions.
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (b *builder) makeCoderForPCollection(id string) (*coder.Coder, *coder.Windo
}
wc, err := b.coders.WindowCoder(ws.GetWindowCoderId())
if err != nil {
return nil, nil, err
return nil, nil, errors.Errorf("could not unmarshal window coder for pcollection %v: %w", id, err)
}
return c, wc, nil
}
Expand Down
14 changes: 7 additions & 7 deletions sdks/go/pkg/beam/core/runtime/graphx/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (b *CoderUnmarshaller) WindowCoder(id string) (*coder.WindowCoder, error) {

c, err := b.peek(id)
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal window coder: %w", err)
}

w, err := urnToWindowCoder(c.GetSpec().GetUrn())
Expand Down Expand Up @@ -218,7 +218,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
id := components[1]
elm, err := b.peek(id)
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal kv coder value component: %w", err)
}

switch elm.GetSpec().GetUrn() {
Expand Down Expand Up @@ -261,7 +261,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,

sub, err := b.peek(components[0])
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal length prefix coder component: %w", err)
}

// No payload means this coder was length prefixed by the runner
Expand Down Expand Up @@ -307,7 +307,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
}
w, err := b.WindowCoder(components[1])
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal window coder: %w", err)
}
t := typex.New(typex.WindowedValueType, elm.T)
wvc := &coder.Coder{Kind: coder.WindowedValue, T: t, Components: []*coder.Coder{elm}, Window: w}
Expand Down Expand Up @@ -356,7 +356,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
}
w, err := b.WindowCoder(components[1])
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal window coder for timer: %w", err)
}
return coder.NewT(elm, w), nil
case urnRowCoder:
Expand Down Expand Up @@ -389,7 +389,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
case urnGlobalWindow:
w, err := b.WindowCoder(id)
if err != nil {
return nil, err
return nil, errors.Errorf("could not unmarshal global window coder: %w", err)
}
return &coder.Coder{Kind: coder.Window, T: typex.New(reflect.TypeOf((*struct{})(nil)).Elem()), Window: w}, nil
default:
Expand All @@ -400,7 +400,7 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
func (b *CoderUnmarshaller) peek(id string) (*pipepb.Coder, error) {
c, ok := b.models[id]
if !ok {
return nil, errors.Errorf("coder with id %v not found", id)
return nil, errors.Errorf("(peek) coder with id %v not found", id)
}
return c, nil
}
Expand Down
82 changes: 71 additions & 11 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type ElementManager struct {
stages map[string]*stageState // The state for each stage.

consumers map[string][]string // Map from pcollectionID to stageIDs that consumes them as primary input.
sideConsumers map[string][]string // Map from pcollectionID to stageIDs that consumes them as side input.
sideConsumers map[string][]LinkID // Map from pcollectionID to the stage+transform+input that consumes them as side input.

pcolParents map[string]string // Map from pcollectionID to stageIDs that produce the pcollection.

Expand All @@ -131,12 +131,17 @@ type ElementManager struct {
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
}

// LinkID represents a fully qualified input or output.
type LinkID struct {
Transform, Local, Global string
}

func NewElementManager(config Config) *ElementManager {
return &ElementManager{
config: config,
stages: map[string]*stageState{},
consumers: map[string][]string{},
sideConsumers: map[string][]string{},
sideConsumers: map[string][]LinkID{},
pcolParents: map[string]string{},
watermarkRefreshes: set[string]{},
inprogressBundles: set[string]{},
Expand All @@ -146,9 +151,9 @@ func NewElementManager(config Config) *ElementManager {

// AddStage adds a stage to this element manager, connecting it's PCollections and
// nodes to the watermark propagation graph.
func (em *ElementManager) AddStage(ID string, inputIDs, sides, outputIDs []string) {
func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, sides []LinkID) {
slog.Debug("AddStage", slog.String("ID", ID), slog.Any("inputs", inputIDs), slog.Any("sides", sides), slog.Any("outputs", outputIDs))
ss := makeStageState(ID, inputIDs, sides, outputIDs)
ss := makeStageState(ID, inputIDs, outputIDs, sides)

em.stages[ss.ID] = ss
for _, outputIDs := range ss.outputIDs {
Expand All @@ -158,7 +163,9 @@ func (em *ElementManager) AddStage(ID string, inputIDs, sides, outputIDs []strin
em.consumers[input] = append(em.consumers[input], ss.ID)
}
for _, side := range ss.sides {
em.sideConsumers[side] = append(em.sideConsumers[side], ss.ID)
// Note that we use the StageID as the global ID in the value since we need
// to be able to look up the consuming stage, from the global PCollectionID.
em.sideConsumers[side.Global] = append(em.sideConsumers[side.Global], LinkID{Global: ss.ID, Local: side.Local, Transform: side.Transform})
}
}

Expand Down Expand Up @@ -363,6 +370,11 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
consumer := em.stages[sID]
consumer.AddPending(newPending)
}
sideConsumers := em.sideConsumers[output]
for _, link := range sideConsumers {
consumer := em.stages[link.Global]
consumer.AddPendingSide(newPending, link.Transform, link.Local)
}
}

// Return unprocessed to this stage's pending
Expand Down Expand Up @@ -489,7 +501,7 @@ type stageState struct {
ID string
inputID string // PCollection ID of the parallel input
outputIDs []string // PCollection IDs of outputs to update consumers.
sides []string // PCollection IDs of side inputs that can block execution.
sides []LinkID // PCollection IDs of side inputs that can block execution.

// Special handling bits
aggregate bool // whether this state needs to block for aggregation.
Expand All @@ -501,12 +513,13 @@ type stageState struct {
output mtime.Time // Output watermark for the whole stage
estimatedOutput mtime.Time // Estimated watermark output from DoFns

pending elementHeap // pending input elements for this stage that are to be processesd
inprogress map[string]elements // inprogress elements by active bundles, keyed by bundle
pending elementHeap // pending input elements for this stage that are to be processesd
inprogress map[string]elements // inprogress elements by active bundles, keyed by bundle
sideInputs map[LinkID]map[typex.Window][][]byte // side input data for this stage, from {tid, inputID} -> window
}

// makeStageState produces an initialized stageState.
func makeStageState(ID string, inputIDs, sides, outputIDs []string) *stageState {
func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *stageState {
ss := &stageState{
ID: ID,
outputIDs: outputIDs,
Expand Down Expand Up @@ -536,6 +549,42 @@ func (ss *stageState) AddPending(newPending []element) {
heap.Init(&ss.pending)
}

// AddPendingSide adds elements to be consumed as side inputs.
func (ss *stageState) AddPendingSide(newPending []element, tID, inputID string) {
ss.mu.Lock()
defer ss.mu.Unlock()
if ss.sideInputs == nil {
ss.sideInputs = map[LinkID]map[typex.Window][][]byte{}
}
key := LinkID{Transform: tID, Local: inputID}
in, ok := ss.sideInputs[key]
if !ok {
in = map[typex.Window][][]byte{}
ss.sideInputs[key] = in
}
for _, e := range newPending {
in[e.window] = append(in[e.window], e.elmBytes)
}
}

func (ss *stageState) GetSideData(tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte {
ss.mu.Lock()
defer ss.mu.Unlock()

d := ss.sideInputs[LinkID{Transform: tID, Local: inputID}]
ret := map[typex.Window][][]byte{}
for win, ds := range d {
if win.MaxTimestamp() <= watermark {
ret[win] = ds
}
}
return ret
}

func (em *ElementManager) GetSideData(sID, tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte {
return em.stages[sID].GetSideData(tID, inputID, watermark)
}

// updateUpstreamWatermark is for the parent of the input pcollection
// to call, to update downstream stages with it's current watermark.
// This avoids downstream stages inverting lock orderings from
Expand Down Expand Up @@ -699,7 +748,18 @@ func (ss *stageState) updateWatermarks(minPending, minStateHold mtime.Time, em *
}
// Inform side input consumers, but don't update the upstream watermark.
for _, sID := range em.sideConsumers[outputCol] {
refreshes.insert(sID)
refreshes.insert(sID.Global)
}
}
// Garbage collect state, timers and side inputs, for all windows
// that are before the new output watermark.
// They'll never be read in again.
for _, wins := range ss.sideInputs {
for win := range wins {
// Clear out anything we've already used.
if win.MaxTimestamp() < newOut {
delete(wins, win)
}
}
}
}
Expand All @@ -725,7 +785,7 @@ func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) {
}
ready := true
for _, side := range ss.sides {
pID, ok := em.pcolParents[side]
pID, ok := em.pcolParents[side.Global]
if !ok {
panic(fmt.Sprintf("stage[%v] no parent ID for side input %v", ss.ID, side))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestElementHeap(t *testing.T) {
func TestStageState_minPendingTimestamp(t *testing.T) {

newState := func() *stageState {
return makeStageState("test", []string{"testInput"}, nil, []string{"testOutput"})
return makeStageState("test", []string{"testInput"}, []string{"testOutput"}, nil)
}
t.Run("noElements", func(t *testing.T) {
ss := newState()
Expand Down Expand Up @@ -188,21 +188,21 @@ func TestStageState_minPendingTimestamp(t *testing.T) {
}

func TestStageState_UpstreamWatermark(t *testing.T) {
impulse := makeStageState("impulse", nil, nil, []string{"output"})
impulse := makeStageState("impulse", nil, []string{"output"}, nil)
_, up := impulse.UpstreamWatermark()
if got, want := up, mtime.MaxTimestamp; got != want {
t.Errorf("impulse.UpstreamWatermark() = %v, want %v", got, want)
}

dofn := makeStageState("dofn", []string{"input"}, nil, []string{"output"})
dofn := makeStageState("dofn", []string{"input"}, []string{"output"}, nil)
dofn.updateUpstreamWatermark("input", 42)

_, up = dofn.UpstreamWatermark()
if got, want := up, mtime.Time(42); got != want {
t.Errorf("dofn.UpstreamWatermark() = %v, want %v", got, want)
}

flatten := makeStageState("flatten", []string{"a", "b", "c"}, nil, []string{"output"})
flatten := makeStageState("flatten", []string{"a", "b", "c"}, []string{"output"}, nil)
flatten.updateUpstreamWatermark("a", 50)
flatten.updateUpstreamWatermark("b", 42)
flatten.updateUpstreamWatermark("c", 101)
Expand All @@ -216,7 +216,7 @@ func TestStageState_updateWatermarks(t *testing.T) {
inputCol := "testInput"
outputCol := "testOutput"
newState := func() (*stageState, *stageState, *ElementManager) {
underTest := makeStageState("underTest", []string{inputCol}, nil, []string{outputCol})
underTest := makeStageState("underTest", []string{inputCol}, []string{outputCol}, nil)
outStage := makeStageState("outStage", []string{outputCol}, nil, nil)
em := &ElementManager{
consumers: map[string][]string{
Expand Down Expand Up @@ -315,7 +315,7 @@ func TestStageState_updateWatermarks(t *testing.T) {
func TestElementManager(t *testing.T) {
t.Run("impulse", func(t *testing.T) {
em := NewElementManager(Config{})
em.AddStage("impulse", nil, nil, []string{"output"})
em.AddStage("impulse", nil, []string{"output"}, nil)
em.AddStage("dofn", []string{"output"}, nil, nil)

em.Impulse("impulse")
Expand Down Expand Up @@ -370,8 +370,8 @@ func TestElementManager(t *testing.T) {

t.Run("dofn", func(t *testing.T) {
em := NewElementManager(Config{})
em.AddStage("impulse", nil, nil, []string{"input"})
em.AddStage("dofn1", []string{"input"}, nil, []string{"output"})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
em.AddStage("dofn2", []string{"output"}, nil, nil)
em.Impulse("impulse")

Expand Down Expand Up @@ -421,9 +421,9 @@ func TestElementManager(t *testing.T) {

t.Run("side", func(t *testing.T) {
em := NewElementManager(Config{})
em.AddStage("impulse", nil, nil, []string{"input"})
em.AddStage("dofn1", []string{"input"}, nil, []string{"output"})
em.AddStage("dofn2", []string{"input"}, []string{"output"}, nil)
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
em.AddStage("dofn2", []string{"input"}, nil, []LinkID{{Transform: "dofn2", Global: "output", Local: "local"}})
em.Impulse("impulse")

var i int
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestElementManager(t *testing.T) {
})
t.Run("residual", func(t *testing.T) {
em := NewElementManager(Config{})
em.AddStage("impulse", nil, nil, []string{"input"})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn", []string{"input"}, nil, nil)
em.Impulse("impulse")

Expand Down
16 changes: 6 additions & 10 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,6 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
stages := map[string]*stage{}
var impulses []string

// Inialize the "dataservice cache" to support side inputs.
// TODO(https://github.com/apache/beam/issues/28543), remove this concept.
ds := &worker.DataService{}

for i, stage := range topo {
tid := stage.transforms[0]
t := ts[tid]
Expand Down Expand Up @@ -206,7 +202,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic

switch urn {
case urns.TransformGBK:
em.AddStage(stage.ID, []string{getOnlyValue(t.GetInputs())}, nil, []string{getOnlyValue(t.GetOutputs())})
em.AddStage(stage.ID, []string{getOnlyValue(t.GetInputs())}, []string{getOnlyValue(t.GetOutputs())}, nil)
for _, global := range t.GetInputs() {
col := comps.GetPcollections()[global]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
Expand All @@ -221,22 +217,22 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
em.StageAggregates(stage.ID)
case urns.TransformImpulse:
impulses = append(impulses, stage.ID)
em.AddStage(stage.ID, nil, nil, []string{getOnlyValue(t.GetOutputs())})
em.AddStage(stage.ID, nil, []string{getOnlyValue(t.GetOutputs())}, nil)
case urns.TransformFlatten:
inputs := maps.Values(t.GetInputs())
sort.Strings(inputs)
em.AddStage(stage.ID, inputs, nil, []string{getOnlyValue(t.GetOutputs())})
em.AddStage(stage.ID, inputs, []string{getOnlyValue(t.GetOutputs())}, nil)
}
stages[stage.ID] = stage
case wk.Env:
if err := buildDescriptor(stage, comps, wk, ds); err != nil {
if err := buildDescriptor(stage, comps, wk, em); err != nil {
return fmt.Errorf("prism error building stage %v: \n%w", stage.ID, err)
}
stages[stage.ID] = stage
slog.Debug("pipelineBuild", slog.Group("stage", slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName())))
outputs := maps.Keys(stage.OutputsToCoders)
sort.Strings(outputs)
em.AddStage(stage.ID, []string{stage.primaryInput}, stage.sides, outputs)
em.AddStage(stage.ID, []string{stage.primaryInput}, outputs, stage.sideInputs)
default:
err := fmt.Errorf("unknown environment[%v]", t.GetEnvironmentId())
slog.Error("Execute", err)
Expand Down Expand Up @@ -273,7 +269,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
defer func() { <-maxParallelism }()
s := stages[rb.StageID]
wk := wks[s.envID]
if err := s.Execute(ctx, j, wk, ds, comps, em, rb); err != nil {
if err := s.Execute(ctx, j, wk, comps, em, rb); err != nil {
// Ensure we clean up on bundle failure
em.FailBundle(rb)
bundleFailed <- err
Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
}

func executeWithT(ctx context.Context, t testing.TB, p *beam.Pipeline) (beam.PipelineResult, error) {
t.Helper()
t.Log("startingTest - ", t.Name())
s1 := rand.NewSource(time.Now().UnixNano())
r1 := rand.New(s1)
Expand Down
Loading

0 comments on commit 1f4bc68

Please sign in to comment.