-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(v2.11) [IMPROVED] stream sourcing: wq/interest stream with limits #5774
base: main
Are you sure you want to change the base?
Conversation
e7b556a
to
25985e7
Compare
clseq uint64 // The current last seq being proposed to the NRG layer. | ||
clfs uint64 // The count (offset) of the number of failed NRG sequences used to compute clseq. | ||
inflight map[uint64]uint64 // Inflight message sizes per clseq. | ||
inflightSubjects map[string]uint64 // Inflight number of messages per subject. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could get pretty big in terms of in memory state..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assumed that because it's only storing in flight messages, that the size would be 0 most of the time and go maybe up to a few thousands temporarily during message bursts, so still acceptable IMHO.
I did hesitate adding the support for new per subject to the PR because I couldn't think of a good use case off the top of my head (doesn't mean one doesn't exist), but it needed to be handled and I thought it would be better to actually implement it rather than try to prevent it (which is not that easy/clean to do because what you want to forbid specifically is discard new+new per subject+sourcing (and you can add/remove sources at any time)).
@@ -2983,6 +2983,17 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco | |||
mset.clMu.Unlock() | |||
} | |||
|
|||
if mset.inflightSubjects != nil { | |||
mset.clMu.Lock() | |||
n := mset.inflightSubjects[subject] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We always tracking these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or only for discard new per subject?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only for discard new + discard new per subject for wq or interest streams (see line#7932)
// Track inflight. | ||
if mset.inflight == nil { | ||
mset.inflight = make(map[uint64]uint64) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove nl?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can not as that's how go fmt wants it
if mset.inflightSubjects == nil { | ||
mset.inflightSubjects = make(map[string]uint64) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove nl
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can not remove newline as go fmt puts it back
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really? It does not for me..
@@ -7944,9 +7964,24 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ | |||
if state.Bytes+bytesPending > uint64(maxBytes) { | |||
err = ErrMaxBytes | |||
} | |||
} else if maxMsgsPer > 0 && discardNewPerSubject { | |||
totals := mset.store.SubjectsTotals(subject) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be very large.. We are getting this each time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only for wq/interest + discard new + discard new per subject streams.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be very slow based on the shape of the stream. This concerns me. As does tracking inflight.
You can have a (typically wq) stream that is sourcing have a discard new policy (so you can limit its size) was working only for the max messages limit, now is also working for the max bytes limit. Signed-off-by: Jean-Noël Moyne <[email protected]>
You can have a wq or interest stream that is sourcing have a discard new policy (so you can limit its size) was working only for the max messages and max bytes limits, now is also working for the max messages per subjects limit when discard new per subject is set. Signed-off-by: Jean-Noël Moyne <[email protected]>
25985e7
to
8221369
Compare
You can have a wq/interest stream that is sourcing have a discard new policy (so you can limit its size) was working only for the max messages limit, now is also working for the max bytes limit and for max messages per subject (and discard new messages per subject)