[DefaultRouter] fix unnecessary system clock reads due to races accessing router state #694
Conversation
Force-pushed from 4056ea7 to 4860648.
Indeed, however #684 introduces a mutex on the publish path which has a negative impact on performance, especially under load from multiple concurrent go-routines. A parallel DefaultRouter bench test was added in #693 to quantify the performance impact. For details, see my comment on #684 here: #684 (comment)
pulsar/default_router.go (outdated)

```diff
-	messageCountReached := previousMessageCount >= uint32(maxBatchingMessages-1)
-	sizeReached := (size >= uint32(maxBatchingSize)-previousBatchingMaxSize)
+	messageCountReached := messageCount%uint32(maxBatchingMessages) == 0
```
Should this be messageCount >= uint32(maxBatchingMessages)?
@cckellogg,
The initial approach taken in this commit was to only modify the messageCount in the router state using an atomic increment, in order to avoid races between when the counter is modified (on a new message) and when the counter is reset (when the batch is completed). By doing this, the count is now a running total of all messages that have passed through the router, so we use modulo arithmetic to determine when maxBatchingMessages has been reached.
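For illustration, here is a minimal, self-contained sketch of the running-total idea in Go; the routerState type, its msgCounter field, and the maxBatchingMessages value are hypothetical stand-ins, not the actual default_router.go identifiers:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// routerState is a simplified stand-in for the router's internal state;
// the field name here is illustrative, not the library's actual identifier.
type routerState struct {
	msgCounter uint32 // running total of messages seen by the router
}

const maxBatchingMessages = 1000

func main() {
	var state routerState

	// Each publish atomically increments the running total. Nothing resets
	// the counter, so modulo arithmetic is what tells us that another
	// maxBatchingMessages worth of messages has passed through the router.
	for i := 0; i < 2500; i++ {
		messageCount := atomic.AddUint32(&state.msgCounter, 1)
		messageCountReached := messageCount%uint32(maxBatchingMessages) == 0
		if messageCountReached {
			fmt.Println("batch boundary at running total", messageCount)
		}
	}
}
```

Because nothing ever resets the counter, the increment never races with a reset; the trade-off, noted below, is that the running total carries over across partition switches.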
Upon further reflection, I think the use of a running total may have introduced a regression whereby we switch partitions prematurely. The behaviour we want is documented in the comment on L74:
// ...sticking with a given partition for a certain amount of messages or volume buffered or the max delay to
// batch is reached so that we ensure having a decent amount of batching of the messages.
If the partition switch is triggered by either size or delay, the messages routed to the previous partition will be included in the count of messages being routed to the new partition as the message count is not reset on switch.
I'm thinking that we can safely reset the message count on partition switch. The use of atomic increment on the message count will still effectively latch the if
expression that protects the clock read on L88.
I'll make this change, gather bench results, and report back.
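A minimal sketch of what that latch could look like, assuming a simplified equality check against the threshold and an atomic subtraction standing in for the reset; the names and exact condition are illustrative, not the actual default_router.go code:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Illustrative constant and state; the names are hypothetical, not the
// library's actual identifiers.
const maxBatchingMessages = 8

func main() {
	var (
		msgCounter    uint32 // messages routed to the current partition
		lastBatchTime int64  // unix nanos when the last batch completed
		wg            sync.WaitGroup
	)

	publish := func() {
		defer wg.Done()
		// The atomic increment hands every goroutine a unique count, so exactly
		// one of them observes the value equal to the threshold. Only that
		// goroutine reads the system clock, and the "reset" is an atomic
		// subtraction of a full batch so concurrent increments are not lost.
		count := atomic.AddUint32(&msgCounter, 1)
		if count == uint32(maxBatchingMessages) {
			atomic.StoreInt64(&lastBatchTime, time.Now().UnixNano())
			atomic.AddUint32(&msgCounter, ^uint32(maxBatchingMessages-1)) // subtract maxBatchingMessages
			fmt.Println("partition switch; clock read by a single goroutine")
		}
	}

	for i := 0; i < 64; i++ {
		wg.Add(1)
		go publish()
	}
	wg.Wait()
	fmt.Println("messages counted toward the next batch:", atomic.LoadUint32(&msgCounter))
}
```

Since every goroutine receives a distinct value from the atomic add, only one of them can observe the threshold value, so the clock read and the reset happen once per batch even under heavy concurrency.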
Updated parallel bench results before and after this commit:
name old time/op new time/op delta
DefaultRouterParallel 14.7ns ± 1% 14.9ns ± 6% ~ (p=0.248 n=9+9)
DefaultRouterParallel-2 55.0ns ±13% 33.7ns ± 8% -38.71% (p=0.000 n=10+9)
DefaultRouterParallel-4 53.5ns ± 9% 28.8ns ± 4% -46.13% (p=0.000 n=10+9)
DefaultRouterParallel-8 54.2ns ± 8% 36.3ns ± 1% -33.04% (p=0.000 n=10+8)
DefaultRouterParallel-16 56.4ns ±21% 39.5ns ±21% -29.98% (p=0.000 n=10+10)
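For context on how numbers like these are produced, here is a hypothetical sketch of a parallel router benchmark in the shape of the #693 test; the route helper is a simplified stand-in for the real DefaultRouter, not the client's actual API. Running it with go test -bench=DefaultRouterParallel -cpu=1,2,4,8,16 yields the -2/-4/-8/-16 variants, and benchstat compares the old and new runs.

```go
package router_test

import (
	"sync/atomic"
	"testing"
)

// route is a simplified stand-in for the DefaultRouter's routing decision;
// the real test in #693 benchmarks the actual router.
func route(counter *uint32, numPartitions uint32) uint32 {
	return atomic.AddUint32(counter, 1) % numPartitions
}

func BenchmarkDefaultRouterParallel(b *testing.B) {
	var counter uint32
	// RunParallel drives the body from GOMAXPROCS goroutines, which is what
	// produces the per-CPU variants when run with go test -cpu=1,2,4,8,16.
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			route(&counter, 4)
		}
	})
}
```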
I've made a refinement that uses compare-and-swap so that multiple go-routines racing to increment the partition cursor can no longer cause partitions to be skipped. This has the side-effect that only one of the racing go-routines will read the system clock to record the time the last batch was completed. Results before and after this commit are below:
name old time/op new time/op delta
DefaultRouterParallel 14.7ns ± 1% 18.5ns ± 4% +25.88% (p=0.000 n=9+9)
DefaultRouterParallel-2 55.0ns ±13% 43.5ns ± 2% -20.93% (p=0.000 n=10+8)
DefaultRouterParallel-4 53.5ns ± 9% 40.0ns ±11% -25.29% (p=0.000 n=10+10)
DefaultRouterParallel-8 54.2ns ± 8% 41.9ns ± 5% -22.68% (p=0.000 n=10+10)
DefaultRouterParallel-16 56.4ns ±21% 41.7ns ±22% -25.98% (p=0.000 n=10+10)
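A minimal sketch of the compare-and-swap refinement described above, with hypothetical identifiers rather than the actual default_router.go names:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const numPartitions = 4

func main() {
	var (
		currentPartitionCursor uint32 // index of the partition currently receiving the batch
		lastChangeTimestamp    int64  // unix nanos of the last cursor advance
		wg                     sync.WaitGroup
	)

	// Suppose several publishing goroutines decide, for the same batch, that a
	// partition switch is due; they all observed the same cursor value.
	observed := atomic.LoadUint32(&currentPartitionCursor)

	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Only the goroutine whose CAS succeeds advances the cursor and
			// reads the system clock; the losers back off, so the cursor moves
			// by exactly one per batch and no partition is skipped.
			if atomic.CompareAndSwapUint32(&currentPartitionCursor, observed, observed+1) {
				atomic.StoreInt64(&lastChangeTimestamp, time.Now().UnixNano())
				fmt.Println("cursor advanced and clock read by a single goroutine")
			}
		}()
	}
	wg.Wait()

	cursor := atomic.LoadUint32(&currentPartitionCursor)
	fmt.Println("routing to partition", cursor%uint32(numPartitions))
}
```

Only the CAS winner advances the cursor and records the timestamp, so concurrent switch attempts for the same batch can no longer advance the cursor more than once (which is what was skipping partitions), and only one goroutine pays for the clock read.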
[DefaultRouter] fix unnecessary system clock reads due to races accessing router state

Previously, we used atomic operations to read and update parts of the default router state. Unfortunately, the reads and updates could race under concurrent calls leading to unnecessary clock reads and an associated slowdown in performance.

Now, we use atomic addition to increment the message count and batch size. This removes the race condition by ensuring that each go-routine will have a unique messageCount, and hence only one will perform the clock read. Furthermore, we use atomic compare-and-swap to ensure that partitions are not skipped if multiple go-routines attempt to increment the partition cursor.

Signed-off-by: Daniel Ferstay <[email protected]>
Force-pushed from a659bfb to 1d8a0d3.
LGTM +1
Motivation
Previously, we used atomic operations to read and update parts of the
default router state. Unfortunately, the reads and updates could race
under concurrent calls, which leads to unnecessary clock reads and an
associated slowdown in performance.
Modifications
Now, we use atomic addition to increment the message count and batch size.
This removes the race condition by ensuring that each go-routine will have
a unique messageCount, and hence only one will perform the clock read.
Verifying this change
Run the default router unit tests to verify correctness.
Run the default router parallel bench tests (#693) and verify the performance speedup; the before and after results are shown in the conversation above.
The large variance in the DefaultRouterParallel-2 and DefaultRouterParallel-16 old test results is due to the nature of the race described above, with some test runs reading the system clock more than others.

Does this pull request potentially affect one of the following parts:
Documentation