From fa93ac80e3239c7bb29787a144e8a2547e3194b9 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Sun, 7 Apr 2024 12:18:52 +0530 Subject: [PATCH 1/3] avoid publishing watermarks for duplicate messages Signed-off-by: Yashash H L --- pkg/isb/errors.go | 5 + pkg/isb/stores/jetstream/metrics.go | 7 - pkg/isb/stores/jetstream/writer.go | 30 ++- pkg/isb/stores/jetstream/writer_test.go | 2 +- pkg/isb/stores/redis/write.go | 4 +- pkg/isb/stores/redis/write_test.go | 8 +- pkg/isb/stores/simplebuffer/buffer.go | 4 +- pkg/isb/stores/simplebuffer/buffer_test.go | 4 +- pkg/metrics/metrics.go | 4 +- pkg/reduce/pnf/pnf.go | 212 +++++++++++---------- pkg/sinks/forward/forward.go | 29 ++- pkg/sources/forward/data_forward.go | 29 ++- pkg/udf/forward/forward.go | 29 ++- 13 files changed, 216 insertions(+), 151 deletions(-) diff --git a/pkg/isb/errors.go b/pkg/isb/errors.go index c30cbd39f6..c272f6bb5a 100644 --- a/pkg/isb/errors.go +++ b/pkg/isb/errors.go @@ -18,6 +18,11 @@ package isb import "fmt" +var ( + BufferFullMessage = "Buffer full!" + DuplicateIDMessage = "Duplicate ID!" +) + // MessageWriteErr is associated with message write errors. type MessageWriteErr struct { Name string diff --git a/pkg/isb/stores/jetstream/metrics.go b/pkg/isb/stores/jetstream/metrics.go index 13480c6dd2..d9558ff365 100644 --- a/pkg/isb/stores/jetstream/metrics.go +++ b/pkg/isb/stores/jetstream/metrics.go @@ -107,10 +107,3 @@ var isbAckTime = promauto.NewHistogramVec(prometheus.HistogramOpts{ Help: "Processing times of acks for jetstream", Buckets: prometheus.ExponentialBucketsRange(100, 60000000*2, 10), }, []string{"buffer"}) - -// isbDedupCount is used to indicate the number of messages that are duplicate -var isbDedupCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Subsystem: "isb_jetstream", - Name: "dedup_total", - Help: "Total number of jetstream dedup", -}, []string{"buffer"}) diff --git a/pkg/isb/stores/jetstream/writer.go b/pkg/isb/stores/jetstream/writer.go index bde352dd39..a2c52bbc6a 100644 --- a/pkg/isb/stores/jetstream/writer.go +++ b/pkg/isb/stores/jetstream/writer.go @@ -166,12 +166,12 @@ func (jw *jetStreamWriter) Write(ctx context.Context, messages []isb.Message) ([ // user explicitly wants to discard the message when buffer if full. // return no retryable error as a callback to let caller know that the message is discarded. for i := 0; i < len(errs); i++ { - errs[i] = isb.NoRetryableBufferWriteErr{Name: jw.name, Message: "Buffer full!"} + errs[i] = isb.NoRetryableBufferWriteErr{Name: jw.name, Message: isb.BufferFullMessage} } default: // Default behavior is to return a BufferWriteErr. for i := 0; i < len(errs); i++ { - errs[i] = isb.BufferWriteErr{Name: jw.name, Full: true, Message: "Buffer full!"} + errs[i] = isb.BufferWriteErr{Name: jw.name, Full: true, Message: isb.BufferFullMessage} } } isbWriteErrors.With(labels).Inc() @@ -222,9 +222,16 @@ func (jw *jetStreamWriter) asyncWrite(_ context.Context, messages []isb.Message, defer wg.Done() select { case pubAck := <-fu.Ok(): - writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx} - errs[idx] = nil - jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("domain", pubAck.Domain)) + if pubAck.Duplicate { + // If a message gets repeated, it will have the same offset number as the one before it. + // We shouldn't try to publish watermark on these repeated messages. 
Doing so would + // violate the principle of publishing watermarks to monotonically increasing offsets. + errs[idx] = isb.NoRetryableBufferWriteErr{Name: jw.name, Message: isb.DuplicateIDMessage} + } else { + writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx} + errs[idx] = nil + jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("domain", pubAck.Domain)) + } case err := <-fu.Err(): errs[idx] = err isbWriteErrors.With(metricsLabels).Inc() @@ -278,13 +285,16 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message, errs[idx] = err isbWriteErrors.With(metricsLabels).Inc() } else { - writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx} - errs[idx] = nil if pubAck.Duplicate { - isbDedupCount.With(metricsLabels).Inc() - jw.log.Infow("Duplicate message detected", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain)) + // If a message gets repeated, it will have the same offset number as the one before it. + // We shouldn't try to publish watermark on these repeated messages. Doing so would + // violate the principle of publishing watermarks to monotonically increasing offsets. + errs[idx] = isb.NoRetryableBufferWriteErr{Name: jw.name, Message: isb.DuplicateIDMessage} + } else { + writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx} + errs[idx] = nil + jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain)) } - jw.log.Debugw("Succeeded to publish a message", zap.String("stream", pubAck.Stream), zap.Any("seq", pubAck.Sequence), zap.Bool("duplicate", pubAck.Duplicate), zap.String("msgID", message.Header.ID), zap.String("domain", pubAck.Domain)) } }(msg, index) } diff --git a/pkg/isb/stores/jetstream/writer_test.go b/pkg/isb/stores/jetstream/writer_test.go index a0db37b9cd..9be379e860 100644 --- a/pkg/isb/stores/jetstream/writer_test.go +++ b/pkg/isb/stores/jetstream/writer_test.go @@ -300,7 +300,7 @@ func TestJetStreamBufferWriterBufferFull_DiscardLatest(t *testing.T) { _, errs = jw.Write(ctx, messages) assert.Equal(t, len(errs), 2) for _, errMsg := range errs { - assert.Equal(t, errMsg, isb.NoRetryableBufferWriteErr{Name: streamName, Message: "Buffer full!"}) + assert.Equal(t, errMsg, isb.NoRetryableBufferWriteErr{Name: streamName, Message: isb.BufferFullMessage}) } } diff --git a/pkg/isb/stores/redis/write.go b/pkg/isb/stores/redis/write.go index 448b90d157..2a8fa68338 100644 --- a/pkg/isb/stores/redis/write.go +++ b/pkg/isb/stores/redis/write.go @@ -167,10 +167,10 @@ func (bw *BufferWrite) Write(_ context.Context, messages []isb.Message) ([]isb.O case dfv1.DiscardLatest: // user explicitly wants to discard the message when buffer if full. // return no retryable error as a callback to let caller know that the message is discarded. - initializeErrorArray(errs, isb.NoRetryableBufferWriteErr{Name: bw.Name, Message: "Buffer full!"}) + initializeErrorArray(errs, isb.NoRetryableBufferWriteErr{Name: bw.Name, Message: isb.BufferFullMessage}) default: // Default behavior is to return a BufferWriteErr. 
- initializeErrorArray(errs, isb.BufferWriteErr{Name: bw.Name, Full: true, Message: "Buffer full!"}) + initializeErrorArray(errs, isb.BufferWriteErr{Name: bw.Name, Full: true, Message: isb.BufferFullMessage}) } isbWriteErrors.With(labels).Inc() return nil, errs diff --git a/pkg/isb/stores/redis/write_test.go b/pkg/isb/stores/redis/write_test.go index 38411b521a..e8391dd84a 100644 --- a/pkg/isb/stores/redis/write_test.go +++ b/pkg/isb/stores/redis/write_test.go @@ -174,7 +174,7 @@ func TestRedisQWrite_WithInfoRefreshInterval(t *testing.T) { // assert the actual error that the buffer is full for _, err := range errs { - assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: "Buffer full!"}) + assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: isb.BufferFullMessage}) } } @@ -214,7 +214,7 @@ func TestRedisQWrite_WithInfoRefreshInterval_WithBufferFullWritingStrategyIsDisc // assert the NoRetryableBufferWriteErr for _, err := range errs { - assert.Equal(t, err, isb.NoRetryableBufferWriteErr{Name: stream, Message: "Buffer full!"}) + assert.Equal(t, err, isb.NoRetryableBufferWriteErr{Name: stream, Message: isb.BufferFullMessage}) } } @@ -309,7 +309,7 @@ func Test_updateIsFullFlag(t *testing.T) { // assert the actual error that the buffer is full for _, err := range errs { - assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: "Buffer full!"}) + assert.Equal(t, err, isb.BufferWriteErr{Name: stream, Full: true, Message: isb.BufferFullMessage}) } } @@ -510,7 +510,7 @@ func TestXTrimOnIsFull(t *testing.T) { // Buffer is full at this point so write will fail with errors because of usage limit _, errs := rqw.Write(ctx, messages) for _, err := range errs { - assert.Equal(t, err, isb.BufferWriteErr{Name: buffer, Full: true, Message: "Buffer full!"}) + assert.Equal(t, err, isb.BufferWriteErr{Name: buffer, Full: true, Message: isb.BufferFullMessage}) } // Read all the messages. 
diff --git a/pkg/isb/stores/simplebuffer/buffer.go b/pkg/isb/stores/simplebuffer/buffer.go index d7d9bd8969..46de9551f3 100644 --- a/pkg/isb/stores/simplebuffer/buffer.go +++ b/pkg/isb/stores/simplebuffer/buffer.go @@ -152,9 +152,9 @@ func (b *InMemoryBuffer) Write(_ context.Context, messages []isb.Message) ([]isb } else { switch b.options.bufferFullWritingStrategy { case v1alpha1.DiscardLatest: - errs[idx] = isb.NoRetryableBufferWriteErr{Name: b.name, Message: "Buffer full!"} + errs[idx] = isb.NoRetryableBufferWriteErr{Name: b.name, Message: isb.BufferFullMessage} default: - errs[idx] = isb.BufferWriteErr{Name: b.name, Full: true, Message: "Buffer full!"} + errs[idx] = isb.BufferWriteErr{Name: b.name, Full: true, Message: isb.BufferFullMessage} } } } diff --git a/pkg/isb/stores/simplebuffer/buffer_test.go b/pkg/isb/stores/simplebuffer/buffer_test.go index c52a315385..2866c6433b 100644 --- a/pkg/isb/stores/simplebuffer/buffer_test.go +++ b/pkg/isb/stores/simplebuffer/buffer_test.go @@ -69,7 +69,7 @@ func TestNewSimpleBuffer(t *testing.T) { // try to write 3 messages and it should fail (we have only space for 2) _, errs3 := sb.Write(ctx, writeMessages[0:3]) - assert.EqualValues(t, []error{nil, nil, isb.BufferWriteErr{Name: "test", Full: true, Message: "Buffer full!"}}, errs3) + assert.EqualValues(t, []error{nil, nil, isb.BufferWriteErr{Name: "test", Full: true, Message: isb.BufferFullMessage}}, errs3) // let's read some more readMessages, err = sb.Read(ctx, 2) @@ -95,7 +95,7 @@ func TestNewSimpleBuffer_BufferFullWritingStrategyIsDiscard(t *testing.T) { _, errors := sb.Write(ctx, writeMessages[0:3]) assert.NoError(t, errors[0]) assert.NoError(t, errors[1]) - assert.EqualValues(t, []error{nil, nil, isb.NoRetryableBufferWriteErr{Name: "test", Message: "Buffer full!"}}, errors) + assert.EqualValues(t, []error{nil, nil, isb.NoRetryableBufferWriteErr{Name: "test", Message: isb.BufferFullMessage}}, errors) // still full as we did not ack assert.Equal(t, true, sb.IsFull()) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index dc3f4dc67b..cdaa8f02a3 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -87,14 +87,14 @@ var ( Subsystem: "forwarder", Name: "drop_total", Help: "Total number of Messages Dropped", - }, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName}) + }, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName, LabelReason}) // DropBytesCount is to indicate the number of bytes dropped DropBytesCount = promauto.NewCounterVec(prometheus.CounterOpts{ Subsystem: "forwarder", Name: "drop_bytes_total", Help: "Total number of Bytes Dropped", - }, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName}) + }, []string{LabelVertex, LabelPipeline, LabelVertexType, LabelVertexReplicaIndex, LabelPartitionName, LabelReason}) // AckMessagesCount is used to indicate the number of messages acknowledged AckMessagesCount = promauto.NewCounterVec(prometheus.CounterOpts{ diff --git a/pkg/reduce/pnf/pnf.go b/pkg/reduce/pnf/pnf.go index 9de5b3bcae..3df06783d3 100644 --- a/pkg/reduce/pnf/pnf.go +++ b/pkg/reduce/pnf/pnf.go @@ -121,56 +121,56 @@ func NewProcessAndForward(ctx context.Context, // AsyncSchedulePnF creates a go routine for each partition to invoke the UDF. // does not maintain the order of execution between partitions. 
-func (pm *ProcessAndForward) AsyncSchedulePnF(ctx context.Context, partitionID *partition.ID, pbq pbq.Reader) { +func (pf *ProcessAndForward) AsyncSchedulePnF(ctx context.Context, partitionID *partition.ID, pbq pbq.Reader) { doneCh := make(chan struct{}) - pm.mu.Lock() - pm.pnfRoutines[partitionID.String()] = doneCh - pm.mu.Unlock() + pf.mu.Lock() + pf.pnfRoutines[partitionID.String()] = doneCh + pf.mu.Unlock() - go pm.invokeUDF(ctx, doneCh, partitionID, pbq) + go pf.invokeUDF(ctx, doneCh, partitionID, pbq) } // invokeUDF reads requests from the supplied PBQ, invokes the UDF to gets the response and writes the response to the // main channel. -func (pm *ProcessAndForward) invokeUDF(ctx context.Context, done chan struct{}, pid *partition.ID, pbqReader pbq.Reader) { +func (pf *ProcessAndForward) invokeUDF(ctx context.Context, done chan struct{}, pid *partition.ID, pbqReader pbq.Reader) { defer func() { - pm.mu.Lock() - delete(pm.pnfRoutines, pid.String()) - pm.mu.Unlock() + pf.mu.Lock() + delete(pf.pnfRoutines, pid.String()) + pf.mu.Unlock() }() defer close(done) - udfResponseCh, errCh := pm.reduceApplier.ApplyReduce(ctx, pid, pbqReader.ReadCh()) + udfResponseCh, errCh := pf.reduceApplier.ApplyReduce(ctx, pid, pbqReader.ReadCh()) outerLoop: for { select { case err := <-errCh: if errors.Is(err, context.Canceled) || ctx.Err() != nil { - pm.log.Infow("Context is canceled, stopping the processAndForward", zap.Error(err)) + pf.log.Infow("Context is canceled, stopping the processAndForward", zap.Error(err)) return } if err != nil { - pm.log.Panic("Got an error while invoking ApplyReduce", zap.Error(err)) + pf.log.Panic("Got an error while invoking ApplyReduce", zap.Error(err)) } case response, ok := <-udfResponseCh: if !ok { break outerLoop } - pm.responseCh <- response + pf.responseCh <- response } } } // forwardResponses forwards the writeMessages to the ISBs. It also publishes the watermark and invokes GC on PBQ. // The watermark is only published at COB at key level for Unaligned and at Partition level for Aligned. -func (pm *ProcessAndForward) forwardResponses(ctx context.Context) { - defer close(pm.forwardDoneCh) +func (pf *ProcessAndForward) forwardResponses(ctx context.Context) { + defer close(pf.forwardDoneCh) - flushTimer := time.NewTicker(pm.opts.flushDuration) - writeMessages := make([]*isb.WriteMessage, 0, pm.opts.batchSize) + flushTimer := time.NewTicker(pf.opts.flushDuration) + writeMessages := make([]*isb.WriteMessage, 0, pf.opts.batchSize) // should we flush? 
var flush bool @@ -180,17 +180,17 @@ func (pm *ProcessAndForward) forwardResponses(ctx context.Context) { forwardLoop: for { select { - case response, ok := <-pm.responseCh: + case response, ok := <-pf.responseCh: if !ok { break forwardLoop } if response.EOF { - if err := pm.forwardToBuffers(ctx, &writeMessages); err != nil { + if err := pf.forwardToBuffers(ctx, &writeMessages); err != nil { return } - if err := pm.handleEOFResponse(ctx, response); err != nil { + if err := pf.handleEOFResponse(ctx, response); err != nil { return } @@ -202,7 +202,7 @@ forwardLoop: writeMessages = append(writeMessages, response.WriteMessage) // if the batch size is reached, let's flush - if len(writeMessages) >= pm.opts.batchSize { + if len(writeMessages) >= pf.opts.batchSize { flush = true } @@ -217,7 +217,7 @@ forwardLoop: } if flush { - if err := pm.forwardToBuffers(ctx, &writeMessages); err != nil { + if err := pf.forwardToBuffers(ctx, &writeMessages); err != nil { return } flush = false @@ -226,19 +226,19 @@ forwardLoop: // if there are any messages left, forward them to the ISB if len(writeMessages) > 0 { - if err := pm.forwardToBuffers(ctx, &writeMessages); err != nil { + if err := pf.forwardToBuffers(ctx, &writeMessages); err != nil { return } } } // handleEOFResponse handles the EOF response received from the response channel. It publishes the watermark and invokes GC on PBQ. -func (pm *ProcessAndForward) handleEOFResponse(ctx context.Context, response *window.TimedWindowResponse) error { +func (pf *ProcessAndForward) handleEOFResponse(ctx context.Context, response *window.TimedWindowResponse) error { // publish watermark - pm.publishWM(ctx) + pf.publishWM(ctx) // delete the closed windows which are tracked by the windower - pm.windower.DeleteClosedWindow(response.Window) + pf.windower.DeleteClosedWindow(response.Window) var infiniteBackoff = wait.Backoff{ Steps: math.MaxInt, @@ -248,13 +248,13 @@ func (pm *ProcessAndForward) handleEOFResponse(ctx context.Context, response *wi } // persist the GC event for unaligned window type (compactor will compact it) and invoke GC for aligned window type. 
- if pm.windower.Type() == window.Unaligned { + if pf.windower.Type() == window.Unaligned { err := wait.ExponentialBackoff(infiniteBackoff, func() (done bool, err error) { var attempt int - err = pm.opts.gcEventsTracker.PersistGCEvent(response.Window) + err = pf.opts.gcEventsTracker.PersistGCEvent(response.Window) if err != nil { attempt++ - pm.log.Errorw("Got an error while tracking GC event", zap.Error(err), zap.String("windowID", response.Window.ID()), zap.Int("attempt", attempt)) + pf.log.Errorw("Got an error while tracking GC event", zap.Error(err), zap.String("windowID", response.Window.ID()), zap.Int("attempt", attempt)) // no point retrying if ctx.Done has been invoked select { case <-ctx.Done(): @@ -272,13 +272,13 @@ func (pm *ProcessAndForward) handleEOFResponse(ctx context.Context, response *wi } } else { pid := *response.Window.Partition() - pbqReader := pm.pbqManager.GetPBQ(*response.Window.Partition()) + pbqReader := pf.pbqManager.GetPBQ(*response.Window.Partition()) err := wait.ExponentialBackoff(infiniteBackoff, func() (done bool, err error) { var attempt int err = pbqReader.GC() if err != nil { attempt++ - pm.log.Errorw("Got an error while invoking GC on PBQ", zap.Error(err), zap.String("partitionID", pid.String()), zap.Int("attempt", attempt)) + pf.log.Errorw("Got an error while invoking GC on PBQ", zap.Error(err), zap.String("partitionID", pid.String()), zap.Int("attempt", attempt)) // no point retrying if ctx.Done has been invoked select { case <-ctx.Done(): @@ -294,18 +294,18 @@ func (pm *ProcessAndForward) handleEOFResponse(ctx context.Context, response *wi if err != nil { return err } - pm.log.Infow("Finished GC", zap.String("partitionID", pid.String())) + pf.log.Infow("Finished GC", zap.String("partitionID", pid.String())) } return nil } // forwardToBuffers writes the messages to the ISBs concurrently for each partition. -func (pm *ProcessAndForward) forwardToBuffers(ctx context.Context, writeMessages *[]*isb.WriteMessage) error { +func (pf *ProcessAndForward) forwardToBuffers(ctx context.Context, writeMessages *[]*isb.WriteMessage) error { if len(*writeMessages) == 0 { return nil } - messagesToStep := pm.whereToStep(*writeMessages) + messagesToStep := pf.whereToStep(*writeMessages) // parallel writes to each ISB var mu sync.Mutex // use error group @@ -318,13 +318,13 @@ func (pm *ProcessAndForward) forwardToBuffers(ctx context.Context, writeMessages func(toVertexName string, toVertexPartitionIdx int32, resultMessages []isb.Message) { eg.Go(func() error { - offsets, err := pm.writeToBuffer(ctx, toVertexName, toVertexPartitionIdx, resultMessages) + offsets, err := pf.writeToBuffer(ctx, toVertexName, toVertexPartitionIdx, resultMessages) if err != nil { return err } mu.Lock() // TODO: do we need lock? isn't each buffer isolated since we do sequential per ISB? - pm.latestWriteOffsets[toVertexName][toVertexPartitionIdx] = offsets + pf.latestWriteOffsets[toVertexName][toVertexPartitionIdx] = offsets mu.Unlock() return nil }) @@ -338,27 +338,27 @@ func (pm *ProcessAndForward) forwardToBuffers(ctx context.Context, writeMessages } // clear the writeMessages - *writeMessages = make([]*isb.WriteMessage, 0, pm.opts.batchSize) + *writeMessages = make([]*isb.WriteMessage, 0, pf.opts.batchSize) return nil } // whereToStep assigns a message to the ISBs based on the Message.Keys. 
-func (pm *ProcessAndForward) whereToStep(writeMessages []*isb.WriteMessage) map[string][][]isb.Message { +func (pf *ProcessAndForward) whereToStep(writeMessages []*isb.WriteMessage) map[string][][]isb.Message { // writer doesn't accept array of pointers messagesToStep := make(map[string][][]isb.Message) var to []forwarder.VertexBuffer var err error for _, msg := range writeMessages { - to, err = pm.whereToDecider.WhereTo(msg.Keys, msg.Tags, msg.ID) + to, err = pf.whereToDecider.WhereTo(msg.Keys, msg.Tags, msg.ID) if err != nil { metrics.PlatformError.With(map[string]string{ - metrics.LabelVertex: pm.vertexName, - metrics.LabelPipeline: pm.pipelineName, + metrics.LabelVertex: pf.vertexName, + metrics.LabelPipeline: pf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeReduceUDF), - metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pm.vertexReplica)), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pf.vertexReplica)), }).Inc() - pm.log.Errorw("Got an error while invoking WhereTo, dropping the message", zap.Strings("keys", msg.Keys), zap.Error(err)) + pf.log.Errorw("Got an error while invoking WhereTo, dropping the message", zap.Strings("keys", msg.Keys), zap.Error(err)) continue } @@ -368,7 +368,7 @@ func (pm *ProcessAndForward) whereToStep(writeMessages []*isb.WriteMessage) map[ for _, step := range to { if _, ok := messagesToStep[step.ToVertexName]; !ok { - messagesToStep[step.ToVertexName] = make([][]isb.Message, len(pm.toBuffers[step.ToVertexName])) + messagesToStep[step.ToVertexName] = make([][]isb.Message, len(pf.toBuffers[step.ToVertexName])) } messagesToStep[step.ToVertexName][step.ToVertexPartitionIdx] = append(messagesToStep[step.ToVertexName][step.ToVertexPartitionIdx], msg.Message) } @@ -378,11 +378,10 @@ func (pm *ProcessAndForward) whereToStep(writeMessages []*isb.WriteMessage) map[ } // writeToBuffer writes to the ISBs. -func (pm *ProcessAndForward) writeToBuffer(ctx context.Context, edgeName string, partition int32, resultMessages []isb.Message) ([]isb.Offset, error) { +func (pf *ProcessAndForward) writeToBuffer(ctx context.Context, edgeName string, partition int32, resultMessages []isb.Message) ([]isb.Offset, error) { var ( writeCount int writeBytes float64 - dropBytes float64 ) var ISBWriteBackoff = wait.Backoff{ @@ -399,12 +398,33 @@ func (pm *ProcessAndForward) writeToBuffer(ctx context.Context, edgeName string, ctxClosedErr := wait.ExponentialBackoff(ISBWriteBackoff, func() (done bool, err error) { var writeErrs []error var failedMessages []isb.Message - offsets, writeErrs = pm.toBuffers[edgeName][partition].Write(ctx, writeMessages) + offsets, writeErrs = pf.toBuffers[edgeName][partition].Write(ctx, writeMessages) for i, message := range writeMessages { - if writeErrs[i] != nil { - if errors.As(writeErrs[i], &isb.NoRetryableBufferWriteErr{}) { - // If toBuffer returns us a NoRetryableBufferWriteErr, we drop the message. - dropBytes += float64(len(message.Payload)) + writeErr := writeErrs[i] + if writeErr != nil { + // Non retryable error, drop the message. Non retryable errors are only returned + // when the buffer is full and the user has set the buffer full strategy to + // DiscardLatest or when the message is duplicate. 
+				if errors.As(writeErr, &isb.NoRetryableBufferWriteErr{}) {
+					metrics.DropMessagesCount.With(map[string]string{
+						metrics.LabelVertex:             pf.vertexName,
+						metrics.LabelPipeline:           pf.pipelineName,
+						metrics.LabelVertexType:         string(dfv1.VertexTypeReduceUDF),
+						metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pf.vertexReplica)),
+						metrics.LabelPartitionName:      pf.toBuffers[edgeName][partition].GetName(),
+						metrics.LabelReason:             writeErr.Error(),
+					}).Inc()
+
+					metrics.DropBytesCount.With(map[string]string{
+						metrics.LabelVertex:             pf.vertexName,
+						metrics.LabelPipeline:           pf.pipelineName,
+						metrics.LabelVertexType:         string(dfv1.VertexTypeReduceUDF),
+						metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pf.vertexReplica)),
+						metrics.LabelPartitionName:      pf.toBuffers[edgeName][partition].GetName(),
+						metrics.LabelReason:             writeErr.Error(),
+					}).Add(float64(len(message.Payload)))
+
+					pf.log.Infow("Dropped message", zap.String("reason", writeErr.Error()), zap.String("vertex", pf.vertexName), zap.String("pipeline", pf.pipelineName))
 				} else {
 					failedMessages = append(failedMessages, message)
 				}
@@ -415,14 +435,14 @@ func (pm *ProcessAndForward) writeToBuffer(ctx context.Context, edgeName string,
 		}
 		// retry only the failed messages
 		if len(failedMessages) > 0 {
-			pm.log.Warnw("Failed to write messages to isb inside pnf", zap.Errors("errors", writeErrs))
+			pf.log.Warnw("Failed to write messages to isb inside pnf", zap.Errors("errors", writeErrs))
 			writeMessages = failedMessages
 			metrics.WriteMessagesError.With(map[string]string{
-				metrics.LabelVertex:             pm.vertexName,
-				metrics.LabelPipeline:           pm.pipelineName,
+				metrics.LabelVertex:             pf.vertexName,
+				metrics.LabelPipeline:           pf.pipelineName,
 				metrics.LabelVertexType:         string(dfv1.VertexTypeReduceUDF),
-				metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pm.vertexReplica)),
-				metrics.LabelPartitionName:      pm.toBuffers[edgeName][partition].GetName()}).Add(float64(len(failedMessages)))
+				metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pf.vertexReplica)),
+				metrics.LabelPartitionName:      pf.toBuffers[edgeName][partition].GetName()}).Add(float64(len(failedMessages)))
 
 			if ctx.Err() != nil {
 				// no need to retry if the context is closed
@@ -435,47 +455,33 @@ func (pm *ProcessAndForward) writeToBuffer(ctx context.Context, edgeName string,
 	})
 
 	if ctxClosedErr != nil {
-		pm.log.Errorw("Ctx closed while writing messages to ISB", zap.Error(ctxClosedErr))
+		pf.log.Errorw("Ctx closed while writing messages to ISB", zap.Error(ctxClosedErr))
 		return nil, ctxClosedErr
 	}
 
-	metrics.DropMessagesCount.With(map[string]string{
-		metrics.LabelVertex:             pm.vertexName,
-		metrics.LabelPipeline:           pm.pipelineName,
-		metrics.LabelVertexType:         string(dfv1.VertexTypeReduceUDF),
-		metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pm.vertexReplica)),
-		metrics.LabelPartitionName:      pm.toBuffers[edgeName][partition].GetName()}).Add(float64(len(resultMessages) - writeCount))
-
-	metrics.DropBytesCount.With(map[string]string{
-		metrics.LabelVertex:             pm.vertexName,
-		metrics.LabelPipeline:           pm.pipelineName,
-		metrics.LabelVertexType:         string(dfv1.VertexTypeReduceUDF),
-		metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pm.vertexReplica)),
-		metrics.LabelPartitionName:      pm.toBuffers[edgeName][partition].GetName()}).Add(dropBytes)
-
 	metrics.WriteMessagesCount.With(map[string]string{
-		metrics.LabelVertex:             pm.vertexName,
-		metrics.LabelPipeline:           pm.pipelineName,
+		metrics.LabelVertex:             pf.vertexName,
+		metrics.LabelPipeline:           pf.pipelineName,
 		metrics.LabelVertexType:         string(dfv1.VertexTypeReduceUDF),
-		metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pm.vertexReplica)),
-		metrics.LabelPartitionName:      pm.toBuffers[edgeName][partition].GetName()}).Add(float64(writeCount))
+		metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pf.vertexReplica)),
+		metrics.LabelPartitionName:      pf.toBuffers[edgeName][partition].GetName()}).Add(float64(writeCount))
 
 	metrics.WriteBytesCount.With(map[string]string{
-		metrics.LabelVertex:             pm.vertexName,
-		metrics.LabelPipeline:           pm.pipelineName,
+		metrics.LabelVertex:             pf.vertexName,
+		metrics.LabelPipeline:           pf.pipelineName,
 		metrics.LabelVertexType:         string(dfv1.VertexTypeReduceUDF),
-		metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pm.vertexReplica)),
-		metrics.LabelPartitionName:      pm.toBuffers[edgeName][partition].GetName()}).Add(writeBytes)
+		metrics.LabelVertexReplicaIndex: strconv.Itoa(int(pf.vertexReplica)),
+		metrics.LabelPartitionName:      pf.toBuffers[edgeName][partition].GetName()}).Add(writeBytes)
 	return offsets, nil
 }
 
 // publishWM publishes the watermark to each edge.
-func (pm *ProcessAndForward) publishWM(ctx context.Context) {
+func (pf *ProcessAndForward) publishWM(ctx context.Context) {
 	// publish watermark, we publish window end time minus one millisecond as watermark
 	// but if there's a window that's about to be closed which has a end time before the current window end time,
 	// we publish that window's end time as watermark. This is to ensure that the watermark is monotonically increasing.
 	wm := wmb.Watermark(time.UnixMilli(-1))
-	if oldestClosedWindowEndTime := pm.windower.OldestWindowEndTime(); oldestClosedWindowEndTime.UnixMilli() != -1 {
+	if oldestClosedWindowEndTime := pf.windower.OldestWindowEndTime(); oldestClosedWindowEndTime.UnixMilli() != -1 {
 		wm = wmb.Watermark(oldestClosedWindowEndTime.Add(-1 * time.Millisecond))
 	}
 
@@ -484,15 +490,15 @@ func (pm *ProcessAndForward) publishWM(ctx context.Context) {
 	// it's used to determine which buffers should receive an idle watermark.
 	// Created as a slice since it tracks per partition of the buffer.
var activeWatermarkBuffers = make(map[string][]bool) - for toVertexName, bufferOffsets := range pm.latestWriteOffsets { + for toVertexName, bufferOffsets := range pf.latestWriteOffsets { activeWatermarkBuffers[toVertexName] = make([]bool, len(bufferOffsets)) - if publisher, ok := pm.watermarkPublishers[toVertexName]; ok { + if publisher, ok := pf.watermarkPublishers[toVertexName]; ok { for index, offsets := range bufferOffsets { if len(offsets) > 0 { publisher.PublishWatermark(wm, offsets[len(offsets)-1], int32(index)) activeWatermarkBuffers[toVertexName][index] = true // reset because the toBuffer partition is not idling - pm.idleManager.MarkActive(wmb.PARTITION_0, pm.toBuffers[toVertexName][index].GetName()) + pf.idleManager.MarkActive(wmb.PARTITION_0, pf.toBuffers[toVertexName][index].GetName()) } } } @@ -501,55 +507,55 @@ func (pm *ProcessAndForward) publishWM(ctx context.Context) { // if there's any buffers that haven't received any watermark during this // batch processing cycle, send an idle watermark - for toVertexName := range pm.watermarkPublishers { + for toVertexName := range pf.watermarkPublishers { for index, activePartition := range activeWatermarkBuffers[toVertexName] { if !activePartition { - if publisher, ok := pm.watermarkPublishers[toVertexName]; ok { - idlehandler.PublishIdleWatermark(ctx, wmb.PARTITION_0, pm.toBuffers[toVertexName][index], publisher, pm.idleManager, pm.log, pm.vertexName, pm.pipelineName, dfv1.VertexTypeReduceUDF, pm.vertexReplica, wm) + if publisher, ok := pf.watermarkPublishers[toVertexName]; ok { + idlehandler.PublishIdleWatermark(ctx, wmb.PARTITION_0, pf.toBuffers[toVertexName][index], publisher, pf.idleManager, pf.log, pf.vertexName, pf.pipelineName, dfv1.VertexTypeReduceUDF, pf.vertexReplica, wm) } } } } // reset the latestWriteOffsets after publishing watermark - pm.latestWriteOffsets = make(map[string][][]isb.Offset) - for toVertexName, toVertexBuffer := range pm.toBuffers { - pm.latestWriteOffsets[toVertexName] = make([][]isb.Offset, len(toVertexBuffer)) + pf.latestWriteOffsets = make(map[string][][]isb.Offset) + for toVertexName, toVertexBuffer := range pf.toBuffers { + pf.latestWriteOffsets[toVertexName] = make([][]isb.Offset, len(toVertexBuffer)) } } // Shutdown closes all the partitions of the buffer. 
-func (pm *ProcessAndForward) Shutdown() { - pm.log.Infow("Shutting down ProcessAndForward") +func (pf *ProcessAndForward) Shutdown() { + pf.log.Infow("Shutting down ProcessAndForward") - pm.mu.RLock() - doneChs := make([]chan struct{}, 0, len(pm.pnfRoutines)) - for _, doneCh := range pm.pnfRoutines { + pf.mu.RLock() + doneChs := make([]chan struct{}, 0, len(pf.pnfRoutines)) + for _, doneCh := range pf.pnfRoutines { doneChs = append(doneChs, doneCh) } - pm.mu.RUnlock() + pf.mu.RUnlock() for _, doneCh := range doneChs { <-doneCh } - pm.log.Infow("All PnFs have finished, waiting for forwardDoneCh to be done") + pf.log.Infow("All PnFs have finished, waiting for forwardDoneCh to be done") // close the response channel - close(pm.responseCh) + close(pf.responseCh) // wait for the forwardResponses to finish - <-pm.forwardDoneCh + <-pf.forwardDoneCh // close all the buffers since the forwardResponses is done - for _, buffer := range pm.toBuffers { + for _, buffer := range pf.toBuffers { for _, p := range buffer { if err := p.Close(); err != nil { - pm.log.Errorw("Failed to close partition writer, shutdown anyways...", zap.Error(err), zap.String("bufferTo", p.GetName())) + pf.log.Errorw("Failed to close partition writer, shutdown anyways...", zap.Error(err), zap.String("bufferTo", p.GetName())) } else { - pm.log.Infow("Closed partition writer", zap.String("bufferTo", p.GetName())) + pf.log.Infow("Closed partition writer", zap.String("bufferTo", p.GetName())) } } } - pm.log.Infow("Successfully shutdown ProcessAndForward") + pf.log.Infow("Successfully shutdown ProcessAndForward") } diff --git a/pkg/sinks/forward/forward.go b/pkg/sinks/forward/forward.go index 90f5041fb4..05b5f12302 100644 --- a/pkg/sinks/forward/forward.go +++ b/pkg/sinks/forward/forward.go @@ -355,7 +355,6 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. totalCount int writeCount int writeBytes float64 - dropBytes float64 ) totalCount = len(messages) writeOffsets = make([]isb.Offset, 0, totalCount) @@ -366,11 +365,31 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. var failedMessages []isb.Message needRetry := false for idx, msg := range messages { - if err := errs[idx]; err != nil { + if err = errs[idx]; err != nil { // ATM there are no user defined errors during write, all are InternalErrors. + // Non retryable error, drop the message. Non retryable errors are only returned + // when the buffer is full and the user has set the buffer full strategy to + // DiscardLatest or when the message is duplicate. if errors.As(err, &isb.NoRetryableBufferWriteErr{}) { - // If toBufferPartition returns us a NoRetryableBufferWriteErr, we drop the message. 
- dropBytes += float64(len(msg.Payload)) + metrics.DropMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSink), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + metrics.LabelReason: err.Error(), + }).Inc() + + metrics.DropBytesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSink), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + metrics.LabelReason: err.Error(), + }).Add(float64(len(msg.Payload))) + + df.opts.logger.Infow("Dropped message", zap.String("reason", err.Error()), zap.String("partition", toBufferPartition.GetName()), zap.String("vertex", df.vertexName), zap.String("pipeline", df.pipelineName)) } else { needRetry = true // we retry only failed messages @@ -408,8 +427,6 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. } } - metrics.DropMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(totalCount - writeCount)) - metrics.DropBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(dropBytes) metrics.WriteMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(writeCount)) metrics.WriteBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(writeBytes) return writeOffsets, nil diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index 00897548e9..a2a4357835 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -474,7 +474,6 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. totalCount int writeCount int writeBytes float64 - dropBytes float64 ) totalCount = len(messages) writeOffsets = make([]isb.Offset, 0, totalCount) @@ -485,11 +484,31 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. var failedMessages []isb.Message needRetry := false for idx, msg := range messages { - if err := errs[idx]; err != nil { + if err = errs[idx]; err != nil { // ATM there are no user-defined errors during writing, all are InternalErrors. + // Non retryable error, drop the message. Non retryable errors are only returned + // when the buffer is full and the user has set the buffer full strategy to + // DiscardLatest or when the message is duplicate. 
if errors.As(err, &isb.NoRetryableBufferWriteErr{}) { - // If toBufferPartition returns us a NoRetryableBufferWriteErr, we drop the message. - dropBytes += float64(len(msg.Payload)) + metrics.DropMessagesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + metrics.LabelReason: err.Error(), + }).Inc() + + metrics.DropBytesCount.With(map[string]string{ + metrics.LabelVertex: df.vertexName, + metrics.LabelPipeline: df.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSource), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + metrics.LabelReason: err.Error(), + }).Add(float64(len(msg.Payload))) + + df.opts.logger.Infow("Dropped message", zap.String("reason", err.Error()), zap.String("partition", toBufferPartition.GetName()), zap.String("vertex", df.vertexName), zap.String("pipeline", df.pipelineName)) } else { needRetry = true // we retry only failed messages @@ -527,8 +546,6 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. } } - metrics.DropMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(totalCount - writeCount)) - metrics.DropBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(dropBytes) metrics.WriteMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(writeCount)) metrics.WriteBytesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(writeBytes) return writeOffsets, nil diff --git a/pkg/udf/forward/forward.go b/pkg/udf/forward/forward.go index 2cd8fb4234..d36e7b1a16 100644 --- a/pkg/udf/forward/forward.go +++ b/pkg/udf/forward/forward.go @@ -572,7 +572,6 @@ func (isdf *InterStepDataForward) writeToBuffer(ctx context.Context, toBufferPar totalCount int writeCount int writeBytes float64 - dropBytes float64 ) totalCount = len(messages) writeOffsets = make([]isb.Offset, 0, totalCount) @@ -583,11 +582,31 @@ func (isdf *InterStepDataForward) writeToBuffer(ctx context.Context, toBufferPar var failedMessages []isb.Message needRetry := false for idx, msg := range messages { - if err := errs[idx]; err != nil { + if err = errs[idx]; err != nil { // ATM there are no user defined errors during write, all are InternalErrors. + // Non retryable error, drop the message. 
Non retryable errors are only returned + // when the buffer is full and the user has set the buffer full strategy to + // DiscardLatest or when the message is duplicate. if errors.As(err, &isb.NoRetryableBufferWriteErr{}) { - // If toBufferPartition returns us a NoRetryableBufferWriteErr, we drop the message. - dropBytes += float64(len(msg.Payload)) + metrics.DropMessagesCount.With(map[string]string{ + metrics.LabelVertex: isdf.vertexName, + metrics.LabelPipeline: isdf.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSink), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + metrics.LabelReason: err.Error(), + }).Inc() + + metrics.DropBytesCount.With(map[string]string{ + metrics.LabelVertex: isdf.vertexName, + metrics.LabelPipeline: isdf.pipelineName, + metrics.LabelVertexType: string(dfv1.VertexTypeSink), + metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), + metrics.LabelPartitionName: toBufferPartition.GetName(), + metrics.LabelReason: err.Error(), + }).Add(float64(len(msg.Payload))) + + isdf.opts.logger.Infow("Dropped message", zap.String("reason", err.Error()), zap.String("partition", toBufferPartition.GetName()), zap.String("vertex", isdf.vertexName), zap.String("pipeline", isdf.pipelineName)) } else { needRetry = true // we retry only failed messages @@ -625,8 +644,6 @@ func (isdf *InterStepDataForward) writeToBuffer(ctx context.Context, toBufferPar } } - metrics.DropMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(totalCount - writeCount)) - metrics.DropBytesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(dropBytes) metrics.WriteMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(float64(writeCount)) metrics.WriteBytesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), metrics.LabelPartitionName: toBufferPartition.GetName()}).Add(writeBytes) return writeOffsets, nil From 282e136f0fc51d771f0bc692dac7d66eb5a6b5f4 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Sun, 7 Apr 2024 13:00:08 +0530 Subject: [PATCH 2/3] fix e2e Signed-off-by: Yashash H L --- pkg/isb/errors.go | 2 +- test/e2e/functional_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/isb/errors.go b/pkg/isb/errors.go index c272f6bb5a..8d6a2d1834 100644 --- a/pkg/isb/errors.go +++ b/pkg/isb/errors.go @@ -109,5 +109,5 @@ type NoRetryableBufferWriteErr struct { } func (e NoRetryableBufferWriteErr) Error() string { - return fmt.Sprintf("(%s) %s %#v", e.Name, e.Message, e) + return e.Message } diff --git a/test/e2e/functional_test.go 
b/test/e2e/functional_test.go index 7427945353..e70fe9a3ea 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -240,8 +240,8 @@ func (s *FunctionalSuite) TestDropOnFull() { time.Sleep(time.Second * 5) w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("2"))) - expectedDropMetricOne := `forwarder_drop_total{partition_name="numaflow-system-drop-on-full-sink-0",pipeline="drop-on-full",replica="0",vertex="in",vertex_type="Source"} 1` - expectedDropMetricTwo := `forwarder_drop_total{partition_name="numaflow-system-drop-on-full-sink-1",pipeline="drop-on-full",replica="0",vertex="in",vertex_type="Source"} 1` + expectedDropMetricOne := `forwarder_drop_total{partition_name="numaflow-system-drop-on-full-sink-0",pipeline="drop-on-full",reason="Buffer full!",replica="0",vertex="in",vertex_type="Source"} 1` + expectedDropMetricTwo := `forwarder_drop_total{partition_name="numaflow-system-drop-on-full-sink-1",pipeline="drop-on-full",reason="Buffer full!",replica="0",vertex="in",vertex_type="Source"} 1` // wait for the drop metric to be updated, time out after 10s. timeoutChan := time.After(time.Second * 10) ticker := time.NewTicker(time.Second * 2) From dc080a312866f880f0700a2226a38a38bd70aaa6 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Sun, 7 Apr 2024 19:56:08 -0700 Subject: [PATCH 3/3] fix: typo Signed-off-by: Vigith Maurice --- pkg/isb/errors.go | 6 +++--- pkg/isb/stores/jetstream/writer.go | 6 +++--- pkg/isb/stores/jetstream/writer_test.go | 2 +- pkg/isb/stores/redis/write.go | 2 +- pkg/isb/stores/redis/write_test.go | 4 ++-- pkg/isb/stores/simplebuffer/buffer.go | 2 +- pkg/isb/stores/simplebuffer/buffer_test.go | 2 +- pkg/reduce/pnf/pnf.go | 2 +- pkg/sinks/forward/forward.go | 2 +- pkg/sources/forward/data_forward.go | 2 +- pkg/udf/forward/forward.go | 2 +- 11 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/isb/errors.go b/pkg/isb/errors.go index 8d6a2d1834..0d9a543164 100644 --- a/pkg/isb/errors.go +++ b/pkg/isb/errors.go @@ -102,12 +102,12 @@ func (e MessageReadErr) Error() string { return fmt.Sprintf("(%s) %s Header: %s Body:%s", e.Name, e.Message, string(e.Header), string(e.Body)) } -// NoRetryableBufferWriteErr indicates that the buffer is full and the writer, based on user specification, decides to not retry. -type NoRetryableBufferWriteErr struct { +// NonRetryableBufferWriteErr indicates that the buffer is full and the writer, based on user specification, decides to not retry. +type NonRetryableBufferWriteErr struct { Name string Message string } -func (e NoRetryableBufferWriteErr) Error() string { +func (e NonRetryableBufferWriteErr) Error() string { return e.Message } diff --git a/pkg/isb/stores/jetstream/writer.go b/pkg/isb/stores/jetstream/writer.go index a2c52bbc6a..8605b3290d 100644 --- a/pkg/isb/stores/jetstream/writer.go +++ b/pkg/isb/stores/jetstream/writer.go @@ -166,7 +166,7 @@ func (jw *jetStreamWriter) Write(ctx context.Context, messages []isb.Message) ([ // user explicitly wants to discard the message when buffer if full. // return no retryable error as a callback to let caller know that the message is discarded. for i := 0; i < len(errs); i++ { - errs[i] = isb.NoRetryableBufferWriteErr{Name: jw.name, Message: isb.BufferFullMessage} + errs[i] = isb.NonRetryableBufferWriteErr{Name: jw.name, Message: isb.BufferFullMessage} } default: // Default behavior is to return a BufferWriteErr. 
@@ -226,7 +226,7 @@ func (jw *jetStreamWriter) asyncWrite(_ context.Context, messages []isb.Message, // If a message gets repeated, it will have the same offset number as the one before it. // We shouldn't try to publish watermark on these repeated messages. Doing so would // violate the principle of publishing watermarks to monotonically increasing offsets. - errs[idx] = isb.NoRetryableBufferWriteErr{Name: jw.name, Message: isb.DuplicateIDMessage} + errs[idx] = isb.NonRetryableBufferWriteErr{Name: jw.name, Message: isb.DuplicateIDMessage} } else { writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx} errs[idx] = nil @@ -289,7 +289,7 @@ func (jw *jetStreamWriter) syncWrite(_ context.Context, messages []isb.Message, // If a message gets repeated, it will have the same offset number as the one before it. // We shouldn't try to publish watermark on these repeated messages. Doing so would // violate the principle of publishing watermarks to monotonically increasing offsets. - errs[idx] = isb.NoRetryableBufferWriteErr{Name: jw.name, Message: isb.DuplicateIDMessage} + errs[idx] = isb.NonRetryableBufferWriteErr{Name: jw.name, Message: isb.DuplicateIDMessage} } else { writeOffsets[idx] = &writeOffset{seq: pubAck.Sequence, partitionIdx: jw.partitionIdx} errs[idx] = nil diff --git a/pkg/isb/stores/jetstream/writer_test.go b/pkg/isb/stores/jetstream/writer_test.go index 9be379e860..54ef1f65aa 100644 --- a/pkg/isb/stores/jetstream/writer_test.go +++ b/pkg/isb/stores/jetstream/writer_test.go @@ -300,7 +300,7 @@ func TestJetStreamBufferWriterBufferFull_DiscardLatest(t *testing.T) { _, errs = jw.Write(ctx, messages) assert.Equal(t, len(errs), 2) for _, errMsg := range errs { - assert.Equal(t, errMsg, isb.NoRetryableBufferWriteErr{Name: streamName, Message: isb.BufferFullMessage}) + assert.Equal(t, errMsg, isb.NonRetryableBufferWriteErr{Name: streamName, Message: isb.BufferFullMessage}) } } diff --git a/pkg/isb/stores/redis/write.go b/pkg/isb/stores/redis/write.go index 2a8fa68338..94876feffc 100644 --- a/pkg/isb/stores/redis/write.go +++ b/pkg/isb/stores/redis/write.go @@ -167,7 +167,7 @@ func (bw *BufferWrite) Write(_ context.Context, messages []isb.Message) ([]isb.O case dfv1.DiscardLatest: // user explicitly wants to discard the message when buffer if full. // return no retryable error as a callback to let caller know that the message is discarded. - initializeErrorArray(errs, isb.NoRetryableBufferWriteErr{Name: bw.Name, Message: isb.BufferFullMessage}) + initializeErrorArray(errs, isb.NonRetryableBufferWriteErr{Name: bw.Name, Message: isb.BufferFullMessage}) default: // Default behavior is to return a BufferWriteErr. initializeErrorArray(errs, isb.BufferWriteErr{Name: bw.Name, Full: true, Message: isb.BufferFullMessage}) diff --git a/pkg/isb/stores/redis/write_test.go b/pkg/isb/stores/redis/write_test.go index e8391dd84a..667bc7bd53 100644 --- a/pkg/isb/stores/redis/write_test.go +++ b/pkg/isb/stores/redis/write_test.go @@ -212,9 +212,9 @@ func TestRedisQWrite_WithInfoRefreshInterval_WithBufferFullWritingStrategyIsDisc defer func() { _ = client.DeleteKeys(ctx, internalKeys...) 
}() _, errs = rqw.Write(ctx, writeMessages) - // assert the NoRetryableBufferWriteErr + // assert the NonRetryableBufferWriteErr for _, err := range errs { - assert.Equal(t, err, isb.NoRetryableBufferWriteErr{Name: stream, Message: isb.BufferFullMessage}) + assert.Equal(t, err, isb.NonRetryableBufferWriteErr{Name: stream, Message: isb.BufferFullMessage}) } } diff --git a/pkg/isb/stores/simplebuffer/buffer.go b/pkg/isb/stores/simplebuffer/buffer.go index 46de9551f3..893c2fe085 100644 --- a/pkg/isb/stores/simplebuffer/buffer.go +++ b/pkg/isb/stores/simplebuffer/buffer.go @@ -152,7 +152,7 @@ func (b *InMemoryBuffer) Write(_ context.Context, messages []isb.Message) ([]isb } else { switch b.options.bufferFullWritingStrategy { case v1alpha1.DiscardLatest: - errs[idx] = isb.NoRetryableBufferWriteErr{Name: b.name, Message: isb.BufferFullMessage} + errs[idx] = isb.NonRetryableBufferWriteErr{Name: b.name, Message: isb.BufferFullMessage} default: errs[idx] = isb.BufferWriteErr{Name: b.name, Full: true, Message: isb.BufferFullMessage} } diff --git a/pkg/isb/stores/simplebuffer/buffer_test.go b/pkg/isb/stores/simplebuffer/buffer_test.go index 2866c6433b..20cf292d21 100644 --- a/pkg/isb/stores/simplebuffer/buffer_test.go +++ b/pkg/isb/stores/simplebuffer/buffer_test.go @@ -95,7 +95,7 @@ func TestNewSimpleBuffer_BufferFullWritingStrategyIsDiscard(t *testing.T) { _, errors := sb.Write(ctx, writeMessages[0:3]) assert.NoError(t, errors[0]) assert.NoError(t, errors[1]) - assert.EqualValues(t, []error{nil, nil, isb.NoRetryableBufferWriteErr{Name: "test", Message: isb.BufferFullMessage}}, errors) + assert.EqualValues(t, []error{nil, nil, isb.NonRetryableBufferWriteErr{Name: "test", Message: isb.BufferFullMessage}}, errors) // still full as we did not ack assert.Equal(t, true, sb.IsFull()) diff --git a/pkg/reduce/pnf/pnf.go b/pkg/reduce/pnf/pnf.go index 3df06783d3..d54b09eb63 100644 --- a/pkg/reduce/pnf/pnf.go +++ b/pkg/reduce/pnf/pnf.go @@ -405,7 +405,7 @@ func (pf *ProcessAndForward) writeToBuffer(ctx context.Context, edgeName string, // Non retryable error, drop the message. Non retryable errors are only returned // when the buffer is full and the user has set the buffer full strategy to // DiscardLatest or when the message is duplicate. - if errors.As(writeErr, &isb.NoRetryableBufferWriteErr{}) { + if errors.As(writeErr, &isb.NonRetryableBufferWriteErr{}) { metrics.DropMessagesCount.With(map[string]string{ metrics.LabelVertex: pf.vertexName, metrics.LabelPipeline: pf.pipelineName, diff --git a/pkg/sinks/forward/forward.go b/pkg/sinks/forward/forward.go index 05b5f12302..6263c36b4b 100644 --- a/pkg/sinks/forward/forward.go +++ b/pkg/sinks/forward/forward.go @@ -370,7 +370,7 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. // Non retryable error, drop the message. Non retryable errors are only returned // when the buffer is full and the user has set the buffer full strategy to // DiscardLatest or when the message is duplicate. - if errors.As(err, &isb.NoRetryableBufferWriteErr{}) { + if errors.As(err, &isb.NonRetryableBufferWriteErr{}) { metrics.DropMessagesCount.With(map[string]string{ metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index a2a4357835..41e9cc3c28 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -489,7 +489,7 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. 
// Non retryable error, drop the message. Non retryable errors are only returned // when the buffer is full and the user has set the buffer full strategy to // DiscardLatest or when the message is duplicate. - if errors.As(err, &isb.NoRetryableBufferWriteErr{}) { + if errors.As(err, &isb.NonRetryableBufferWriteErr{}) { metrics.DropMessagesCount.With(map[string]string{ metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, diff --git a/pkg/udf/forward/forward.go b/pkg/udf/forward/forward.go index d36e7b1a16..4a7a2e8ccb 100644 --- a/pkg/udf/forward/forward.go +++ b/pkg/udf/forward/forward.go @@ -587,7 +587,7 @@ func (isdf *InterStepDataForward) writeToBuffer(ctx context.Context, toBufferPar // Non retryable error, drop the message. Non retryable errors are only returned // when the buffer is full and the user has set the buffer full strategy to // DiscardLatest or when the message is duplicate. - if errors.As(err, &isb.NoRetryableBufferWriteErr{}) { + if errors.As(err, &isb.NonRetryableBufferWriteErr{}) { metrics.DropMessagesCount.With(map[string]string{ metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName,