transport/server: fix race that could cause a stray header to be sent #5513

Merged
merged 2 commits into from
Jul 28, 2022
21 changes: 11 additions & 10 deletions internal/transport/http2_server.go
@@ -945,15 +945,16 @@ func (t *http2Server) streamContextErr(s *Stream) error {

// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
if s.updateHeaderSent() {
return ErrIllegalHeaderWrite
}

s.hdrMu.Lock()
defer s.hdrMu.Unlock()
Contributor:

This is orthogonal to the bug fix, right? I get that this fixes a correctness issue with s.updateHeaderSent, but could you give me a concrete example of what this fixes?

Member Author:

We want to hold the lock while calling s.updateHeaderSent and checking/setting s.getState at least. Otherwise it races with WriteStatus doing similar operations. So while this is not exactly the original bug, it's another related race that is possible if trying to send headers while returning from the method handler.

Contributor:

Yeah, makes sense to me. You need to hold the mutex for writeHeaderLocked() too, though, right? (That is what you have currently, but your comment only mentions updateHeaderSent and getState.)

Member Author:

Ah yes, that too.

if s.getState() == streamDone {
return t.streamContextErr(s)
}

s.hdrMu.Lock()
if s.updateHeaderSent() {
return ErrIllegalHeaderWrite
}

if md.Len() > 0 {
if s.header.Len() > 0 {
s.header = metadata.Join(s.header, md)
@@ -962,10 +963,8 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
}
if err := t.writeHeaderLocked(s); err != nil {
s.hdrMu.Unlock()
return status.Convert(err).Err()
}
s.hdrMu.Unlock()
return nil
}

@@ -1013,17 +1012,19 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
s.hdrMu.Lock()
Contributor:

So what you wrote in the issue, "It should be fine to hold the stream's header mutex during these entire functions; they are very infrequently called", is why it is OK for this unlock to come after things like the stats handlers, which would block any operations waiting on the mutex?

Member Author:

We could unlock before calling the stats handler (we technically just need to call updateHeaderSent and finishStream to prevent any races), but this should be okay considering a race between multiple calls to WriteStatus and/or WriteHeader should be extremely rare.

Contributor:

Sounds good. It seems very minimal; I was just wondering about your thought process.

defer s.hdrMu.Unlock()

if s.getState() == streamDone {
return nil
}
s.hdrMu.Lock()

// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
if !s.updateHeaderSent() { // No headers have been sent.
if len(s.header) > 0 { // Send a separate header frame.
if err := t.writeHeaderLocked(s); err != nil {
s.hdrMu.Unlock()
return err
}
} else { // Send a trailer only response.
@@ -1052,7 +1053,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
endStream: true,
onWrite: t.setResetPingStrikes,
}
s.hdrMu.Unlock()

Contributor:

Nit: do we want this blank line? (There used to be no blank lines here.)

Member Author:

I think so. Helps to visually separate building the frame from writing it.

success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
if !success {
if err != nil {
32 changes: 32 additions & 0 deletions test/end2end_test.go
@@ -8077,3 +8077,35 @@ func (s) TestUnexpectedEOF(t *testing.T) {
}
}
}

// TestRecvWhileReturningStatus performs a Recv in a service handler while the
// handler returns its status. A race condition could result in the server
Contributor:

"A race condition could result in the server sending headers twice (once as a result of the status and once as a result of the failed Recv call)." This doesn't seem like the race to me. Isn't the race that, instead of first writing the header produced by the handler's status, the server first writes the header from the failed Recv call, without the status, because s.updateHeaderSent already returns true? Regardless of ordering, both headers get sent out anyway, right? Hence your comment in the issue: "E.g. to ignore the extra headers on a stream after END_STREAM was already sent".

Member Author:

> Isn't the race the bit where instead of writing the header with result from status first it writes the header from failed Recv call without status first due to s.updateHeaderSent is true

I'm not sure of the order -- it probably doesn't matter. The failed Recv call will also write a status, though, here:

grpc-go/stream.go

Lines 1586 to 1588 in b695a7f

if err != nil && err != io.EOF {
st, _ := status.FromError(toRPCErr(err))
ss.t.WriteStatus(ss.s, st)

Is there something you think needs to change here?

Contributor:

Yeah, I saw that line when triaging. All I was getting at is that I think the explanation of the race in the test comment is wrong. The race is that the second WriteStatus call sees updateHeaderSent return true, does not attach :status, and then gets its frame onto the wire BEFORE the first WriteStatus call's frame, so the first frame sent lacks :status, which is what broke howardjohn. Saying the race is sending headers twice is accurate, but it does not explain why the client broke in this scenario. (And yes, it also violates HTTP/2, because it sends another frame after END_STREAM.)

Member Author:

Aha, I hadn't completely linked the reason for the failure on the client side, actually, but that makes more sense than my original thought that two :status headers were written and the client saw the second one and errored. So my comment about "ignore the extra headers on a stream after END_STREAM was already sent" is probably bogus. Updated this comment.

Contributor:

Cool, thanks. The comment LGTM.

// sending the first headers frame without the HTTP :status header. This can
// happen when the failed Recv (due to the handler returning) and the handler's
// status both attempt to write the status, which would be the first headers
// frame sent, simultaneously.
func (s) TestRecvWhileReturningStatus(t *testing.T) {
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
// The client never sends, so this Recv blocks until the server
// returns and causes stream operations to return errors.
go stream.Recv()
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Contributor:

Nit: 10 * time.Second

Member Author:

Spaces?

We have a pretty healthy existing mix of spaces/no spaces, and gofmt is OK with either.

$ grep -r '\* time.Second' . | wc -l
     235
$ grep -r '\*time.Second' . | wc -l
     134

I'm inclined to just leave this.

Contributor:

Yeah, as we discussed offline, this was a habit carried over from the linters at two of my internships. I didn't realize we have both styles here. This is fine.

defer cancel()
for i := 0; i < 100; i++ {
stream, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("Error while creating stream: %v", err)
}
if _, err := stream.Recv(); err != io.EOF {
t.Fatalf("stream.Recv() = %v, want io.EOF", err)
}
}
}