Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid publishing watermarks for duplicate messages. #1649

Merged
merged 3 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions pkg/isb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ package isb

import "fmt"

var (
BufferFullMessage = "Buffer full!"
DuplicateIDMessage = "Duplicate ID!"
)

// MessageWriteErr is associated with message write errors.
type MessageWriteErr struct {
Name string
Expand Down Expand Up @@ -97,12 +102,12 @@ func (e MessageReadErr) Error() string {
return fmt.Sprintf("(%s) %s Header: %s Body:%s", e.Name, e.Message, string(e.Header), string(e.Body))
}

// NoRetryableBufferWriteErr indicates that the buffer is full and the writer, based on user specification, decides to not retry.
type NoRetryableBufferWriteErr struct {
// NonRetryableBufferWriteErr indicates that the buffer is full and the writer, based on user specification, decides to not retry.
type NonRetryableBufferWriteErr struct {
Name string
Message string
}

func (e NoRetryableBufferWriteErr) Error() string {
return fmt.Sprintf("(%s) %s %#v", e.Name, e.Message, e)
func (e NonRetryableBufferWriteErr) Error() string {
return e.Message
}
7 changes: 0 additions & 7 deletions pkg/isb/stores/jetstream/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,3 @@ var isbAckTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
Help: "Processing times of acks for jetstream",
Buckets: prometheus.ExponentialBucketsRange(100, 60000000*2, 10),
}, []string{"buffer"})

// isbDedupCount is used to indicate the number of messages that are duplicate
var isbDedupCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "isb_jetstream",
Name: "dedup_total",
Help: "Total number of jetstream dedup",
}, []string{"buffer"})
30 changes: 20 additions & 10 deletions pkg/isb/stores/jetstream/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@ func (jw *jetStreamWriter) Write(ctx context.Context, messages []isb.Message) ([
// user explicitly wants to discard the message when buffer if full.
// return no retryable error as a callback to let caller know that the message is discarded.
for i := 0; i < len(errs); i++ {
errs[i] = isb.NoRetryableBufferWriteErr{Name: jw.name, Message: "Buffer full!"}
errs[i] = isb.NonRetryableBufferWriteErr{Name: jw.name, Message: isb.BufferFullMessage}
}
default:
// Default behavior is to return a BufferWriteErr.
for i := 0; i < len(errs); i++ {
errs[i] = isb.BufferWriteErr{Name: jw.name, Full: true, Message: "Buffer full!"}
errs[i] = isb.BufferWriteErr{Name: jw.name, Full: true, Message: isb.BufferFullMessage}
}
}
isbWriteErrors.With(labels).Inc()
Expand Down Expand Up @@ -222,9 +222,16 @@ func (jw *jetStreamWriter) asyncWrite(_ context.Context, messages []isb.Message,
defer wg.Done()
select {
case pubAck := <-fu.Ok():
writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx}
errs[idx] = nil
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("domain", pubAck.Domain))
if pubAck.Duplicate {
// If a message gets repeated, it will have the same offset number as the one before it.
// We shouldn't try to publish watermark on these repeated messages. Doing so would
// violate the principle of publishing watermarks to monotonically increasing offsets.
errs[idx] = isb.NonRetryableBufferWriteErr{Name: jw.name, Message: isb.DuplicateIDMessage}
} else {
writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx}
errs[idx] = nil
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("domain", pubAck.Domain))
}
case err := <-fu.Err():
errs[idx] = err
isbWriteErrors.With(metricsLabels).Inc()
Expand Down Expand Up @@ -278,13 +285,16 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message,
errs[idx] = err
isbWriteErrors.With(metricsLabels).Inc()
} else {
writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx}
errs[idx] = nil
if pubAck.Duplicate {
isbDedupCount.With(metricsLabels).Inc()
jw.log.Infow("Duplicate message detected", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain))
// If a message gets repeated, it will have the same offset number as the one before it.
// We shouldn't try to publish watermark on these repeated messages. Doing so would
// violate the principle of publishing watermarks to monotonically increasing offsets.
errs[idx] = isb.NonRetryableBufferWriteErr{Name: jw.name, Message: isb.DuplicateIDMessage}
} else {
writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx}
errs[idx] = nil
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain))
}
jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain))
}
}(msg, index)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/isb/stores/jetstream/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func TestJetStreamBufferWriterBufferFull_DiscardLatest(t *testing.T) {
_, errs = jw.Write(ctx, messages)
assert.Equal(t, len(errs), 2)
for _, errMsg := range errs {
assert.Equal(t, errMsg, isb.NoRetryableBufferWriteErr{Name: streamName, Message: "Buffer full!"})
assert.Equal(t, errMsg, isb.NonRetryableBufferWriteErr{Name: streamName, Message: isb.BufferFullMessage})
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/redis/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ func (bw *BufferWrite) Write(_ context.Context, messages []isb.Message) ([]isb.O
case dfv1.DiscardLatest:
// user explicitly wants to discard the message when buffer if full.
// return no retryable error as a callback to let caller know that the message is discarded.
initializeErrorArray(errs, isb.NoRetryableBufferWriteErr{Name: bw.Name, Message: "Buffer full!"})
initializeErrorArray(errs, isb.NonRetryableBufferWriteErr{Name: bw.Name, Message: isb.BufferFullMessage})
default:
// Default behavior is to return a BufferWriteErr.
initializeErrorArray(errs, isb.BufferWriteErr{Name: bw.Name, Full: true, Message: "Buffer full!"})
initializeErrorArray(errs, isb.BufferWriteErr{Name: bw.Name, Full: true, Message: isb.BufferFullMessage})
}
isbWriteErrors.With(labels).Inc()
return nil, errs
Expand Down
10 changes: 5 additions & 5 deletions pkg/isb/stores/redis/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestRedisQWrite_WithInfoRefreshInterval(t *testing.T) {

// assert the actual error that the buffer is full
for _, err := range errs {
assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: "Buffer full!"})
assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: isb.BufferFullMessage})
}
}

Expand Down Expand Up @@ -212,9 +212,9 @@ func TestRedisQWrite_WithInfoRefreshInterval_WithBufferFullWritingStrategyIsDisc
defer func() { _ = client.DeleteKeys(ctx, internalKeys...) }()
_, errs = rqw.Write(ctx, writeMessages)

// assert the NoRetryableBufferWriteErr
// assert the NonRetryableBufferWriteErr
for _, err := range errs {
assert.Equal(t, err, isb.NoRetryableBufferWriteErr{Name: stream, Message: "Buffer full!"})
assert.Equal(t, err, isb.NonRetryableBufferWriteErr{Name: stream, Message: isb.BufferFullMessage})
}
}

Expand Down Expand Up @@ -309,7 +309,7 @@ func Test_updateIsFullFlag(t *testing.T) {

// assert the actual error that the buffer is full
for _, err := range errs {
assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: "Buffer full!"})
assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: isb.BufferFullMessage})
}
}

Expand Down Expand Up @@ -510,7 +510,7 @@ func TestXTrimOnIsFull(t *testing.T) {
// Buffer is full at this point so write will fail with errors because of usage limit
_, errs := rqw.Write(ctx, messages)
for _, err := range errs {
assert.Equal(t, err, isb.BufferWriteErr{Name: buffer, Full: true, Message: "Buffer full!"})
assert.Equal(t, err, isb.BufferWriteErr{Name: buffer, Full: true, Message: isb.BufferFullMessage})
}

// Read all the messages.
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/simplebuffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (b *InMemoryBuffer) Write(_ context.Context, messages []isb.Message) ([]isb
} else {
switch b.options.bufferFullWritingStrategy {
case v1alpha1.DiscardLatest:
errs[idx] = isb.NoRetryableBufferWriteErr{Name: b.name, Message: "Buffer full!"}
errs[idx] = isb.NonRetryableBufferWriteErr{Name: b.name, Message: isb.BufferFullMessage}
default:
errs[idx] = isb.BufferWriteErr{Name: b.name, Full: true, Message: "Buffer full!"}
errs[idx] = isb.BufferWriteErr{Name: b.name, Full: true, Message: isb.BufferFullMessage}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/simplebuffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestNewSimpleBuffer(t *testing.T) {

// try to write 3 messages and it should fail (we have only space for 2)
_, errs3 := sb.Write(ctx, writeMessages[0:3])
assert.EqualValues(t, []error{nil, nil, isb.BufferWriteErr{Name: "test", Full: true, Message: "Buffer full!"}}, errs3)
assert.EqualValues(t, []error{nil, nil, isb.BufferWriteErr{Name: "test", Full: true, Message: isb.BufferFullMessage}}, errs3)

// let's read some more
readMessages, err = sb.Read(ctx, 2)
Expand All @@ -95,7 +95,7 @@ func TestNewSimpleBuffer_BufferFullWritingStrategyIsDiscard(t *testing.T) {
_, errors := sb.Write(ctx, writeMessages[0:3])
assert.NoError(t, errors[0])
assert.NoError(t, errors[1])
assert.EqualValues(t, []error{nil, nil, isb.NoRetryableBufferWriteErr{Name: "test", Message: "Buffer full!"}}, errors)
assert.EqualValues(t, []error{nil, nil, isb.NonRetryableBufferWriteErr{Name: "test", Message: isb.BufferFullMessage}}, errors)

// still full as we did not ack
assert.Equal(t, true, sb.IsFull())
Expand Down
4 changes: 2 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ var (
Subsystem: "forwarder",
Name: "drop_total",
Help: "Total number of Messages Dropped",
}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})
}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName, LabelReason})

// DropBytesCount is to indicate the number of bytes dropped
DropBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "forwarder",
Name: "drop_bytes_total",
Help: "Total number of Bytes Dropped",
}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName})
}, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName, LabelReason})

// AckMessagesCount is used to indicate the number of messages acknowledged
AckMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{
Expand Down
Loading
Loading