Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Support memory limit for producer. #955

Merged
merged 5 commits into from
Mar 1, 2023

Conversation

shibd
Copy link
Member

@shibd shibd commented Feb 15, 2023

Master Issue: #927

Motivation

#927

Modifications

  • Added channel_cond class to enhance sync.cond to support wait that can accept context.
  • Added the memory_limit_controller class.
  • Producer adds memory limit relate logic.

Verifying this change

  • Added channel_cond_test to cover channel_cond
  • Added memory_limit_controller_test to cover mem_mory_limit_controller
  • Added TestMemLimitRejectProducerMessages and TestMemLimitContextCancel to cover producer relate logic.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (yes)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (GoDocs)

@shibd shibd self-assigned this Feb 15, 2023
@shibd shibd force-pushed the memlimit_producer branch 2 times, most recently from e152655 to 5da7a0e Compare February 21, 2023 09:19
@shibd shibd added this to the v0.10.0 milestone Feb 21, 2023
pulsar/internal/memory_limit_controller.go Outdated Show resolved Hide resolved
pulsar/internal/channel_cond_test.go Show resolved Hide resolved
pulsar/internal/memory_limit_controller_test.go Outdated Show resolved Hide resolved
pulsar/producer_partition.go Outdated Show resolved Hide resolved
pulsar/producer_test.go Outdated Show resolved Hide resolved
@shibd shibd force-pushed the memlimit_producer branch from 5da7a0e to a9823e5 Compare February 28, 2023 10:15
pulsar/internal/channel_cond.go Outdated Show resolved Hide resolved
Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I don't get why do we have to wrap a conditional variable? We can achieve the same goal with the channel itself, or just reuse the Semaphore here

@shibd
Copy link
Member Author

shibd commented Mar 1, 2023

And I don't get why do we have to wrap a conditional variable?

The native cond does not support timeout wait, and mainly wants to control the timeout or cancel the wait through context.

https://github.com/apache/pulsar-client-go/pull/955/files#diff-d4362cb0b0046577b68820bba2bde37b97ca5d7db7237e4e4b5eac54d7aa8bc3R89-R120

We can achieve the same goal with the channel itself,

Do you want mem_controller_limit all the methods into the channel first, and then select to read the signal and process it? Similar to #957 ackGroupingTracker ?

or just reuse the Semaphore here

This semaphore can only support acquire or release 1 signal.

@BewareMyPower
Copy link
Contributor

BewareMyPower commented Mar 1, 2023

This semaphore can only support acquire or release 1 signal.

Yes. But we can adopt a similar way to handle this case.

For example, if we add a ch chan bool field to memoryLimitController, we can implement blocking ReserveMemory method like:

func (m *memoryLimitController) ReserveMemory(ctx context.Context, size int64) bool {
        // NOTE: maybe we need to check if m.currentUsage > m.limit first
        currentUsage := atomic.AddInt64(&m.currentUsage, size)
        for currentUsage > m.limit {
                select {
                case <-m.ch:
                        currentUsage = atomic.LoadInt64(&m.currentUsage)
                case <-ctx.Done(): // NOTE: Not sure if we need to reset some fields here
                        return false
                }
        }
        return true
}

func (m *memoryLimitController) ReleaseMemory(size int64) {
        newUsage := atomic.AddInt64(&m.currentUsage, -size)
        // newUsage+size > m.limit means m was blocked in ReserveMemory method
        if newUsage+size > m.limit && newUsage <= m.limit {
                m.ch <- true
        }
}

The code above is not verified yet. But with the channel the code looks more simple and clear.

@shibd
Copy link
Member Author

shibd commented Mar 1, 2023

This semaphore can only support acquire or release 1 signal.

Yes. But we can adopt a similar way to handle this case.

For example, if we add a ch chan bool field to memoryLimitController, we can implement blocking ReserveMemory method like:

func (m *memoryLimitController) ReserveMemory(ctx context.Context, size int64) bool {
        // NOTE: maybe we need to check if m.currentUsage > m.limit first
        currentUsage := atomic.AddInt64(&m.currentUsage, size)
        for currentUsage > m.limit {
                select {
                case <-m.ch:
                        currentUsage = atomic.LoadInt64(&m.currentUsage)
                case <-ctx.Done(): // NOTE: Not sure if we need to reset some fields here
                        return false
                }
        }
        return true
}

func (m *memoryLimitController) ReleaseMemory(size int64) {
        newUsage := atomic.AddInt64(&m.currentUsage, -size)
        // newUsage+size > m.limit means m was blocked in ReserveMemory method
        if newUsage+size > m.limit && newUsage <= m.limit {
                m.ch <- true
        }
}

The code above is not verified yet. But with the channel the code looks more simple and clear.

This implementation has a problem, and the broadcasting cannot be implemented using channels.

For example:

  1. currentUsage = 100, limit = 100
  2. goroutine 1 call ReseveMemory(10), it will blocked.
  3. goroutine 2 call ReseveMemory(10), it will blocked.
  4. goroutine 3 call ReleaseMemory(20), it only wake one goroutine(1 or 2). The expectation is that both are woken up and return true.

// NOTE: maybe we need to check if m.currentUsage > m.limit first

Like this note. If we want to handle this case, we may need to introduce a variable waitNum. So I feel like it's complicated to implement mem_controller_limit, and it's better to introduce channel_cond.

@BewareMyPower
Copy link
Contributor

If we want to handle this case, we may need to introduce a variable waitNum

I see. Currently let's go head with the conditional variable, but don't expose this struct to users.

@RobertIndie
Copy link
Member

Like this note. If we want to handle this case, we may need to introduce a variable waitNum. So I feel like it's complicated to implement mem_controller_limit, and it's better to introduce channel_cond.

@shibd For this case, another workaround is to close the channel instead of sending value. But it's actually what this PR does.

@shibd shibd force-pushed the memlimit_producer branch from a9823e5 to a0448be Compare March 1, 2023 06:28
@shibd shibd merged commit 5fa431d into apache:master Mar 1, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants