Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support nack backoff policy for SDK #660

Merged
merged 12 commits into from
Nov 8, 2021

Conversation

wolfstudy
Copy link
Member

@wolfstudy wolfstudy commented Nov 3, 2021

Signed-off-by: xiaolongran [email protected]

Master Issue: #658

Motivation

Support nack backoff policy

Modifications

  • Add EnableDefaultNackBackoffPolicy and NackBackoffPolicy options in ConsumerOptions.
  • Modify the original implementation of the Nack(Message) interface
  • Add test case
  • Add defaultNackBackoffPolicy

@wolfstudy wolfstudy self-assigned this Nov 3, 2021
@wolfstudy wolfstudy changed the title [WIP]Support nack backoff policy for SDK Support nack backoff policy for SDK Nov 4, 2021
@wolfstudy wolfstudy modified the milestones: 0.7.0, v0.8.0 Nov 4, 2021
Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>
@wolfstudy
Copy link
Member Author

==================
WARNING: DATA RACE
Read at 0x00c000158578 by goroutine 447:
  github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).track()
      /pulsar-client-go/pulsar/negative_acks_tracker.go:142 +0x7dc

Previous write at 0x00c000158578 by goroutine 291:
  github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).AddMessage()
      /pulsar-client-go/pulsar/negative_acks_tracker.go:103 +0x15e
  github.com/apache/pulsar-client-go/pulsar.TestNackBackoffTracker()
      /pulsar-client-go/pulsar/negative_acks_tracker_test.go:157 +0x124
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 447 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).AddMessage()
      /pulsar-client-go/pulsar/negative_acks_tracker.go:108 +0x4a4
  github.com/apache/pulsar-client-go/pulsar.TestNackBackoffTracker()
      /pulsar-client-go/pulsar/negative_acks_tracker_test.go:156 +0xeb
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 291 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:1168 +0x5bb
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1439 +0xa6
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1437 +0x612
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1345 +0x3b3
  main.main()
      _testmain.go:467 +0x356
==================
==================
WARNING: DATA RACE
Read at 0x00c000a1ff40 by goroutine 447:
  github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).track()
      /pulsar-client-go/pulsar/negative_acks_tracker.go:142 +0x7fb

Previous write at 0x00c000a1ff40 by goroutine 291:
  time.NewTicker()
      /usr/local/go/src/time/tick.go:30 +0x9a
  github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).AddMessage()
      /pulsar-client-go/pulsar/negative_acks_tracker.go:103 +0x134
  github.com/apache/pulsar-client-go/pulsar.TestNackBackoffTracker()
      /pulsar-client-go/pulsar/negative_acks_tracker_test.go:157 +0x124
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 447 (running) created at:
  github.com/apache/pulsar-client-go/pulsar.(*negativeAcksTracker).AddMessage()
      /pulsar-client-go/pulsar/negative_acks_tracker.go:108 +0x4a4
  github.com/apache/pulsar-client-go/pulsar.TestNackBackoffTracker()
      /pulsar-client-go/pulsar/negative_acks_tracker_test.go:156 +0xeb
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202

Goroutine 291 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:1168 +0x5bb
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1439 +0xa6
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1123 +0x202
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1437 +0x612
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1345 +0x3b3
  main.main()
      _testmain.go:467 +0x356
==================
--- FAIL: TestNackBackoffTracker (0.40s)

Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>

// If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of
// nack backoff, Default: false.
EnableDefaultNackBackoffPolicy bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed EnableDefaultNackBackoffPolicy?. If the NackBackoffPolicy is not supplied we can just the default?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no EnableDefaultNackBackoffPolicy, it will invade the existing code logic. When the NackBackoffPolicy policy is empty, suppose we use the default NackBackoffPolicy, then when the user uses the Nack(Message) interface, the new implementation will be used.

Copy link
Contributor

@cckellogg cckellogg Nov 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me a cleaner API is to just have NackBackoffPolicy and expose the basic/default policy. If the policy is not set than it uses the current behavior. This way there is only 1 configuration knob to worry about.

// current behavior
ConsumerOpts{}

// custom behavior
ConsumerOpts{
  NackBackoffPolicy: pulsar.NewExpNackBackoffPolicy(),
}

type defaultNackBackoffPolicy struct{}

func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
minNackTimeMs := int64(1000 * 10) // 10sec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the time.Duration constants

10 * time.Seconds
10 * time.Minutes

Copy link
Member Author

@wolfstudy wolfstudy Nov 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the << operation is required to cooperate with redeliveryCount, the unit conversion is still required, which will be converted to time.Duration in subsequent use.

return minNackTimeMs
}

return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments to what this logic is doing. For me it's difficult to look and just understand it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will add comment for this change

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we will first get the redeliveryCount object from the CommandMessage, and then start the << operation from minNackTimeMs to calculate the current length of time that nack needs to be executed, and then compare it with maxNackTimeMs, and take their maximum value as the nack duration. nack will increase from minNackTimeMs to maxNackTimeMs according to the above rule

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit let's make these constants

minNackTimeMs := int64(1000 * 30) // 30sec
maxNackTimeMs := 1000 * 60 * 10   // 10min

and please add some comments about how this int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs))) works

}
}
}

func (t *negativeAcksTracker) Close() {
// allow Close() to be invoked multiple times by consumer_partition to avoid panic
t.doneOnce.Do(func() {
t.tick.Stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the ticker getting cleanup now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current implementation situation, if we use the t.ticker in the struct, there will be a data race, so now we use the temporary variables of the ticker, and there is no good way to see how to close the temporarily created ticker.

func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
nackBackoffPolicy NackBackoffPolicy, logger log.Logger) *negativeAcksTracker {

t := new(negativeAcksTracker)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can just be var t *negativeAcksTracker

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are the same effect

// the CommandMessage, so for the original default Nack() logic, we still keep the negativeAcksTracker created
// when we open a gorutine to execute the logic of `t.track()`. But for the NackBackoffPolicy method, we need
// to execute the logic of `t.track()` when AddMessage().
if nackBackoffPolicy != nil {
Copy link
Contributor

@cckellogg cckellogg Nov 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused on why we need an if statement. Shouldn't the default Implementation of the NackBackoffPolicy be what the current behavior is? The benefit of the interface is to simply the code and delegate to the implementation.

bp := nackBackoffPolicy
if bp == nil {
  bp = newDefaultBackoffPolicy(delay)
}
t = &negativeAcksTracker{
			doneCh:       make(chan interface{}),
			negativeAcks: make(map[messageID]time.Time),
			nackBackoff:  bp,
			rc:           rc,
			log:          logger,
		}

Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, agree with your point of view. The problem here is because, for nackbackoff, we can't directly get the corresponding nackDelayTime, we need to get the redeliveryCount through the CommandMessage and then calculate the nackDelayTime, then we can determine the time.NewTicker based on the nackDelayTime. It is precisely because of such a relationship that the if statement is added

@@ -76,14 +95,48 @@ func (t *negativeAcksTracker) Add(msgID messageID) {
t.negativeAcks[batchMsgID] = targetTime
}

func (t *negativeAcksTracker) track() {
func (t *negativeAcksTracker) AddMessage(msg Message) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there a new method here?

Also, it looks like state is changing here without a lock. If multiple go routines call this at once multiple tracking routines could be started right?

Can the tracking go routine just be started at creation time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we need to get redeliveryCount through the Message interface

Signed-off-by: xiaolongran <[email protected]>
Signed-off-by: xiaolongran <[email protected]>
@wolfstudy wolfstudy merged commit 567263f into apache:master Nov 8, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants