From c8d22c5752a5863f2d40aaf1bea3a2b169ef5788 Mon Sep 17 00:00:00 2001 From: Danny Boland Date: Mon, 24 Sep 2018 13:47:39 +0100 Subject: [PATCH] cmd/bosun: add opentsdb query functions with end duration parameter. (#2310) Add OverQuery(), BandQuery() extending Over(), Band() to support end time Adds more general versions of Over() and Band() that support an eduration parameter to set the end time of the query, like with q() (#2297) --- cmd/bosun/expr/expr_test.go | 73 ++++++++++++++++++++++ cmd/bosun/expr/tsdb.go | 117 +++++++++++++----------------------- docs/expressions.md | 11 +++- 3 files changed, 126 insertions(+), 75 deletions(-) diff --git a/cmd/bosun/expr/expr_test.go b/cmd/bosun/expr/expr_test.go index 3a744ee99d..1454674c80 100644 --- a/cmd/bosun/expr/expr_test.go +++ b/cmd/bosun/expr/expr_test.go @@ -122,6 +122,19 @@ var queryTime = time.Date(2000, 1, 1, 12, 0, 0, 0, time.UTC) func TestQueryExpr(t *testing.T) { queries := map[string]opentsdb.ResponseSet{ + `q("avg:m{a=*}", "9.467277e+08", "9.46728e+08")`: { + { + Metric: "m", + Tags: opentsdb.TagSet{"a": "b"}, + DPS: map[string]opentsdb.Point{"0": 0, "1": 3}, + }, + { + Metric: "m", + Tags: opentsdb.TagSet{"a": "c"}, + DPS: map[string]opentsdb.Point{"5": 1, "7": 4}, + }, + }, + `q("avg:m{a=*}", "9.467241e+08", "9.467244e+08")`: { { Metric: "m", @@ -173,6 +186,66 @@ func TestQueryExpr(t *testing.T) { d.Add(time.Second * 8): 8.5, }, }, + `over("avg:m{a=*}", "5m", "1h", 3)`: { + "a=b,shift=0s": Series{ + d: 0, + d.Add(time.Second * 1): 3, + }, + "a=b,shift=1h0m0s": Series{ + d.Add(time.Hour): 1, + d.Add(time.Hour + time.Second*1): 2, + }, + "a=b,shift=2h0m0s": Series{ + d.Add(time.Hour*2 + time.Second*2): 6, + d.Add(time.Hour*2 + time.Second*3): 4, + }, + "a=c,shift=0s": Series{ + d.Add(time.Second * 5): 1, + d.Add(time.Second * 7): 4, + }, + "a=c,shift=1h0m0s": Series{ + d.Add(time.Hour + time.Second*3): 7, + d.Add(time.Hour + time.Second*1): 8, + }, + "a=d,shift=2h0m0s": Series{ + d.Add(time.Hour*2 + time.Second*8): 8, + d.Add(time.Hour*2 + time.Second*9): 9, + }, + }, + `band("avg:m{a=*}", "5m", "1h", 2)`: { + "a=b": Series{ + d: 1, + d.Add(time.Second * 1): 2, + d.Add(time.Second * 2): 6, + d.Add(time.Second * 3): 4, + }, + "a=c": Series{ + d.Add(time.Second * 3): 7, + d.Add(time.Second * 1): 8, + }, + "a=d": Series{ + d.Add(time.Second * 8): 8, + d.Add(time.Second * 9): 9, + }, + }, + `shiftBand("avg:m{a=*}", "5m", "1h", 2)`: { + "a=b,shift=1h0m0s": Series{ + d.Add(time.Hour): 1, + d.Add(time.Hour + time.Second*1): 2, + }, + "a=b,shift=2h0m0s": Series{ + d.Add(time.Hour*2 + time.Second*2): 6, + d.Add(time.Hour*2 + time.Second*3): 4, + }, + "a=c,shift=1h0m0s": Series{ + d.Add(time.Hour + time.Second*3): 7, + d.Add(time.Hour + time.Second*1): 8, + }, + "a=d,shift=2h0m0s": Series{ + d.Add(time.Hour*2 + time.Second*8): 8, + d.Add(time.Hour*2 + time.Second*9): 9, + }, + }, `abs(-1)`: {"": Number(1)}, } diff --git a/cmd/bosun/expr/tsdb.go b/cmd/bosun/expr/tsdb.go index 94e86d2e32..f6912f8db8 100644 --- a/cmd/bosun/expr/tsdb.go +++ b/cmd/bosun/expr/tsdb.go @@ -23,6 +23,12 @@ var TSDB = map[string]parse.Func{ Tags: tagQuery, F: Band, }, + "bandQuery": { + Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString, models.TypeString, models.TypeScalar}, + Return: models.TypeSeriesSet, + Tags: tagQuery, + F: BandQuery, + }, "shiftBand": { Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString, models.TypeScalar}, Return: models.TypeSeriesSet, @@ -35,6 +41,12 @@ var TSDB = map[string]parse.Func{ Tags: tagQuery, F: Over, }, + "overQuery": { + Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString, models.TypeString, models.TypeScalar}, + Return: models.TypeSeriesSet, + Tags: tagQuery, + F: OverQuery, + }, "change": { Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString}, Return: models.TypeNumberSet, @@ -102,7 +114,7 @@ func timeTSDBRequest(e *State, req *opentsdb.Request) (s opentsdb.ResponseSet, e return } -func bandTSDB(e *State, query, duration, period string, num float64, rfunc func(*Results, *opentsdb.Response, time.Duration) error) (r *Results, err error) { +func bandTSDB(e *State, query, duration, period, eduration string, num float64, rfunc func(*Results, *opentsdb.Response, time.Duration) error) (r *Results, err error) { r = new(Results) r.IgnoreOtherUnjoined = true r.IgnoreUnjoined = true @@ -132,16 +144,23 @@ func bandTSDB(e *State, query, duration, period string, num float64, rfunc func( req := opentsdb.Request{ Queries: []*opentsdb.Query{q}, } - now := e.now - req.End = now.Unix() - req.Start = now.Add(time.Duration(-d)).Unix() + end := e.now + if eduration != "" { + var ed opentsdb.Duration + ed, err = opentsdb.ParseDuration(eduration) + if err != nil { + return + } + end = end.Add(time.Duration(-ed)) + } + req.End = end.Unix() + req.Start = end.Add(time.Duration(-d)).Unix() if err = req.SetTime(e.now); err != nil { return } for i := 0; i < int(num); i++ { - now = now.Add(time.Duration(-p)) - req.End = now.Unix() - req.Start = now.Add(time.Duration(-d)).Unix() + req.End = end.Unix() + req.Start = end.Add(time.Duration(-d)).Unix() var s opentsdb.ResponseSet s, err = timeTSDBRequest(e, &req) if err != nil { @@ -152,11 +171,12 @@ func bandTSDB(e *State, query, duration, period string, num float64, rfunc func( continue } //offset := e.now.Sub(now.Add(time.Duration(p-d))) - offset := e.now.Sub(now) + offset := e.now.Sub(end) if err = rfunc(r, res, offset); err != nil { return } } + end = end.Add(time.Duration(-p)) } }) return @@ -236,7 +256,7 @@ func Window(e *State, query, duration, period string, num float64, rfunc string) } return nil } - r, err := bandTSDB(e, query, duration, period, num, bandFn) + r, err := bandTSDB(e, query, duration, period, period, num, bandFn) if err != nil { err = fmt.Errorf("expr: Window: %v", err) } @@ -268,8 +288,8 @@ func windowCheck(t *parse.Tree, f *parse.FuncNode) error { return nil } -func Band(e *State, query, duration, period string, num float64) (r *Results, err error) { - r, err = bandTSDB(e, query, duration, period, num, func(r *Results, res *opentsdb.Response, offset time.Duration) error { +func BandQuery(e *State, query, duration, period, eduration string, num float64) (r *Results, err error) { + r, err = bandTSDB(e, query, duration, period, eduration, num, func(r *Results, res *opentsdb.Response, offset time.Duration) error { newarr := true for _, a := range r.Results { if !a.Group.Equal(res.Tags) { @@ -306,8 +326,8 @@ func Band(e *State, query, duration, period string, num float64) (r *Results, er return } -func ShiftBand(e *State, query, duration, period string, num float64) (r *Results, err error) { - r, err = bandTSDB(e, query, duration, period, num, func(r *Results, res *opentsdb.Response, offset time.Duration) error { +func OverQuery(e *State, query, duration, period, eduration string, num float64) (r *Results, err error) { + r, err = bandTSDB(e, query, duration, period, eduration, num, func(r *Results, res *opentsdb.Response, offset time.Duration) error { values := make(Series) a := &Result{Group: res.Tags.Merge(opentsdb.TagSet{"shift": offset.String()})} for k, v := range res.DPS { @@ -327,68 +347,17 @@ func ShiftBand(e *State, query, duration, period string, num float64) (r *Result return } +func Band(e *State, query, duration, period string, num float64) (r *Results, err error) { + // existing Band behaviour is to end 'period' ago, so pass period as eduration. + return BandQuery(e, query, duration, period, period, num) +} + +func ShiftBand(e *State, query, duration, period string, num float64) (r *Results, err error) { + return OverQuery(e, query, duration, period, period, num) +} + func Over(e *State, query, duration, period string, num float64) (r *Results, err error) { - r = new(Results) - r.IgnoreOtherUnjoined = true - r.IgnoreUnjoined = true - e.Timer.Step("band", func(T miniprofiler.Timer) { - var d, p opentsdb.Duration - d, err = opentsdb.ParseDuration(duration) - if err != nil { - return - } - p, err = opentsdb.ParseDuration(period) - if err != nil { - return - } - if num < 1 || num > 100 { - err = fmt.Errorf("num out of bounds") - } - var q *opentsdb.Query - q, err = opentsdb.ParseQuery(query, e.TSDBContext.Version()) - if err != nil { - return - } - if !e.TSDBContext.Version().FilterSupport() { - if err = e.Search.Expand(q); err != nil { - return - } - } - req := opentsdb.Request{ - Queries: []*opentsdb.Query{q}, - } - now := e.now - req.End = now.Unix() - req.Start = now.Add(time.Duration(-d)).Unix() - for i := 0; i < int(num); i++ { - var s opentsdb.ResponseSet - s, err = timeTSDBRequest(e, &req) - if err != nil { - return - } - offset := e.now.Sub(now) - for _, res := range s { - if e.Squelched(res.Tags) { - continue - } - values := make(Series) - a := &Result{Group: res.Tags.Merge(opentsdb.TagSet{"shift": offset.String()})} - for k, v := range res.DPS { - i, err := strconv.ParseInt(k, 10, 64) - if err != nil { - return - } - values[time.Unix(i, 0).Add(offset).UTC()] = float64(v) - } - a.Value = values - r.Results = append(r.Results, a) - } - now = now.Add(time.Duration(-p)) - req.End = now.Unix() - req.Start = now.Add(time.Duration(-d)).Unix() - } - }) - return + return OverQuery(e, query, duration, period, "", num) } func Query(e *State, query, sduration, eduration string) (r *Results, err error) { diff --git a/docs/expressions.md b/docs/expressions.md index 96b953c62a..d7b2119583 100644 --- a/docs/expressions.md +++ b/docs/expressions.md @@ -374,10 +374,19 @@ Generic query from endDuration to startDuration ago. If endDuration is the empty Band performs `num` queries of `duration` each, `period` apart and concatenates them together, starting `period` ago. So `band("avg:os.cpu", "1h", "1d", 7)` will return a series comprising of the given metric from 1d to 1d-1h-ago, 2d to 2d-1h-ago, etc, until 8d. This is a good way to get a time block from a certain hour of a day or certain day of a week over a long time period. +Note: this function wraps a more general version `bandQuery(query string, duration string, period string, eduration string, num scalar) seriesSet`, where `eduration` specifies the end duration for the query to stop at, as with `q()`. + ### over(query string, duration string, period string, num scalar) seriesSet {: .exprFunc} -Over's arguments behave the same way as band. However over shifts the time of previous periods to be now, tags them with duration that each period was shifted, and merges those shifted periods into a single seriesSet. This is useful for displaying time over time graphs. For example, the same day week over week would be `over("avg:1h-avg:rate:os.cpu{host=ny-bosun01}", "1d", "1w", 4)`. +Over's arguments behave the same way as band. However over shifts the time of previous periods to be now, tags them with duration that each period was shifted, and merges those shifted periods into a single seriesSet, which includes the most recent period. This is useful for displaying time over time graphs. For example, the same day week over week would be `over("avg:1h-avg:rate:os.cpu{host=ny-bosun01}", "1d", "1w", 4)`. + +Note: this function wraps a more general version `overQuery(query string, duration string, period string, eduration string, num scalar) seriesSet`, where `eduration` specifies the end duration for the query to stop at, as with `q`. Results are still shifted to end at current time. + +### shiftBand(query string, duration string, period string, num scalar) seriesSet +{: .exprFunc} + +shiftBand's behaviour is very similar to `over`, however the most recent period is not included in the seriesSet. This function could be useful for anomaly detection when used with `aggr`, to calculate historical distributions to compare against. ### change(query string, startDuration string, endDuration string) numberSet {: .exprFunc}