Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat]: Support auto scaled consumer receiver queue #976

Merged
merged 7 commits into from
Mar 15, 2023

Conversation

Gleiphir2769
Copy link
Contributor

@Gleiphir2769 Gleiphir2769 commented Mar 3, 2023

Master Issue: #927

Motivation

Note: This is part of the work for PIP 74 in go client.

The memory limit in consumer side relys on flow control. So we need to firstly support auto scaled consumer receiver queue before we implement the consumer side memory limitation.

Why should we support auto scaled consumer first before supporting consumer client memory limit?

The main memory consumption on the consumer side comes from the messages that the consumer prefetches from the broker. Currently, the number of prefetched messages depends on the "current available permits" which represents the messages that have been delivered to the user. When the availablePermits reach a certain threshold, the client will send a Flow request to the broker to prefetch messages. The current threshold is half of the "receiverQueueSize" (default 1000).

flowThreshold := int32(math.Max(float64(p.pc.queueSize/2), 1))
if ap >= flowThreshold {
availablePermits := ap
requestedPermits := ap
// check if permits changed
if !atomic.CompareAndSwapInt32(&p.permits, ap, 0) {
return
}
p.pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
if err := p.pc.internalFlow(uint32(requestedPermits)); err != nil {
p.pc.log.WithError(err).Error("unable to send permits")
}
}

If we do not support auto scaled consumer receiver queue, the Flow threshold will be fixed. We cannot dynamically adjust the number of prefetched messages, resulting in the inability to control consumer memory. So auto scaled consumer receiver queue needs to be supported first.

How does "auto scaled consumer receiver queue" work?

In consumer side, all controls related to message pulling are actually controlling the Flow. For example, receiverQueueSize controls the Flow by controlling the size of the receiver queue. ReceiverQueue queueCh can receive more messages than receiverQueueSize because queueCh is a chan []*message.

queueCh: make(chan []*message, options.receiverQueueSize),

If we want to control the Flow threshold, we should make the queueSize can be auto scaled. There are mainly two problems to be solved.

  • Under what condition is the queue considered to need to be scaled?

When the consumption rate required by the user is greater than the configured rate, we increase the configured rate. This condition can be expressed as, before triggering a new Flow to obtain new messages, the number of messages that the current consumer has received (i.e., current available permits + current messages in queueCh) is greater than the current queue size.

  • When is the timing to scale receiver queue size?

In the Java client, expectMoreIncomingMessages is triggered by "empty reveive".

https://github.com/apache/pulsar/blob/da30b2e211d0a097abfa8d0392f1fd4b13a46328/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L478-L482

In the Go Client, we can not get the "empty reveive" because the messageCh is exposed to user to async consume. So we expectMoreIncomingMessages when messageCh is not full.

case messageCh <- nextMessage:
// allow this message to be garbage collected
messages[0] = nil
messages = messages[1:]
pc.availablePermits.inc()

More details in #927 .

Modifications

  • Add currentQueueSize and scaleReceiverQueueHint to control Flow rate.
  • Add AutoScaledReceiverQueueSize option for Consumer

Verifying this change

  • Make sure that the change passes the CI checks.

Here is a test with pulsar-perf on local standalone server to confirm the effect of this PR.

Non-partitioned topic consumer

$  ./pulsar-perf produce my-test -r 1000 -s 128 -b 0 
$  ./pulsar-perf consume my-test -r 10000 --enable-auto-scaled-queue-size=true
RATE Final receiver queue size
1 1
5 2
10 2
100 8
1000 64
10000 1000

3-partitioned topic consumer

$  ./pulsar-perf produce my-test-partitioned -r 1000 -s 128 -b 0 
$  ./pulsar-perf consume my-test-partitioned -r 10000 --enable-auto-scaled-queue-size=true
RATE Final total receiver queue size Sub-consumers receiver queue size
1 4 2,1,1
10 8 4,2,2
100 16 4,8,4
1000 192 64,64,64
10000 1536 512,512,512
20000 3000 1000,1000,1000

Copy link
Member

@shibd shibd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! After a preliminary look, I left some comments.

I see that the unit tests are commented on, and you can finish them up. Can you do a benchmark? Just like this one: apache/pulsar#14494 (comment)

pulsar/consumer_partition.go Outdated Show resolved Hide resolved
pulsar/consumer_partition.go Outdated Show resolved Hide resolved
pulsar/consumer_partition.go Outdated Show resolved Hide resolved
@Gleiphir2769 Gleiphir2769 changed the title [feat]: Support auto scaled consumer receiver queue [feat]: Draft: Support auto scaled consumer receiver queue Mar 7, 2023
@Gleiphir2769 Gleiphir2769 changed the title [feat]: Draft: Support auto scaled consumer receiver queue Draft: [feat]: Support auto scaled consumer receiver queue Mar 7, 2023
@shibd shibd marked this pull request as draft March 7, 2023 03:07
@Gleiphir2769 Gleiphir2769 changed the title Draft: [feat]: Support auto scaled consumer receiver queue [feat]: Support auto scaled consumer receiver queue Mar 10, 2023
@shibd shibd marked this pull request as ready for review March 10, 2023 11:40
@Gleiphir2769 Gleiphir2769 force-pushed the auto_scaled_consumer branch from 4a2ebb0 to 57a1340 Compare March 14, 2023 01:30
pulsar/consumer_partition.go Outdated Show resolved Hide resolved
pulsar/consumer_test.go Outdated Show resolved Hide resolved
pulsar/consumer_test.go Outdated Show resolved Hide resolved
@shibd shibd merged commit b8563cd into apache:master Mar 15, 2023
@RobertIndie RobertIndie added this to the v0.10.0 milestone Mar 27, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants