[Issue 456] Support chunking for big messages. #805
Based on the Java implementation (ConsumerImpl.java), chunking also requires changes on the consumer side so that chunks can be assembled into the original message. Are you going to add support on the consumer side?
The consumer-side implementation is already planned. It will be committed in another PR.
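As a rough illustration of what the consumer side has to do, the sketch below buffers chunks per message UUID and hands back the full payload once the last chunk arrives. All names here (`chunkedMsgCtx`, `chunkAssembler`, `add`) are hypothetical and are not taken from this PR or from ConsumerImpl.java. It also assumes chunks arrive in order and simply discards a message whose chunks do not.

```go
package main

// chunkedMsgCtx buffers the chunks of one large message.
// Hypothetical type for illustration; not the PR's actual struct.
type chunkedMsgCtx struct {
	totalChunks int
	lastChunkID int // index of the last chunk received, -1 if none yet
	payload     []byte
}

// chunkAssembler tracks in-flight chunked messages keyed by their UUID.
type chunkAssembler struct {
	pending map[string]*chunkedMsgCtx
}

func newChunkAssembler() *chunkAssembler {
	return &chunkAssembler{pending: make(map[string]*chunkedMsgCtx)}
}

// add records one chunk. It returns the assembled payload and true once the
// final chunk has arrived, and nil and false otherwise. A chunk that arrives
// out of order causes the whole buffered message to be discarded.
func (a *chunkAssembler) add(uuid string, chunkID, totalChunks int, data []byte) ([]byte, bool) {
	ctx, ok := a.pending[uuid]
	if !ok {
		if chunkID != 0 {
			return nil, false // the first chunk was missed; nothing to assemble
		}
		ctx = &chunkedMsgCtx{totalChunks: totalChunks, lastChunkID: -1}
		a.pending[uuid] = ctx
	}
	if chunkID != ctx.lastChunkID+1 {
		delete(a.pending, uuid)
		return nil, false
	}
	ctx.payload = append(ctx.payload, data...)
	ctx.lastChunkID = chunkID
	if chunkID == ctx.totalChunks-1 {
		delete(a.pending, uuid)
		return ctx.payload, true
	}
	return nil, false
}
```

Calling `add` three times for a three-chunk message returns the concatenated payload on the third call and removes the message from `pending`.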
…g of sequenceID generate; fix the bug where batch messages are compressed twice; add internal error handle, fix inaccurate PublishErrorsMsgTooLarge metric; fix the incorrect producer queueFullBlock
bf39e92
to
0456630
Compare
…ecause it's no need
Hi, the consumer side is implemented here. Looking forward to your review. @zzzming
…atchIDX to pass test. Add the license headers to timewheel.go/timewheel_test.go/message_chunking_test.go
…e. 2. Add the unit test for chunk size. 3. fix some other bugs.
This PR may also fix issue #447.
/pulsarbot run-failure-checks
Your PR seems to include improvements unrelated to chunking. If so, I suggest moving them into a separate PR.
if mid.consumer != nil {
return mid.Ack()
if err := c.checkMsgIDPartition(msgID); err != nil {
The conversion of msgID from MessageID to the trackingMessageID type is missing here. I'm not sure whether we need it. Why not use messageID()? What did I miss?
trackingMessageID does not record chunking information. For example, to Ack() a big message, all of its chunks need to be acked. With trackingMessageID there is no way to figure out which chunks (message IDs) need to be acked.
trackingMessageID is designed for tracking batch messages, so it should not be the message ID type accepted by the methods exposed by partitionConsumer. I think the better approach would be to accept MessageID as the message ID type in the partitionConsumer methods. However, to keep the changes minimal, only the necessary interfaces (Ack, NAck and Seek) have been modified.
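To make the point concrete, here is a minimal sketch under assumed names: `messageID`, `chunkedMessageID` and `ackChunks` are hypothetical stand-ins, not the types used in this PR. The ID handed out for a large message remembers the ID of every chunk, so acking the message can ack each chunk in turn. An ID that only tracks batch state carries no such list.

```go
package main

// messageID is a simplified stand-in for a Pulsar message ID.
type messageID struct {
	ledgerID int64
	entryID  int64
}

// chunkedMessageID identifies a reassembled large message by the IDs of all
// of its chunks. Hypothetical type for illustration only.
type chunkedMessageID struct {
	chunkIDs []messageID
}

// ackChunks calls ack once per chunk, in order, and stops at the first error.
func ackChunks(id chunkedMessageID, ack func(messageID) error) error {
	for _, c := range id.chunkIDs {
		if err := ack(c); err != nil {
			return err
		}
	}
	return nil
}
```

The `ack` callback is passed in so the sketch stays independent of any real consumer type.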
@@ -164,7 +167,11 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
return true
}
msgSize := uint32(len(payload))
return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
expectedSize := bc.buffer.ReadableBytes() + msgSize
Please revert this code (lines 170-174).
L171 compares bc.buffer.ReadableBytes() + msgSize with bc.maxMessageSize, which the original code does not do.
The comparison makes sure that the size of one batch does not exceed maxMessageSize. It is part of correctly calculating whether the message is too large.
By the way, the comparison is too long to be inlined.
Good catch, but inlining it looks clearer to me, like this:
return bc.numMessages+1 <= bc.maxMessages && expectedSize <= uint32(bc.maxBatchSize) && expectedSize <= bc.maxMessageSize
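For reference, a self-contained sketch of the check being discussed. The `batchContainer` here is a simplified stand-in: the `bufferedBytes` field replaces `bc.buffer.ReadableBytes()`, and the empty-batch shortcut is an assumption inferred from the `return true` visible in the diff context.

```go
package main

// batchContainer is a simplified stand-in for the real type.
type batchContainer struct {
	numMessages    uint
	maxMessages    uint
	bufferedBytes  uint32 // stands in for bc.buffer.ReadableBytes()
	maxBatchSize   uint
	maxMessageSize uint32
}

// hasSpace reports whether payload can be appended to the current batch.
func (bc *batchContainer) hasSpace(payload []byte) bool {
	if bc.numMessages == 0 {
		// Assumption: an empty batch always accepts its first message.
		return true
	}
	expectedSize := bc.bufferedBytes + uint32(len(payload))
	// The batch must stay within the message count limit, the batch size
	// limit, and the maximum message size.
	return bc.numMessages+1 <= bc.maxMessages &&
		expectedSize <= uint32(bc.maxBatchSize) &&
		expectedSize <= bc.maxMessageSize
}
```

With 90 bytes already buffered and a `maxMessageSize` of 100, a 10-byte payload fits and an 11-byte payload does not, even though `maxBatchSize` alone would allow it.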
@nodece This PR introduces maxMessageSize, so I think these changes are related to this PR.
Good catch, but inlining it looks clearer to me, like this:
return bc.numMessages+1 <= bc.maxMessages && expectedSize <= uint32(bc.maxBatchSize) && expectedSize <= bc.maxMessageSize
Done, thx.
/pulsarbot run-failure-checks
Does chunking work on shared subscriptions, as introduced in Pulsar 2.11?
Hi @maraiskruger1980, this PR was finished before Pulsar 2.11 was released, so it doesn't support chunking on shared subscriptions. I think I can spend some time on it. You're welcome to follow the progress.
It would be great if it could support shared subscriptions.
Hi @maraiskruger1980. After checking, I found that the consumer imposes no limit when the subscription is
Contribution Checklist
Master Issue: #456
Motivation
Make the Pulsar Go client support chunking to produce and consume big messages. The earlier implementation (#717) didn't take many details into account, so I decided to reimplement it.
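Chunking on the producer side amounts to cutting a payload into pieces no larger than a size limit. A minimal sketch, assuming a hypothetical helper `splitIntoChunks` that is not a function from this PR; per-chunk metadata such as the message UUID, chunk index and total chunk count is left out.

```go
package main

// splitIntoChunks slices payload into chunks of at most chunkSize bytes.
// The chunks share the payload's backing array; nothing is copied.
// It returns nil when chunkSize is not positive.
func splitIntoChunks(payload []byte, chunkSize int) [][]byte {
	if chunkSize <= 0 {
		return nil
	}
	chunks := make([][]byte, 0, (len(payload)+chunkSize-1)/chunkSize)
	for start := 0; start < len(payload); start += chunkSize {
		end := start + chunkSize
		if end > len(payload) {
			end = len(payload)
		}
		chunks = append(chunks, payload[start:end])
	}
	return chunks
}
```

A 10-byte payload with a chunk size of 4 yields three chunks of 4, 4 and 2 bytes.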
Modifications
- Add internalSingleSend to send a message without batching, because batch messages are not delivered in chunks.
- Move the BlockIfQueueFull check from internalSendAsync to internalSend (canAddQueue) to ensure normal blocking when chunking.
- Add chunkedMsgCtxMap to store the metadata and data of chunked messages.
Verifying this change
This change added tests and can be verified as follows:
- Added TestProducerChunking to verify sending big messages by chunking.
- Added message_chunking_test to verify message chunking.
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Documentation