Skip to content

Commit

Permalink
[prism] Send job failure message. (#27573)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Jul 20, 2023
1 parent 3d501ee commit 51512ca
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,6 @@ func (j *Job) Done() {

// Failed indicates that the job completed unsuccessfully.
func (j *Job) Failed(err error) {
j.sendState(jobpb.JobState_FAILED)
j.failureErr = err
j.sendState(jobpb.JobState_FAILED)
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,20 @@ func (s *Server) GetMessageStream(req *jobpb.JobMessagesRequest, stream jobpb.Jo
for {
for (curMsg >= job.maxMsg || len(job.msgs) == 0) && curState > job.stateIdx {
switch state {
case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_FAILED, jobpb.JobState_UPDATED:
case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_UPDATED:
// Reached terminal state.
return nil
case jobpb.JobState_FAILED:
// Ensure we send an error message with the cause of the job failure.
stream.Send(&jobpb.JobMessagesResponse{
Response: &jobpb.JobMessagesResponse_MessageResponse{
MessageResponse: &jobpb.JobMessage{
MessageText: job.failureErr.Error(),
Importance: jobpb.JobMessage_JOB_MESSAGE_ERROR,
},
},
})
return nil
}
job.streamCond.Wait()
select { // Quit out if the external connection is done.
Expand Down

0 comments on commit 51512ca

Please sign in to comment.