From 51bf10b569e3393f03715e878ce1961cdba5a3e3 Mon Sep 17 00:00:00 2001 From: Andrzej Stencel Date: Thu, 31 Oct 2024 15:48:34 +0100 Subject: [PATCH] [chore][pkg/stanza] refactor: remove function juggling (#36108) Separates header processing functions from content processing functions to improve code clarity. This is a follow-up from https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35870#discussion_r1806581048. --- .../fileconsumer/internal/reader/factory.go | 11 +++-------- pkg/stanza/fileconsumer/internal/reader/reader.go | 15 +++++---------- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/pkg/stanza/fileconsumer/internal/reader/factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go index 7287ca40dae7..4731e6bc3ce7 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory.go @@ -74,7 +74,6 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, initialBufferSize: f.InitialBufferSize, maxLogSize: f.MaxLogSize, decoder: decode.New(f.Encoding), - lineSplitFunc: f.SplitFunc, deleteAtEOF: f.DeleteAtEOF, includeFileRecordNum: f.IncludeFileRecordNumber, compression: f.Compression, @@ -103,18 +102,14 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, } flushFunc := m.FlushState.Func(f.SplitFunc, f.FlushTimeout) - r.lineSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc) + r.contentSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc) r.emitFunc = f.EmitFunc - if f.HeaderConfig == nil || m.HeaderFinalized { - r.splitFunc = r.lineSplitFunc - r.processFunc = r.emitFunc - } else { + if f.HeaderConfig != nil && !m.HeaderFinalized { + r.headerSplitFunc = f.HeaderConfig.SplitFunc r.headerReader, err = header.NewReader(f.TelemetrySettings, *f.HeaderConfig) if err != nil { return nil, err } - r.splitFunc = f.HeaderConfig.SplitFunc - r.processFunc = r.headerReader.Process } attributes, err := f.Attributes.Resolve(file) diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 3cb998597069..207e5b745dbd 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -42,11 +42,10 @@ type Reader struct { fingerprintSize int initialBufferSize int maxLogSize int - lineSplitFunc bufio.SplitFunc - splitFunc bufio.SplitFunc + headerSplitFunc bufio.SplitFunc + contentSplitFunc bufio.SplitFunc decoder *decode.Decoder headerReader *header.Reader - processFunc emit.Callback emitFunc emit.Callback deleteAtEOF bool needsUpdateFingerprint bool @@ -116,7 +115,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) { } func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) { - s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc) + s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.headerSplitFunc) // Read the tokens from the file until no more header tokens are found or the end of file is reached. for { @@ -167,10 +166,6 @@ func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) { r.HeaderFinalized = true r.initialBufferSize = scanner.DefaultBufferSize - // Switch to the normal split and process functions. - r.splitFunc = r.lineSplitFunc - r.processFunc = r.emitFunc - // Reset position in file to r.Offest after the header scanner might have moved it past a content token. if _, err := r.file.Seek(r.Offset, 0); err != nil { r.set.Logger.Error("failed to seek post-header", zap.Error(err)) @@ -182,7 +177,7 @@ func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) { func (r *Reader) readContents(ctx context.Context) { // Create the scanner to read the contents of the file. - s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.splitFunc) + s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.contentSplitFunc) // Iterate over the contents of the file. for { @@ -214,7 +209,7 @@ func (r *Reader) readContents(ctx context.Context) { r.FileAttributes[attrs.LogFileRecordNumber] = r.RecordNum } - err = r.processFunc(ctx, token, r.FileAttributes) + err = r.emitFunc(ctx, token, r.FileAttributes) if err != nil { r.set.Logger.Error("failed to process token", zap.Error(err)) }