diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index ea7b09c84413..cd8ab7943ce5 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -160,6 +160,6 @@ func (j *Job) Done() { // Failed indicates that the job completed unsuccessfully. func (j *Job) Failed(err error) { - j.sendState(jobpb.JobState_FAILED) j.failureErr = err + j.sendState(jobpb.JobState_FAILED) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index f65d2eb070f7..0c16b5eb34f4 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -186,9 +186,20 @@ func (s *Server) GetMessageStream(req *jobpb.JobMessagesRequest, stream jobpb.Jo for { for (curMsg >= job.maxMsg || len(job.msgs) == 0) && curState > job.stateIdx { switch state { - case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_FAILED, jobpb.JobState_UPDATED: + case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_UPDATED: // Reached terminal state. return nil + case jobpb.JobState_FAILED: + // Ensure we send an error message with the cause of the job failure. + stream.Send(&jobpb.JobMessagesResponse{ + Response: &jobpb.JobMessagesResponse_MessageResponse{ + MessageResponse: &jobpb.JobMessage{ + MessageText: job.failureErr.Error(), + Importance: jobpb.JobMessage_JOB_MESSAGE_ERROR, + }, + }, + }) + return nil } job.streamCond.Wait() select { // Quit out if the external connection is done.