Skip to content

Commit

Permalink
fix: avoid publishing watermarks for duplicate messages. (#1649)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Apr 8, 2024
1 parent 5f33361 commit 75195d5
Show file tree
Hide file tree
Showing 14 changed files with 226 additions and 161 deletions.
13 changes: 9 additions & 4 deletions pkg/isb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ package isb

import "fmt"

// Error message texts shared by the ISB writers. They are constants so that
// importing packages cannot reassign them at runtime.
const (
	// BufferFullMessage is the message carried by write errors that are
	// returned when a buffer is full.
	BufferFullMessage = "Buffer full!"
	// DuplicateIDMessage is the message carried by the non-retryable write
	// error that is returned when a publish is acknowledged as a duplicate.
	DuplicateIDMessage = "Duplicate ID!"
)

// MessageWriteErr is associated with message write errors.
type MessageWriteErr struct {
Name string
Expand Down Expand Up @@ -97,12 +102,12 @@ func (e MessageReadErr) Error() string {
return fmt.Sprintf("(%s) %s Header: %s Body:%s", e.Name, e.Message, string(e.Header), string(e.Body))
}

// NoRetryableBufferWriteErr indicates that the buffer is full and the writer, based on user specification, decides to not retry.
type NoRetryableBufferWriteErr struct {
// NonRetryableBufferWriteErr indicates a write failure that must not be retried, e.g. the buffer is full and the writer, based on user specification, decides to not retry, or the message was detected as a duplicate.
type NonRetryableBufferWriteErr struct {
Name string
Message string
}

func (e NoRetryableBufferWriteErr) Error() string {
return fmt.Sprintf("(%s) %s %#v", e.Name, e.Message, e)
func (e NonRetryableBufferWriteErr) Error() string {
return e.Message
}
7 changes: 0 additions & 7 deletions pkg/isb/stores/jetstream/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,3 @@ var isbAckTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
Help: "Processing times of acks for jetstream",
Buckets: prometheus.ExponentialBucketsRange(100, 60000000*2, 10),
}, []string{"buffer"})

// isbDedupCount is used to indicate the number of messages that are duplicate
var isbDedupCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "isb_jetstream",
Name: "dedup_total",
Help: "Total number of jetstream dedup",
}, []string{"buffer"})
30 changes: 20 additions & 10 deletions pkg/isb/stores/jetstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@ func (jw *jetStreamWriter) Write(ctx context.Context, messages []isb.Message) ([
// user explicitly wants to discard the message when the buffer is full.
// return a non-retryable error to let the caller know that the message is discarded.
for i := 0; i < len(errs); i++ {
errs[i] = isb.NoRetryableBufferWriteErr{Name: jw.name, Message: "Buffer full!"}
errs[i] = isb.NonRetryableBufferWriteErr{Name: jw.name, Message: isb.BufferFullMessage}
}
default:
// Default behavior is to return a BufferWriteErr.
for i := 0; i < len(errs); i++ {
errs[i] = isb.BufferWriteErr{Name: jw.name, Full: true, Message: "Buffer full!"}
errs[i] = isb.BufferWriteErr{Name: jw.name, Full: true, Message: isb.BufferFullMessage}
}
}
isbWriteErrors.With(labels).Inc()
Expand Down Expand Up @@ -222,9 +222,16 @@ func (jw *jetStreamWriter) asyncWrite(_ context.Context, messages []isb.Message,
defer wg.Done()
select {
case pubAck := <-fu.Ok():
writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx}
errs[idx] = nil
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("domain", pubAck.Domain))
if pubAck.Duplicate {
// If a message gets repeated, it will have the same offset number as the one before it.
// We shouldn't try to publish watermark on these repeated messages. Doing so would
// violate the principle of publishing watermarks to monotonically increasing offsets.
errs[idx] = isb.NonRetryableBufferWriteErr{Name: jw.name, Message: isb.DuplicateIDMessage}
} else {
writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx}
errs[idx] = nil
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("domain", pubAck.Domain))
}
case err := <-fu.Err():
errs[idx] = err
isbWriteErrors.With(metricsLabels).Inc()
Expand Down Expand Up @@ -278,13 +285,16 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message,
errs[idx] = err
isbWriteErrors.With(metricsLabels).Inc()
} else {
writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx}
errs[idx] = nil
if pubAck.Duplicate {
isbDedupCount.With(metricsLabels).Inc()
jw.log.Infow("Duplicate message detected", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain))
// If a message gets repeated, it will have the same offset number as the one before it.
// We shouldn't try to publish watermark on these repeated messages. Doing so would
// violate the principle of publishing watermarks to monotonically increasing offsets.
errs[idx] = isb.NonRetryableBufferWriteErr{Name: jw.name, Message: isb.DuplicateIDMessage}
} else {
writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx}
errs[idx] = nil
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain))
}
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain))
}
}(msg, index)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/isb/stores/jetstream/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func TestJetStreamBufferWriterBufferFull_DiscardLatest(t *testing.T) {
_, errs = jw.Write(ctx, messages)
assert.Equal(t, len(errs), 2)
for _, errMsg := range errs {
assert.Equal(t, errMsg, isb.NoRetryableBufferWriteErr{Name: streamName, Message: "Buffer full!"})
assert.Equal(t, errMsg, isb.NonRetryableBufferWriteErr{Name: streamName, Message: isb.BufferFullMessage})
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/redis/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ func (bw *BufferWrite) Write(_ context.Context, messages []isb.Message) ([]isb.O
case dfv1.DiscardLatest:
// user explicitly wants to discard the message when the buffer is full.
// return a non-retryable error to let the caller know that the message is discarded.
initializeErrorArray(errs, isb.NoRetryableBufferWriteErr{Name: bw.Name, Message: "Buffer full!"})
initializeErrorArray(errs, isb.NonRetryableBufferWriteErr{Name: bw.Name, Message: isb.BufferFullMessage})
default:
// Default behavior is to return a BufferWriteErr.
initializeErrorArray(errs, isb.BufferWriteErr{Name: bw.Name, Full: true, Message: "Buffer full!"})
initializeErrorArray(errs, isb.BufferWriteErr{Name: bw.Name, Full: true, Message: isb.BufferFullMessage})
}
isbWriteErrors.With(labels).Inc()
return nil, errs
Expand Down
10 changes: 5 additions & 5 deletions pkg/isb/stores/redis/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestRedisQWrite_WithInfoRefreshInterval(t *testing.T) {

// assert the actual error that the buffer is full
for _, err := range errs {
assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: "Buffer full!"})
assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: isb.BufferFullMessage})
}
}

Expand Down Expand Up @@ -212,9 +212,9 @@ func TestRedisQWrite_WithInfoRefreshInterval_WithBufferFullWritingStrategyIsDisc
defer func() { _ = client.DeleteKeys(ctx, internalKeys...) }()
_, errs = rqw.Write(ctx, writeMessages)

// assert the NoRetryableBufferWriteErr
// assert the NonRetryableBufferWriteErr
for _, err := range errs {
assert.Equal(t, err, isb.NoRetryableBufferWriteErr{Name: stream, Message: "Buffer full!"})
assert.Equal(t, err, isb.NonRetryableBufferWriteErr{Name: stream, Message: isb.BufferFullMessage})
}
}

Expand Down Expand Up @@ -309,7 +309,7 @@ func Test_updateIsFullFlag(t *testing.T) {

// assert the actual error that the buffer is full
for _, err := range errs {
assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: "Buffer full!"})
assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: isb.BufferFullMessage})
}
}

Expand Down Expand Up @@ -510,7 +510,7 @@ func TestXTrimOnIsFull(t *testing.T) {
// Buffer is full at this point so write will fail with errors because of usage limit
_, errs := rqw.Write(ctx, messages)
for _, err := range errs {
assert.Equal(t, err, isb.BufferWriteErr{Name: buffer, Full: true, Message: "Buffer full!"})
assert.Equal(t, err, isb.BufferWriteErr{Name: buffer, Full: true, Message: isb.BufferFullMessage})
}

// Read all the messages.
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/simplebuffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (b *InMemoryBuffer) Write(_ context.Context, messages []isb.Message) ([]isb
} else {
switch b.options.bufferFullWritingStrategy {
case v1alpha1.DiscardLatest:
errs[idx] = isb.NoRetryableBufferWriteErr{Name: b.name, Message: "Buffer full!"}
errs[idx] = isb.NonRetryableBufferWriteErr{Name: b.name, Message: isb.BufferFullMessage}
default:
errs[idx] = isb.BufferWriteErr{Name: b.name, Full: true, Message: "Buffer full!"}
errs[idx] = isb.BufferWriteErr{Name: b.name, Full: true, Message: isb.BufferFullMessage}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/simplebuffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestNewSimpleBuffer(t *testing.T) {

// try to write 3 messages and it should fail (we have only space for 2)
_, errs3 := sb.Write(ctx, writeMessages[0:3])
assert.EqualValues(t, []error{nil, nil, isb.BufferWriteErr{Name: "test", Full: true, Message: "Buffer full!"}}, errs3)
assert.EqualValues(t, []error{nil, nil, isb.BufferWriteErr{Name: "test", Full: true, Message: isb.BufferFullMessage}}, errs3)

// let's read some more
readMessages, err = sb.Read(ctx, 2)
Expand All @@ -95,7 +95,7 @@ func TestNewSimpleBuffer_BufferFullWritingStrategyIsDiscard(t *testing.T) {
_, errors := sb.Write(ctx, writeMessages[0:3])
assert.NoError(t, errors[0])
assert.NoError(t, errors[1])
assert.EqualValues(t, []error{nil, nil, isb.NoRetryableBufferWriteErr{Name: "test", Message: "Buffer full!"}}, errors)
assert.EqualValues(t, []error{nil, nil, isb.NonRetryableBufferWriteErr{Name: "test", Message: isb.BufferFullMessage}}, errors)

// still full as we did not ack
assert.Equal(t, true, sb.IsFull())
Expand Down
4 changes: 2 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ var (
Subsystem: "forwarder",
Name: "drop_total",
Help: "Total number of Messages Dropped",
}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})
}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName, LabelReason})

// DropBytesCount is to indicate the number of bytes dropped
DropBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "forwarder",
Name: "drop_bytes_total",
Help: "Total number of Bytes Dropped",
}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})
}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName, LabelReason})

// AckMessagesCount is used to indicate the number of messages acknowledged
AckMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Expand Down
Loading

0 comments on commit 75195d5

Please sign in to comment.