-
Notifications
You must be signed in to change notification settings - Fork 344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Exposing broker metadata #745
Conversation
6471877
to
d6d871d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Shoothzj work for this , just a little comments, please check
|
||
var messageIndex *uint64 | ||
var brokerPublishTime *time.Time | ||
if brokerMetadata != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like we've checked above that the brokerMetadata object is nil by doing the following:
brokerMetadata, err := reader.ReadBrokerMetadata()
if err != nil { // here
// todo optimize use more appropriate error codes
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError)
return err
}
Do we still need to double check here? Will someone concurrently modify the value of the brokerMetadata object?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
brokermetadata
can be null if broker not enable this feature
@@ -597,7 +602,18 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header | |||
pc.AckID(msgID) | |||
continue | |||
} | |||
|
|||
var messageIndex *uint64 | |||
var brokerPublishTime *time.Time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe instead of using pointers we can use the following definition:
var messageIndex uint64
var brokerPublishTime time.Time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wolfstudy It can be null if broker not enable brokerMetadata
feature
magicNumber := binary.BigEndian.Uint16(r.buffer.Get(r.buffer.ReaderIndex(), 2)) | ||
if magicNumber != magicBrokerEntryMetadata { | ||
return nil, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the error object returned as nil here? If we don't need this error field, maybe we can cancel it in function definition? Or we can use errors.new("xxx")
to customize the error message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not an error, it means broker not enable brokermetadata
feature
678fd68
to
ab51f2d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM +1
### Motivation pulsar-client-cpp doesn't support parse broker metadata. Which will error whiling connect to enabledbrokerMetada pulsar. This PR makes pulsar-client-cpp can consume messages well, but haven't exposing index interface yet. See also apache/pulsar-client-go#745 https://github.com/apache/pulsar/blob/e38091044c428af002b16110531497e2abc897d2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1289 ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc-required` (Your PR needs to update docs and you will update later) - [x] `doc-not-needed` (Please explain why) - [ ] `doc` (Your PR contains doc changes) - [ ] `doc-complete` (Docs have been already added)
Motivation
Adapt PIP 90 in go client
allow go client retrieve brokermetadata
Modifications
CommandConnect
featureflags, turnSupportsBrokerEntryMetadata
to trueindex()
brokerPublishTime
methods onmessage
interfaceSkip
method on bytebuffer