Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [issue 1051]: Fix inaccurate producer mem limit in chunking and schema #1055

Merged
merged 5 commits into from
Jul 20, 2023

Conversation

Gleiphir2769
Copy link
Contributor

@Gleiphir2769 Gleiphir2769 commented Jul 11, 2023

Fixes #1051

Motivation

The producer memory limit have some problem when EnableChunking=true or Schema is set.

  • When Schema is set, the actual message payload is msg.Value. The len(msg.Payload) may be 0 and memory can not be reserved acurate.

    uncompressedPayload := msg.Payload
    uncompressedPayloadSize := int64(len(uncompressedPayload))
    var schemaPayload []byte
    var err error
    if msg.Value != nil && msg.Payload != nil {
    p.log.Error("Can not set Value and Payload both")
    runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
    return
    }
    // The block chan must be closed when returned with exception
    defer request.stopBlock()
    if !p.canAddToQueue(request, uncompressedPayloadSize) {
    return
    }

  • In chunking, if producer meets the memory limit, it should release the memory for chunks which has send out. But the calculate for this release is not accurate, it should be uncompressedPayloadSize - int64(lhs) instead of uncompressedPayloadSize - int64(rhs)

    if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
    p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs))
    return

  • In chunking, if internalSingleSend is failed, it should release the memory for single chunk. But we release all the chunks memory repeatly now.

    if err != nil {
    runCallback(request.callback, nil, request.msg, err)
    p.releaseSemaphoreAndMem(int64(len(msg.Payload)))
    p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
    return
    }

  • When producer received the receipt from broker, it should release the memory it reserved before sending. But it releases wrong size in chunking and schema.

    if sr.msg != nil {
    atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
    p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
    p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
    p.metrics.MessagesPublished.Inc()
    p.metrics.MessagesPending.Dec()
    payloadSize := float64(len(sr.msg.Payload))
    p.metrics.BytesPublished.Add(payloadSize)
    p.metrics.BytesPending.Sub(payloadSize)
    }

Modifications

  • Fix all the memory limit problems relative to chunking and schema
  • Add unit tests to cover these scenarios

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@Gleiphir2769 Gleiphir2769 marked this pull request as draft July 11, 2023 15:35
@gunli
Copy link
Contributor

gunli commented Jul 12, 2023

@Gleiphir2769 Great job, actaully I am preparing to fix this issue and #1043 and refactor the callback/releaseSemaphore/releaseMemory/metrics together, it will be a BIG PR, I am busy these days, if you have time, you can do it together.

My idea is:

  1. Calculte the required resouce (semaphore/memory, when chunking, more than 1 semaphores, we cache the compressedPayload/meta in the sendRequest when Calculting) before we put a request into the dataChan, if there is no enough resource, fail fast, in this way, we can delete the sendRequest.blockCh field and no need to block;
  2. Add a sendRequest.done() method, when a request is done (succeed or failed), call it, release the resources a request holds, run the callback, report metrics, write debug logs in this method, in this way, we manage the resource/logic together and don't have to do these things across the whole file.

@Gleiphir2769
Copy link
Contributor Author

Gleiphir2769 commented Jul 12, 2023

Hi @gunli.

Calculte the required resouce before we put a request into the dataChan

Before chunking introduction, the semaphore is required before dataChan. I moved it from internalSendAsync to internalSend because chunking need to get maxMessageSize by asking broker.

maxMessageSize := int(p._getConn().GetMaxMessageSize())
// compress payload if not batching

If we can get it brefore internalSend, we can make a easier way to reserve resouce.

Add a sendRequest.done() method, when a request is done (succeed or failed), call it

Sounds great. It's a bit difficult to understand sendRequest.callback() now.

And I think we can fix these bugs firstly. Refactoring work can be done in parallel. What do you think?

@gunli
Copy link
Contributor

gunli commented Jul 12, 2023

Hi @Gleiphir2769

If we can get it brefore internalSend, we can make a easier way to reserve resouce.

I think it is not a problem, you can check the code of connectionPool.GetConnection()/conn.waitUntilReady()/conn.doHandshake, when we get a conn from connpool, the conn is ready, and MaxMessageSize is cached in the connection. And in newPartitionProducer, when a producer is newed, the conn is ready, it is safe to get the conn, so GetMaxMessageSize() is safe too.

It's a bit difficult to understand sendRequest.callback() now

sendRequest.callback is just a field to store the callback function, in sendRequest.done(), we call this callback, something like this

func (sr *sendRequest) done(id *MessageID, err error) {
	if (sr.semaphore){
            sr.semaphore.Release()   
        }
        sr.memLimit.Release()
        runcallback(sr.callback, id, err)
        metrics.IncXXX()
        log.Debug/Error
       ...
}

In any other logic where the request is done, we just call request.done(), no need to care about resources/callback/metrics/debug logs, the code will be more clear.

And I think we can fix these bugs firstly. Refactoring work can be done in parallel. What do you think?

I agree with that.

@Gleiphir2769 Gleiphir2769 marked this pull request as ready for review July 12, 2023 17:11
@Gleiphir2769
Copy link
Contributor Author

Gleiphir2769 commented Jul 12, 2023

Could you give a review? @gunli @RobertIndie @shibd

Please feel free to leave your comment, thanks!

@@ -489,14 +488,13 @@ func (p *partitionProducer) internalSend(request *sendRequest) {

// The block chan must be closed when returned with exception
defer request.stopBlock()
if !p.canAddToQueue(request, uncompressedPayloadSize) {
if !p.canAddToQueue(request) {
Copy link
Contributor

@gunli gunli Jul 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forget to runCallback, it is better to add a debug log

Copy link
Contributor Author

@Gleiphir2769 Gleiphir2769 Jul 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

runCallback is invoked by canReserveMem if it's failed.

@@ -542,6 +538,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) {

uncompressedSize := len(uncompressedPayload)

// try to reserve memory for uncompressedPayload
if !p.canReserveMem(request, int64(uncompressedSize)) {
return
Copy link
Contributor

@gunli gunli Jul 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forget to release semaphore and runCallback, it is better to add a debug log

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semaphore is released by canReserveMem if it failed, runCallback as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I think Semaphore released by canReserveMem may not be a good idea. It makes canReserveMem must be invoked after canAddToQueue . What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, runCallback/releaseSemaphore/releaseMemory in canAddToQueue and canReserveMem violates the Single Responsibility Principle

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The final solution is encapsulating the resource releasing logic in request.done(), anywhere there is an error, just call request.done()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm +1 for moving the semaphore release out of canReserveMem. It's better that we release it here than in the canReserveMem before we find a good solution for it.

@gunli
Copy link
Contributor

gunli commented Jul 13, 2023

I think is better to merge #1052 first, and then this PR is better to rebase after that, or it will confilct. @Gleiphir2769 @RobertIndie @shibd

Copy link
Member

@RobertIndie RobertIndie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work! Overall looks good to me. Left some comments.

pulsar/producer_partition.go Outdated Show resolved Hide resolved
assert.Error(t, err)
assert.ErrorContains(t, err, getResultStr(ClientMemoryBufferIsFull))

// wait all the chunks have been released
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better that we add producer.flush before retryAssert?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of DisableBatching=true, producer.flush here is useless and cause panic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The producer.flush is also useful when disabling the batching and using sendAsync. But I just find that we already send a message synchronously at line 2047. So we don't need flush the producer here.

and cause panic.

Why does it panic? Seems like an unexpected behavior?

@RobertIndie
Copy link
Member

Calculte the required resouce (semaphore/memory, when chunking, more than 1 semaphores, we cache the compressedPayload/meta in the sendRequest when Calculting) before we put a request into the dataChan, if there is no enough resource, fail fast, in this way, we can delete the sendRequest.blockCh field and no need to block;

When enabling the chunking, we cannot get the number of total chunks before pushing the request to the dataChan. And there may be a deadlock issue similar to apache/pulsar#17446

Add a sendRequest.done() method, when a request is done (succeed or failed), call it, release the resources a request holds, run the callback, report metrics, write debug logs in this method, in this way, we manage the resource/logic together and don't have to do these things across the whole file.

+1 for this. It's a good practice to manage the resource.

@Gleiphir2769
Copy link
Contributor Author

Gleiphir2769 commented Jul 14, 2023

Please rerun workflow for this pr, thx! cc @RobertIndie

@gunli
Copy link
Contributor

gunli commented Jul 14, 2023

When enabling the chunking, we cannot get the number of total chunks before pushing the request to the dataChan. And there may be a deadlock issue similar to apache/pulsar#17446

@RobertIndie I think it is possible to do that, the trade off is we have to call sheame.Encode() and p.compressionProvider.Compress() before entering the dataChan, that will affect performance of the go routine of the application. Or, add another channel and go routine to the producer (not partitionProducer) to do these preparing work.

@gunli
Copy link
Contributor

gunli commented Jul 14, 2023

@RobertIndie Would you please review #1049 , if it is OK, please merge it, this PR should rebase and update after that PR.

@RobertIndie
Copy link
Member

I think it is possible to do that, the trade off is we have to call sheame.Encode() and p.compressionProvider.Compress() before entering the dataChan, that will affect performance of the go routine of the application. Or, add another channel and go routine to the producer (not partitionProducer) to do these preparing work.

@gunli Yes. That would be a performance issue. But if we introduce another channel, we still need to wait for the channel and block the user goroutine. And it also introduces more complexity.

@gunli
Copy link
Contributor

gunli commented Jul 14, 2023

Yes. That would be a performance issue.

@RobertIndie I checked the code of compress Provider, there is a CompressMaxSize method in it, so I think compressing can be avoided, I know little about schema, if all the schemas can provide a method like that, all will be perfectly done.

And I checked the code of java client ProducerImpl.sendAsync(), it seems schema encoding and compressing are done in the user's/application's thread.

@Gleiphir2769
Copy link
Contributor Author

Ping @RobertIndie

@RobertIndie
Copy link
Member

And I checked the code of java client ProducerImpl.sendAsync(), it seems schema encoding and compressing are done in the user's/application's thread.

Thanks. @gunli I also found a bug related to this: #1057 The initial idea I came up with is to have the operation of pushing a message to the producer queue happen in the user thread. Just like the Java client did. Let's move this discussion into that issue(or a new issue if it's not related).

@Gleiphir2769 Gleiphir2769 force-pushed the fix_producer_mem_limit branch from e9eb93b to cf128d4 Compare July 18, 2023 11:21
@Gleiphir2769
Copy link
Contributor Author

I have rebased this branch to master. Please rerun the workflow. Thx! @RobertIndie

@gunli
Copy link
Contributor

gunli commented Jul 19, 2023

@RobertIndie would you please merge the latest PRs #1051 #1057 #1059 , we are eager to do the left refactoring work after these PRs :)

@RobertIndie RobertIndie merged commit 28f61d2 into apache:master Jul 20, 2023
RobertIndie pushed a commit that referenced this pull request Sep 7, 2023
… schema (#1055)

### Motivation

The producer memory limit have some problem when `EnableChunking=true` or `Schema` is set.
- When `Schema` is set, the actual message payload is `msg.Value`. The `len(msg.Payload)` may be 0 and memory can not be reserved acurate.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L479-L494

- In chunking, if producer meets the memory limit, it should release the memory for **chunks which has send out**. But the calculate for this release is not accurate, it should be `uncompressedPayloadSize - int64(lhs)` instead of `uncompressedPayloadSize - int64(rhs)`
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L662-L664

- In chunking, if `internalSingleSend` is failed, it should release the memory for **single chunk**. But we release all the chunks memory repeatly now.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L838-L843

- When producer received the receipt from broker, it should release the memory **it reserved before sending**. But it releases wrong size in `chunking` and `schema`.
https://github.com/apache/pulsar-client-go/blob/be3574019383ac0cdc65fec63e422fcfd6c82e4b/pulsar/producer_partition.go#L1221-L1230

### Modifications

- Fix all the memory limit problems relative to `chunking` and `schema`
- Add unit tests to cover these scenarios

---------

Co-authored-by: shenjiaqi.2769 <[email protected]>
(cherry picked from commit 28f61d2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug][Producer] Inaccurate producer memory limit issue in chunking and schema
3 participants