Skip to content

Commit

Permalink
[apache#32211] Fail job on SDK worker disconnect.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Dec 5, 2024
1 parent d24d838 commit 3bb619f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 15 deletions.
28 changes: 23 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,26 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
slog.Debug("processing", "bundle", b, "worker", wk)

// Tell the SDK to start processing the bundle.
wk.InstReqs <- &fnpb.InstructionRequest{
req := &fnpb.InstructionRequest{
InstructionId: b.InstID,
Request: &fnpb.InstructionRequest_ProcessBundle{
ProcessBundle: &fnpb.ProcessBundleRequest{
ProcessBundleDescriptorId: b.PBDID,
},
},
}
select {
case <-wk.StoppedChan:
// The worker was stopped before req was sent.
// Quit to avoid sending on a closed channel.
outCap := b.OutputCount + len(b.HasTimers)
for i := 0; i < outCap; i++ {
b.DataOrTimerDone()
}
return b.DataWait
case wk.InstReqs <- req:
// desired outcome
}

// TODO: make batching decisions on the maxium to send per elements block, to reduce processing time overhead.
for _, block := range b.Input {
Expand Down Expand Up @@ -163,10 +175,13 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
}

select {
case wk.DataReqs <- elms:
case <-wk.StoppedChan:
b.DataOrTimerDone()
return b.DataWait
case <-ctx.Done():
b.DataOrTimerDone()
return b.DataWait
case wk.DataReqs <- elms:
}
}

Expand All @@ -181,6 +196,12 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
})
}
select {
case <-wk.StoppedChan:
b.DataOrTimerDone()
return b.DataWait
case <-ctx.Done():
b.DataOrTimerDone()
return b.DataWait
case wk.DataReqs <- &fnpb.Elements{
Timers: timers,
Data: []*fnpb.Elements_Data{
Expand All @@ -191,9 +212,6 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
},
},
}:
case <-ctx.Done():
b.DataOrTimerDone()
return b.DataWait
}

return b.DataWait
Expand Down
53 changes: 43 additions & 10 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type W struct {
// These are the ID sources
inst uint64
connected, stopped atomic.Bool
StoppedChan chan struct{} // Channel to Broadcast stopped state.

InstReqs chan *fnpb.InstructionRequest
DataReqs chan *fnpb.Elements
Expand Down Expand Up @@ -96,8 +97,9 @@ func New(id, env string) *W {
lis: lis,
server: grpc.NewServer(opts...),

InstReqs: make(chan *fnpb.InstructionRequest, 10),
DataReqs: make(chan *fnpb.Elements, 10),
InstReqs: make(chan *fnpb.InstructionRequest, 10),
DataReqs: make(chan *fnpb.Elements, 10),
StoppedChan: make(chan struct{}),

activeInstructions: make(map[string]controlResponder),
Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
Expand Down Expand Up @@ -132,12 +134,26 @@ func (wk *W) LogValue() slog.Value {
)
}

// shutdown safely closes channels, and can be called in the event of SDK crashes.
//
// Splitting this logic from the GRPC server Stop is necessary, since a worker
// crash would be handled in a streaming RPC context, which will block GRPC
// stop calls.
func (wk *W) shutdown() {
// If this is the first call to "stop" this worker, also close the channels.
if wk.stopped.CompareAndSwap(false, true) {
slog.Debug("shutdown", "worker", wk, "firstTime", true)
close(wk.StoppedChan)
close(wk.InstReqs)
close(wk.DataReqs)
} else {
slog.Debug("shutdown", "worker", wk, "firstTime", false)
}
}

// Stop the GRPC server.
func (wk *W) Stop() {
slog.Debug("stopping", "worker", wk)
wk.stopped.Store(true)
close(wk.InstReqs)
close(wk.DataReqs)
wk.shutdown()

// Give the SDK side 5 seconds to gracefully stop, before
// hard stopping all RPCs.
Expand Down Expand Up @@ -331,17 +347,21 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
case <-ctrl.Context().Done():
wk.mu.Lock()
// Fail extant instructions
slog.Debug("SDK Disconnected", "worker", wk, "ctx_error", ctrl.Context().Err(), "outstanding_instructions", len(wk.activeInstructions))
err := context.Cause(ctrl.Context())
slog.Debug("SDK Disconnected", "worker", wk, "ctx_error", err, "outstanding_instructions", len(wk.activeInstructions))

msg := fmt.Sprintf("SDK worker disconnected: %v, %v active instructions", wk.String(), len(wk.activeInstructions))
msg := fmt.Sprintf("SDK worker disconnected: %v, %v active instructions, error: %v", wk.String(), len(wk.activeInstructions), err)
for instID, b := range wk.activeInstructions {
b.Respond(&fnpb.InstructionResponse{
InstructionId: instID,
Error: msg,
})
}
// Soft shutdown to prevent GRPC shutdown from being blocked by this
// streaming call.
wk.shutdown()
wk.mu.Unlock()
return context.Cause(ctrl.Context())
return err
case err := <-done:
if err != nil {
slog.Warn("Control done", "error", err, "worker", wk)
Expand Down Expand Up @@ -639,9 +659,22 @@ func (wk *W) sendInstruction(ctx context.Context, req *fnpb.InstructionRequest)
if wk.Stopped() {
return nil
}
wk.InstReqs <- req
select {
case <-wk.StoppedChan:
return &fnpb.InstructionResponse{
InstructionId: progInst,
Error: "worker stopped before send",
}
case wk.InstReqs <- req:
// desired outcome
}

select {
case <-wk.StoppedChan:
return &fnpb.InstructionResponse{
InstructionId: progInst,
Error: "worker stopped before receive",
}
case <-ctx.Done():
return &fnpb.InstructionResponse{
InstructionId: progInst,
Expand Down

0 comments on commit 3bb619f

Please sign in to comment.