Skip to content

Commit

Permalink
textparse: Implement CreatedTimestamp() in openmetricsparse.go (p…
Browse files Browse the repository at this point in the history
…rometheus#14356)

* feat: initial implement of createedTimestamp() with tests

Signed-off-by: Manik Rana <[email protected]>

* feat: return ct after finding it

Signed-off-by: Manik Rana <[email protected]>

* chore: remove unneeded test

Signed-off-by: Manik Rana <[email protected]>

* chore: add comments

Signed-off-by: Manik Rana <[email protected]>

* feat: multiple changes

- implement changes from pair programming session
- use newParse.val()
- advance parser p if ct is found

Signed-off-by: Manik Rana <[email protected]>

* fix: check if err from p.Next()

Signed-off-by: Manik Rana <[email protected]>

* feat: advance parser and parse histograms + summary

Signed-off-by: Manik Rana <[email protected]>

* fix: restore previous tests

Signed-off-by: Manik Rana <[email protected]>

* fix: retore failing tests

Signed-off-by: Manik Rana <[email protected]>

* chore: remove unneeded comments

Signed-off-by: Manik Rana <[email protected]>

* fix: return nil when mtype doesn't match

Signed-off-by: Manik Rana <[email protected]>

* chore: update go fmt version

Signed-off-by: Manik Rana <[email protected]>

* chore: cleanup

Signed-off-by: Manik Rana <[email protected]>

* fix: comments

Signed-off-by: Manik Rana <[email protected]>

* feat: document deepcopyparser

Co-authored-by: Arthur Silva Sens <[email protected]>
Signed-off-by: Manik Rana <[email protected]>
Signed-off-by: Manik Rana <[email protected]>

* chore: lint

Signed-off-by: Manik Rana <[email protected]>

* fix: cover edgecase of `gauge_created` in CreatedTimestamp()

Signed-off-by: Manik Rana <[email protected]>

* refac: readability updates

Signed-off-by: Manik Rana <[email protected]>

* refac: dedeuplicate labeldiff checks

Signed-off-by: Manik Rana <[email protected]>

* tests: add tests for new label functions

Signed-off-by: Manik Rana <[email protected]>

* feat: document CreatedTimestamp func

Signed-off-by: Manik Rana <[email protected]>

* refac: optimize `CreatedTimestamp()`

 - Use refactored CreatedTimestamp function with bug fixes
 - Remove unused code in labels.go
 - Improve code documentation

 Signed-off-by: Manik Rana <[email protected]>

Signed-off-by: Manik Rana <[email protected]>

* chore: add tests and lint fixes

Signed-off-by: Manik Rana <[email protected]>

* chore: remove mName

Signed-off-by: Manik Rana <[email protected]>

* chore: lint

Signed-off-by: Manik Rana <[email protected]>

* chore: comments

Signed-off-by: Manik Rana <[email protected]>

* tests: add tests for CT parse failures and deepCopy

Signed-off-by: Manik Rana <[email protected]>

* refac: edit expectCT struct

Signed-off-by: Manik Rana <[email protected]>

* chore: lint

Signed-off-by: Manik Rana <[email protected]>

* chore: add new label in deepCopy

Signed-off-by: Manik Rana <[email protected]>

* fix: use p.builder in deepCopy

Signed-off-by: Manik Rana <[email protected]>

* fix: add NewMetricsParserWithOpts

Signed-off-by: Manik Rana <[email protected]>

* chore: lint

Signed-off-by: Manik Rana <[email protected]>

* chore: comments

Co-authored-by: Arthur Silva Sens <[email protected]>
Signed-off-by: Manik Rana <[email protected]>

* chore: comments

Co-authored-by: Arthur Silva Sens <[email protected]>
Signed-off-by: Manik Rana <[email protected]>

* chore: rename var

Signed-off-by: Manik Rana <[email protected]>

* fix: add condition for OM fuzzing

Signed-off-by: Manik Rana <[email protected]>

* fix: build tags

Signed-off-by: Manik Rana <[email protected]>

* refac: default skipCT to false

Signed-off-by: Manik Rana <[email protected]>

* refac: rename skipCT to skipCTSeries

Signed-off-by: Manik Rana <[email protected]>

* chore: formatting

Signed-off-by: Manik Rana <[email protected]>

* chore: comments and readability updates

Signed-off-by: Manik Rana <[email protected]>

* chore: comments

Co-authored-by: Bartlomiej Plotka <[email protected]>
Signed-off-by: Manik Rana <[email protected]>

* refac: remove NewOpenMetricsParserWithOpts

Signed-off-by: Manik Rana <[email protected]>

* chore: lint

Signed-off-by: Manik Rana <[email protected]>

* refac: extract skipCTSeries logic from parseMetricSuffix

Signed-off-by: Manik Rana <[email protected]>

* refac: inline create a NewOpenMetricsParser

Signed-off-by: Manik Rana <[email protected]>

* chore: comments

Signed-off-by: Manik Rana <[email protected]>

* chore: comments

Co-authored-by: Bartlomiej Plotka <[email protected]>
Signed-off-by: Manik Rana <[email protected]>

* refac: improve error handling

Signed-off-by: Manik Rana <[email protected]>

* fix: return error instead of nil

Signed-off-by: Manik Rana <[email protected]>

* fix: remove skipCT check from tBraceOpen

Signed-off-by: Manik Rana <[email protected]>

* Pair programming with Manik, Arthur and Daniel.

Signed-off-by: bwplotka <[email protected]>

* chore: comments and use helper funcs

Signed-off-by: Manik Rana <[email protected]>

* chore: lint

Signed-off-by: Manik Rana <[email protected]>

---------

Signed-off-by: Manik Rana <[email protected]>
Signed-off-by: Manik Rana <[email protected]>
Signed-off-by: bwplotka <[email protected]>
Co-authored-by: Arthur Silva Sens <[email protected]>
Co-authored-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
3 people authored Aug 8, 2024
1 parent b7a58dc commit 02c465b
Show file tree
Hide file tree
Showing 4 changed files with 503 additions and 35 deletions.
179 changes: 157 additions & 22 deletions model/textparse/openmetricsparse.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,46 @@ type OpenMetricsParser struct {
exemplarVal float64
exemplarTs int64
hasExemplarTs bool

skipCTSeries bool
}

type openMetricsParserOptions struct {
SkipCTSeries bool
}

// NewOpenMetricsParser returns a new parser of the byte slice.
func NewOpenMetricsParser(b []byte, st *labels.SymbolTable) Parser {
return &OpenMetricsParser{
l: &openMetricsLexer{b: b},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
type OpenMetricsOption func(*openMetricsParserOptions)

// WithOMParserCTSeriesSkipped turns off exposing _created lines
// as series, which makes those only used for parsing created timestamp
// for `CreatedTimestamp` method purposes.
//
// It's recommended to use this option to avoid using _created lines for other
// purposes than created timestamp, but leave false by default for the
// best-effort compatibility.
func WithOMParserCTSeriesSkipped() OpenMetricsOption {
return func(o *openMetricsParserOptions) {
o.SkipCTSeries = true
}
}

// NewOpenMetricsParser returns a new parser for the byte slice with option to skip CT series parsing.
func NewOpenMetricsParser(b []byte, st *labels.SymbolTable, opts ...OpenMetricsOption) Parser {
options := &openMetricsParserOptions{}

for _, opt := range opts {
opt(options)
}

parser := &OpenMetricsParser{
l: &openMetricsLexer{b: b},
builder: labels.NewScratchBuilderWithSymbolTable(st, 16),
skipCTSeries: options.SkipCTSeries,
}

return parser
}

// Series returns the bytes of the series, the timestamp if set, and the value
// of the current sample.
func (p *OpenMetricsParser) Series() ([]byte, *int64, float64) {
Expand Down Expand Up @@ -219,10 +249,90 @@ func (p *OpenMetricsParser) Exemplar(e *exemplar.Exemplar) bool {
return true
}

// CreatedTimestamp returns nil as it's not implemented yet.
// TODO(bwplotka): https://github.com/prometheus/prometheus/issues/12980
// CreatedTimestamp returns the created timestamp for a current Metric if exists or nil.
// NOTE(Maniktherana): Might use additional CPU/mem resources due to deep copy of parser required for peeking given 1.0 OM specification on _created series.
func (p *OpenMetricsParser) CreatedTimestamp() *int64 {
return nil
if !TypeRequiresCT(p.mtype) {
// Not a CT supported metric type, fast path.
return nil
}

var (
currLset labels.Labels
buf []byte
peekWithoutNameLsetHash uint64
)
p.Metric(&currLset)
currFamilyLsetHash, buf := currLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile")
// Search for the _created line for the currFamilyLsetHash using ephemeral parser until
// we see EOF or new metric family. We have to do it as we don't know where (and if)
// that CT line is.
// TODO(bwplotka): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this.
peek := deepCopy(p)
for {
eType, err := peek.Next()
if err != nil {
// This means peek will give error too later on, so def no CT line found.
// This might result in partial scrape with wrong/missing CT, but only
// spec improvement would help.
// TODO(bwplotka): Make sure OM 1.1/2.0 pass CT via metadata or exemplar-like to avoid this.
return nil
}
if eType != EntrySeries {
// Assume we hit different family, no CT line found.
return nil
}

var peekedLset labels.Labels
peek.Metric(&peekedLset)
peekedName := peekedLset.Get(model.MetricNameLabel)
if !strings.HasSuffix(peekedName, "_created") {
// Not a CT line, search more.
continue
}

// We got a CT line here, but let's search if CT line is actually for our series, edge case.
peekWithoutNameLsetHash, _ = peekedLset.HashWithoutLabels(buf, labels.MetricName, "le", "quantile")
if peekWithoutNameLsetHash != currFamilyLsetHash {
// CT line for a different series, for our series no CT.
return nil
}
ct := int64(peek.val)
return &ct
}
}

// TypeRequiresCT returns true if the metric type requires a _created timestamp.
func TypeRequiresCT(t model.MetricType) bool {
switch t {
case model.MetricTypeCounter, model.MetricTypeSummary, model.MetricTypeHistogram:
return true
default:
return false
}
}

// deepCopy creates a copy of a parser without re-using the slices' original memory addresses.
func deepCopy(p *OpenMetricsParser) OpenMetricsParser {
newB := make([]byte, len(p.l.b))
copy(newB, p.l.b)

newLexer := &openMetricsLexer{
b: newB,
i: p.l.i,
start: p.l.start,
err: p.l.err,
state: p.l.state,
}

newParser := OpenMetricsParser{
l: newLexer,
builder: p.builder,
mtype: p.mtype,
val: p.val,
skipCTSeries: false,
}
return newParser
}

// nextToken returns the next token from the openMetricsLexer.
Expand Down Expand Up @@ -337,7 +447,13 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
}

p.series = p.l.b[p.start:p.l.i]
return p.parseMetricSuffix(p.nextToken())
if err := p.parseSeriesEndOfLine(p.nextToken()); err != nil {
return EntryInvalid, err
}
if p.skipCTSeries && p.isCreatedSeries() {
return p.Next()
}
return EntrySeries, nil
case tMName:
p.offsets = append(p.offsets, p.start, p.l.i)
p.series = p.l.b[p.start:p.l.i]
Expand All @@ -351,8 +467,14 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
p.series = p.l.b[p.start:p.l.i]
t2 = p.nextToken()
}
return p.parseMetricSuffix(t2)

if err := p.parseSeriesEndOfLine(t2); err != nil {
return EntryInvalid, err
}
if p.skipCTSeries && p.isCreatedSeries() {
return p.Next()
}
return EntrySeries, nil
default:
err = p.parseError("expected a valid start token", t)
}
Expand Down Expand Up @@ -467,51 +589,64 @@ func (p *OpenMetricsParser) parseLVals(offsets []int, isExemplar bool) ([]int, e
}
}

// parseMetricSuffix parses the end of the line after the metric name and
// labels. It starts parsing with the provided token.
func (p *OpenMetricsParser) parseMetricSuffix(t token) (Entry, error) {
// isCreatedSeries returns true if the current series is a _created series.
func (p *OpenMetricsParser) isCreatedSeries() bool {
var newLbs labels.Labels
p.Metric(&newLbs)
name := newLbs.Get(model.MetricNameLabel)
if TypeRequiresCT(p.mtype) && strings.HasSuffix(name, "_created") {
return true
}
return false
}

// parseSeriesEndOfLine parses the series end of the line (value, optional
// timestamp, commentary, etc.) after the metric name and labels.
// It starts parsing with the provided token.
func (p *OpenMetricsParser) parseSeriesEndOfLine(t token) error {
if p.offsets[0] == -1 {
return EntryInvalid, fmt.Errorf("metric name not set while parsing: %q", p.l.b[p.start:p.l.i])
return fmt.Errorf("metric name not set while parsing: %q", p.l.b[p.start:p.l.i])
}

var err error
p.val, err = p.getFloatValue(t, "metric")
if err != nil {
return EntryInvalid, err
return err
}

p.hasTS = false
switch t2 := p.nextToken(); t2 {
case tEOF:
return EntryInvalid, errors.New("data does not end with # EOF")
return errors.New("data does not end with # EOF")
case tLinebreak:
break
case tComment:
if err := p.parseComment(); err != nil {
return EntryInvalid, err
return err
}
case tTimestamp:
p.hasTS = true
var ts float64
// A float is enough to hold what we need for millisecond resolution.
if ts, err = parseFloat(yoloString(p.l.buf()[1:])); err != nil {
return EntryInvalid, fmt.Errorf("%w while parsing: %q", err, p.l.b[p.start:p.l.i])
return fmt.Errorf("%w while parsing: %q", err, p.l.b[p.start:p.l.i])
}
if math.IsNaN(ts) || math.IsInf(ts, 0) {
return EntryInvalid, fmt.Errorf("invalid timestamp %f", ts)
return fmt.Errorf("invalid timestamp %f", ts)
}
p.ts = int64(ts * 1000)
switch t3 := p.nextToken(); t3 {
case tLinebreak:
case tComment:
if err := p.parseComment(); err != nil {
return EntryInvalid, err
return err
}
default:
return EntryInvalid, p.parseError("expected next entry after timestamp", t3)
return p.parseError("expected next entry after timestamp", t3)
}
}
return EntrySeries, nil

return nil
}

func (p *OpenMetricsParser) getFloatValue(t token, after string) (float64, error) {
Expand Down
Loading

0 comments on commit 02c465b

Please sign in to comment.