-
Notifications
You must be signed in to change notification settings - Fork 344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix logic of command for sendError #622
Fix logic of command for sendError #622
Conversation
Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
case pb.ServerError_TopicTerminatedError: | ||
request, ok := c.deletePendingRequest(requestID) | ||
_, ok := c.deletePendingProducers(producerID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought all commands sent to the broker will have a request id? Do we still need to clean those up from the the pending request queue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In CommandSendError, just only ProducerId
and SequenceId
type CommandSendError struct {
ProducerId *uint64 `protobuf:"varint,1,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
SequenceId *uint64 `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"`
Error *ServerError `protobuf:"varint,3,req,name=error,enum=pulsar.proto.ServerError" json:"error,omitempty"`
Message *string `protobuf:"bytes,4,req,name=message" json:"message,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, we only need to deal with the map of listeners responsible for managing the producer objects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. The thing I'm still confused about is. Each request sent on the connection can get added into the pending request map right?
https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L620
func (c *connection) internalSendRequest(req *request) {
if c.closed() {
c.log.Warnf("internalSendRequest failed for connectionClosed")
if req.callback != nil {
req.callback(req.cmd, ErrConnectionClosed)
}
} else {
c.pendingLock.Lock()
if req.id != nil {
c.pendingReqs[*req.id] = req
}
c.pendingLock.Unlock()
c.writeCommand(req.cmd)
}
}
If a command is sent and gets added to the pending requests map and then we get this response from the broker pb.ServerError_TopicTerminatedError
will we end up leaving/leaking commands in the pending requests map? If there is no request id maybe it can't be avoided. Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, Agree with your ideas.
The first point: Here we really should deal with SendError Command, not ErrorCommand. This should be determined. But the requestID is not included in SendError.
The second point: This requestID should be obtained from the Protobuf protocol. Processing PendingRequest needs to rely on requestID, so now I am also a bit confused. After receiving SendError, what should we do here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, Agree with your ideas.
The first point: Here we really should deal with SendError Command, not ErrorCommand. This should be determined. But the requestID is not included in SendError.
The second point: This requestID should be obtained from the Protobuf protocol. Processing PendingRequest needs to rely on requestID, so now I am also a bit confused. After receiving SendError, what should we do here?
default: | ||
// By default, for transient error, let the reconnection logic | ||
// to take place and re-establish the produce again | ||
c.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we need to close the connection here anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refer to the above ideas, if we need to clean the pendingRequest cache, then we'd better close the connection here. If we only need to clean up the map of the producer of listeners, then here we only trigger the logic of reconnection should be enough. Because of this connection, there may be other producers in use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the java client closes the connection in this case? What does it do for the other cases above like pb.ServerError_TopicTerminatedError
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, will fix this.
return | ||
} | ||
|
||
errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage()) | ||
request.callback(nil, errors.New(errMsg)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the producer still need to be notified somehow?
case pb.ServerError_TopicTerminatedError: | ||
request, ok := c.deletePendingRequest(requestID) | ||
_, ok := c.deletePendingProducers(producerID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. The thing I'm still confused about is. Each request sent on the connection can get added into the pending request map right?
https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L620
func (c *connection) internalSendRequest(req *request) {
if c.closed() {
c.log.Warnf("internalSendRequest failed for connectionClosed")
if req.callback != nil {
req.callback(req.cmd, ErrConnectionClosed)
}
} else {
c.pendingLock.Lock()
if req.id != nil {
c.pendingReqs[*req.id] = req
}
c.pendingLock.Unlock()
c.writeCommand(req.cmd)
}
}
If a command is sent and gets added to the pending requests map and then we get this response from the broker pb.ServerError_TopicTerminatedError
will we end up leaving/leaking commands in the pending requests map? If there is no request id maybe it can't be avoided. Am I missing something?
Signed-off-by: xiaolongran <[email protected]> Fixes apache#623 ### Motivation As apache#623 said, when the topic is deleted forced, we don't should trying to reconnect, instead of giving up reconnection. ### Modifications - Fix prodcuer reconnetion logic - Fix consumer reconnection logic
Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>
As @cckellogg said, the current method may cause the leak of pendingRequest resources, here we first merge the current pull request, and then create a new issue to track the problem here. And the issue is: #636. |
Motivation
As shown in the figure above, the
ServerError
returned by the broker isUnknownError
when the client receives it. In fact, we handled the wrong command here. Here we should deal withCommandSendError
instead ofCommandError
. Correspondingly, we should deal with thelistener
map used to cache the producer instead of the correspondingpendingRequest
map.Modifications