transport/server: fix race that could cause a stray header to be sent #5513
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -945,15 +945,16 @@ func (t *http2Server) streamContextErr(s *Stream) error { | |
|
||
// WriteHeader sends the header metadata md back to the client. | ||
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { | ||
if s.updateHeaderSent() { | ||
return ErrIllegalHeaderWrite | ||
} | ||
|
||
s.hdrMu.Lock() | ||
defer s.hdrMu.Unlock() | ||
if s.getState() == streamDone { | ||
return t.streamContextErr(s) | ||
} | ||
|
||
s.hdrMu.Lock() | ||
if s.updateHeaderSent() { | ||
return ErrIllegalHeaderWrite | ||
} | ||
|
||
if md.Len() > 0 { | ||
if s.header.Len() > 0 { | ||
s.header = metadata.Join(s.header, md) | ||
|
@@ -962,10 +963,8 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { | |
} | ||
} | ||
if err := t.writeHeaderLocked(s); err != nil { | ||
s.hdrMu.Unlock() | ||
return status.Convert(err).Err() | ||
} | ||
s.hdrMu.Unlock() | ||
return nil | ||
} | ||
|
||
|
@@ -1013,17 +1012,19 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error { | |
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early | ||
// OK is adopted. | ||
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { | ||
s.hdrMu.Lock() | ||
So what you wrote in the issue ("It should be fine to hold the stream's header mutex during these entire functions; they are very infrequently called") is why it is OK for this unlock to come after things like the stats handlers, which would block any operations waiting on the mutex?
We could unlock before calling the stats handler (we technically just need to call
Sg. Seems very minimal, but I was just wondering about your thought process. |
||
defer s.hdrMu.Unlock() | ||
|
||
if s.getState() == streamDone { | ||
return nil | ||
} | ||
s.hdrMu.Lock() | ||
|
||
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields | ||
// first and create a slice of that exact size. | ||
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. | ||
if !s.updateHeaderSent() { // No headers have been sent. | ||
if len(s.header) > 0 { // Send a separate header frame. | ||
if err := t.writeHeaderLocked(s); err != nil { | ||
s.hdrMu.Unlock() | ||
return err | ||
} | ||
} else { // Send a trailer only response. | ||
|
@@ -1052,7 +1053,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { | |
endStream: true, | ||
onWrite: t.setResetPingStrikes, | ||
} | ||
s.hdrMu.Unlock() | ||
|
||
Nit: do we want this blank line? (There used to be no blanks here.)
I think so. It helps to visually separate building the frame from writing it. |
||
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader) | ||
if !success { | ||
if err != nil { | ||
|
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -8077,3 +8077,33 @@ func (s) TestUnexpectedEOF(t *testing.T) { | |||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
// TestRecvWhileReturningStatus performs a Recv in a service handler while the | ||||||||
// handler returns its status. A race condition could result in the server | ||||||||
"A race condition could result in the server sending headers twice (once as a result of the status and once as a result of the failed Recv call)." This doesn't seem like the race to me. Isn't the race that, instead of first writing the header that results from the status, the server first writes the header from the failed Recv call without the status, because s.updateHeaderSent is already true? Regardless of ordering, both headers get sent out anyway, hence your comment in the issue: "E.g. to ignore the extra headers on a stream after END_STREAM was already sent".
I'm not sure of the order -- it probably doesn't matter. The failed
Lines 1586 to 1588 in b695a7f
Is there something you think needs to change here?
Yeah, I saw that line when triaging. All I was getting at with this comment was that I think the explanation of the race in the code comment is wrong. The race is that the second WriteStatus call reads updateHeaderSent as true, doesn't attach :status, and then sends its frame on the wire, without the status, BEFORE the first WriteStatus call sends its frame, which is what broke howardjohn. Saying the race is sending headers twice is accurate, but it does not explain why the client broke in this scenario. (And yes, it also breaks HTTP/2, because it sends another frame after END_STREAM.)
Aha, I hadn't completely linked the reason for the failure on the client side, actually, but that makes more sense than my original thought that two
Cool, thanks. Comment LGTM. |
||||||||
// sending headers twice (once as a result of the status and once as a result | ||||||||
// of the failed Recv call). | ||||||||
func (s) TestRecvWhileReturningStatus(t *testing.T) { | ||||||||
ss := &stubserver.StubServer{ | ||||||||
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error { | ||||||||
// The client never sends, so this Recv blocks until the server | ||||||||
// returns and causes stream operations to return errors. | ||||||||
go stream.Recv() | ||||||||
return nil | ||||||||
}, | ||||||||
} | ||||||||
if err := ss.Start(nil); err != nil { | ||||||||
t.Fatalf("Error starting endpoint server: %v", err) | ||||||||
} | ||||||||
defer ss.Stop() | ||||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | ||||||||
Nit: 10 * time.Second
Spaces? We have a pretty healthy existing mix of spaces/no spaces, and
I'm inclined to just leave this.
Yeah, we talked about this offline; it was a habit carried over from the linters at two of my internships. I didn't realize we have both styles for some reason. This is fine. |
||||||||
defer cancel() | ||||||||
for i := 0; i < 100; i++ { | ||||||||
stream, err := ss.Client.FullDuplexCall(ctx) | ||||||||
if err != nil { | ||||||||
t.Fatalf("Error while creating stream: %v", err) | ||||||||
} | ||||||||
if _, err := stream.Recv(); err != io.EOF { | ||||||||
t.Fatalf("stream.Recv() = %v, want io.EOF", err) | ||||||||
} | ||||||||
} | ||||||||
} |
This is orthogonal to the bug fix, right? I get that this fixes a correctness issue with s.updateHeaderSent, but could you provide a concrete example of what this fixes?
We want to hold the lock while calling s.updateHeaderSent and checking/setting s.getState at least. Otherwise it races with WriteStatus doing similar operations. So while this is not exactly the original bug, it's another related race that is possible if trying to send headers while returning from the method handler.
Yeah, makes sense to me. Although you need to hold the mutex for writeHeaderLocked() too, right? (That is what you have currently, but your comment just mentions updateHeaderSent and getState.)
Ah yes, that too.
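To make the locking argument in this thread concrete, here is a minimal sketch of the pattern, not the transport code itself. It assumes a hypothetical `toyStream` type with a `hdrMu` mutex, a `headerSent` flag, and a `done` flag; `writeHeader` and `writeStatus` are illustrative stand-ins for `WriteHeader` and `WriteStatus`, and the frame names are made up. Setting `done` inside `writeStatus` is also a simplification of how the real stream reaches `streamDone`. The point it illustrates is that the state check, the header-sent test-and-set, and the write all happen under one lock, so a concurrent header write can never land after the trailers.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errIllegalHeaderWrite is a stand-in for the transport's ErrIllegalHeaderWrite.
var errIllegalHeaderWrite = errors.New("headers already sent or stream done")

// toyStream is a hypothetical, stripped-down model of a server stream.
// Only the fields relevant to the race are modeled.
type toyStream struct {
	hdrMu      sync.Mutex
	headerSent bool
	done       bool
	frames     []string // frames "on the wire", in send order
}

// writeHeader holds hdrMu across the state check, the headerSent
// test-and-set, and the write, mirroring the shape of the fixed WriteHeader.
func (s *toyStream) writeHeader() error {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()
	if s.done || s.headerSent {
		return errIllegalHeaderWrite
	}
	s.headerSent = true
	s.frames = append(s.frames, "HEADERS")
	return nil
}

// writeStatus holds the same lock for its whole body. If no header frame
// went out yet, it sends a trailers-only response; otherwise plain trailers.
func (s *toyStream) writeStatus() {
	s.hdrMu.Lock()
	defer s.hdrMu.Unlock()
	if s.done {
		return
	}
	if !s.headerSent {
		s.headerSent = true
		s.frames = append(s.frames, "TRAILERS-ONLY")
	} else {
		s.frames = append(s.frames, "TRAILERS")
	}
	s.done = true // simplification: the toy marks the stream done here
}

// runOnce races one header write against one status write and returns
// the resulting frame sequence.
func runOnce() []string {
	s := &toyStream{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); _ = s.writeHeader() }()
	go func() { defer wg.Done(); s.writeStatus() }()
	wg.Wait()
	return s.frames
}

// validOrder reports whether frames is one of the two legal outcomes:
// headers followed by trailers, or a single trailers-only response.
func validOrder(frames []string) bool {
	switch len(frames) {
	case 1:
		return frames[0] == "TRAILERS-ONLY"
	case 2:
		return frames[0] == "HEADERS" && frames[1] == "TRAILERS"
	}
	return false
}

func main() {
	frames := runOnce()
	fmt.Println(frames, validOrder(frames))
}
```

Using `defer` for the unlock keeps every early return safe at the cost of holding the mutex slightly longer, which is the trade-off discussed in the review above.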