Skip to content

Commit

Permalink
cmd/bosun: add opentsdb query functions with end duration parameter. (#…
Browse files Browse the repository at this point in the history
…2310)

Add OverQuery(), BandQuery() extending Over(), Band() to support end time

Adds more general versions of Over() and Band() that support an eduration
parameter to set the end time of the query, like with q() (#2297)
  • Loading branch information
Danny Boland authored and kylebrandt committed Sep 24, 2018
1 parent aaa574d commit c8d22c5
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 75 deletions.
73 changes: 73 additions & 0 deletions cmd/bosun/expr/expr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ var queryTime = time.Date(2000, 1, 1, 12, 0, 0, 0, time.UTC)

func TestQueryExpr(t *testing.T) {
queries := map[string]opentsdb.ResponseSet{
`q("avg:m{a=*}", "9.467277e+08", "9.46728e+08")`: {
{
Metric: "m",
Tags: opentsdb.TagSet{"a": "b"},
DPS: map[string]opentsdb.Point{"0": 0, "1": 3},
},
{
Metric: "m",
Tags: opentsdb.TagSet{"a": "c"},
DPS: map[string]opentsdb.Point{"5": 1, "7": 4},
},
},

`q("avg:m{a=*}", "9.467241e+08", "9.467244e+08")`: {
{
Metric: "m",
Expand Down Expand Up @@ -173,6 +186,66 @@ func TestQueryExpr(t *testing.T) {
d.Add(time.Second * 8): 8.5,
},
},
`over("avg:m{a=*}", "5m", "1h", 3)`: {
"a=b,shift=0s": Series{
d: 0,
d.Add(time.Second * 1): 3,
},
"a=b,shift=1h0m0s": Series{
d.Add(time.Hour): 1,
d.Add(time.Hour + time.Second*1): 2,
},
"a=b,shift=2h0m0s": Series{
d.Add(time.Hour*2 + time.Second*2): 6,
d.Add(time.Hour*2 + time.Second*3): 4,
},
"a=c,shift=0s": Series{
d.Add(time.Second * 5): 1,
d.Add(time.Second * 7): 4,
},
"a=c,shift=1h0m0s": Series{
d.Add(time.Hour + time.Second*3): 7,
d.Add(time.Hour + time.Second*1): 8,
},
"a=d,shift=2h0m0s": Series{
d.Add(time.Hour*2 + time.Second*8): 8,
d.Add(time.Hour*2 + time.Second*9): 9,
},
},
`band("avg:m{a=*}", "5m", "1h", 2)`: {
"a=b": Series{
d: 1,
d.Add(time.Second * 1): 2,
d.Add(time.Second * 2): 6,
d.Add(time.Second * 3): 4,
},
"a=c": Series{
d.Add(time.Second * 3): 7,
d.Add(time.Second * 1): 8,
},
"a=d": Series{
d.Add(time.Second * 8): 8,
d.Add(time.Second * 9): 9,
},
},
`shiftBand("avg:m{a=*}", "5m", "1h", 2)`: {
"a=b,shift=1h0m0s": Series{
d.Add(time.Hour): 1,
d.Add(time.Hour + time.Second*1): 2,
},
"a=b,shift=2h0m0s": Series{
d.Add(time.Hour*2 + time.Second*2): 6,
d.Add(time.Hour*2 + time.Second*3): 4,
},
"a=c,shift=1h0m0s": Series{
d.Add(time.Hour + time.Second*3): 7,
d.Add(time.Hour + time.Second*1): 8,
},
"a=d,shift=2h0m0s": Series{
d.Add(time.Hour*2 + time.Second*8): 8,
d.Add(time.Hour*2 + time.Second*9): 9,
},
},
`abs(-1)`: {"": Number(1)},
}

Expand Down
117 changes: 43 additions & 74 deletions cmd/bosun/expr/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ var TSDB = map[string]parse.Func{
Tags: tagQuery,
F: Band,
},
"bandQuery": {
Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString, models.TypeString, models.TypeScalar},
Return: models.TypeSeriesSet,
Tags: tagQuery,
F: BandQuery,
},
"shiftBand": {
Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString, models.TypeScalar},
Return: models.TypeSeriesSet,
Expand All @@ -35,6 +41,12 @@ var TSDB = map[string]parse.Func{
Tags: tagQuery,
F: Over,
},
"overQuery": {
Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString, models.TypeString, models.TypeScalar},
Return: models.TypeSeriesSet,
Tags: tagQuery,
F: OverQuery,
},
"change": {
Args: []models.FuncType{models.TypeString, models.TypeString, models.TypeString},
Return: models.TypeNumberSet,
Expand Down Expand Up @@ -102,7 +114,7 @@ func timeTSDBRequest(e *State, req *opentsdb.Request) (s opentsdb.ResponseSet, e
return
}

func bandTSDB(e *State, query, duration, period string, num float64, rfunc func(*Results, *opentsdb.Response, time.Duration) error) (r *Results, err error) {
func bandTSDB(e *State, query, duration, period, eduration string, num float64, rfunc func(*Results, *opentsdb.Response, time.Duration) error) (r *Results, err error) {
r = new(Results)
r.IgnoreOtherUnjoined = true
r.IgnoreUnjoined = true
Expand Down Expand Up @@ -132,16 +144,23 @@ func bandTSDB(e *State, query, duration, period string, num float64, rfunc func(
req := opentsdb.Request{
Queries: []*opentsdb.Query{q},
}
now := e.now
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
end := e.now
if eduration != "" {
var ed opentsdb.Duration
ed, err = opentsdb.ParseDuration(eduration)
if err != nil {
return
}
end = end.Add(time.Duration(-ed))
}
req.End = end.Unix()
req.Start = end.Add(time.Duration(-d)).Unix()
if err = req.SetTime(e.now); err != nil {
return
}
for i := 0; i < int(num); i++ {
now = now.Add(time.Duration(-p))
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
req.End = end.Unix()
req.Start = end.Add(time.Duration(-d)).Unix()
var s opentsdb.ResponseSet
s, err = timeTSDBRequest(e, &req)
if err != nil {
Expand All @@ -152,11 +171,12 @@ func bandTSDB(e *State, query, duration, period string, num float64, rfunc func(
continue
}
//offset := e.now.Sub(now.Add(time.Duration(p-d)))
offset := e.now.Sub(now)
offset := e.now.Sub(end)
if err = rfunc(r, res, offset); err != nil {
return
}
}
end = end.Add(time.Duration(-p))
}
})
return
Expand Down Expand Up @@ -236,7 +256,7 @@ func Window(e *State, query, duration, period string, num float64, rfunc string)
}
return nil
}
r, err := bandTSDB(e, query, duration, period, num, bandFn)
r, err := bandTSDB(e, query, duration, period, period, num, bandFn)
if err != nil {
err = fmt.Errorf("expr: Window: %v", err)
}
Expand Down Expand Up @@ -268,8 +288,8 @@ func windowCheck(t *parse.Tree, f *parse.FuncNode) error {
return nil
}

func Band(e *State, query, duration, period string, num float64) (r *Results, err error) {
r, err = bandTSDB(e, query, duration, period, num, func(r *Results, res *opentsdb.Response, offset time.Duration) error {
func BandQuery(e *State, query, duration, period, eduration string, num float64) (r *Results, err error) {
r, err = bandTSDB(e, query, duration, period, eduration, num, func(r *Results, res *opentsdb.Response, offset time.Duration) error {
newarr := true
for _, a := range r.Results {
if !a.Group.Equal(res.Tags) {
Expand Down Expand Up @@ -306,8 +326,8 @@ func Band(e *State, query, duration, period string, num float64) (r *Results, er
return
}

func ShiftBand(e *State, query, duration, period string, num float64) (r *Results, err error) {
r, err = bandTSDB(e, query, duration, period, num, func(r *Results, res *opentsdb.Response, offset time.Duration) error {
func OverQuery(e *State, query, duration, period, eduration string, num float64) (r *Results, err error) {
r, err = bandTSDB(e, query, duration, period, eduration, num, func(r *Results, res *opentsdb.Response, offset time.Duration) error {
values := make(Series)
a := &Result{Group: res.Tags.Merge(opentsdb.TagSet{"shift": offset.String()})}
for k, v := range res.DPS {
Expand All @@ -327,68 +347,17 @@ func ShiftBand(e *State, query, duration, period string, num float64) (r *Result
return
}

func Band(e *State, query, duration, period string, num float64) (r *Results, err error) {
// existing Band behaviour is to end 'period' ago, so pass period as eduration.
return BandQuery(e, query, duration, period, period, num)
}

func ShiftBand(e *State, query, duration, period string, num float64) (r *Results, err error) {
return OverQuery(e, query, duration, period, period, num)
}

func Over(e *State, query, duration, period string, num float64) (r *Results, err error) {
r = new(Results)
r.IgnoreOtherUnjoined = true
r.IgnoreUnjoined = true
e.Timer.Step("band", func(T miniprofiler.Timer) {
var d, p opentsdb.Duration
d, err = opentsdb.ParseDuration(duration)
if err != nil {
return
}
p, err = opentsdb.ParseDuration(period)
if err != nil {
return
}
if num < 1 || num > 100 {
err = fmt.Errorf("num out of bounds")
}
var q *opentsdb.Query
q, err = opentsdb.ParseQuery(query, e.TSDBContext.Version())
if err != nil {
return
}
if !e.TSDBContext.Version().FilterSupport() {
if err = e.Search.Expand(q); err != nil {
return
}
}
req := opentsdb.Request{
Queries: []*opentsdb.Query{q},
}
now := e.now
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
for i := 0; i < int(num); i++ {
var s opentsdb.ResponseSet
s, err = timeTSDBRequest(e, &req)
if err != nil {
return
}
offset := e.now.Sub(now)
for _, res := range s {
if e.Squelched(res.Tags) {
continue
}
values := make(Series)
a := &Result{Group: res.Tags.Merge(opentsdb.TagSet{"shift": offset.String()})}
for k, v := range res.DPS {
i, err := strconv.ParseInt(k, 10, 64)
if err != nil {
return
}
values[time.Unix(i, 0).Add(offset).UTC()] = float64(v)
}
a.Value = values
r.Results = append(r.Results, a)
}
now = now.Add(time.Duration(-p))
req.End = now.Unix()
req.Start = now.Add(time.Duration(-d)).Unix()
}
})
return
return OverQuery(e, query, duration, period, "", num)
}

func Query(e *State, query, sduration, eduration string) (r *Results, err error) {
Expand Down
11 changes: 10 additions & 1 deletion docs/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,19 @@ Generic query from endDuration to startDuration ago. If endDuration is the empty

Band performs `num` queries of `duration` each, `period` apart and concatenates them together, starting `period` ago. So `band("avg:os.cpu", "1h", "1d", 7)` will return a series comprising of the given metric from 1d to 1d-1h-ago, 2d to 2d-1h-ago, etc, until 8d. This is a good way to get a time block from a certain hour of a day or certain day of a week over a long time period.

Note: this function wraps a more general version `bandQuery(query string, duration string, period string, eduration string, num scalar) seriesSet`, where `eduration` specifies the end duration for the query to stop at, as with `q()`.

### over(query string, duration string, period string, num scalar) seriesSet
{: .exprFunc}

Over's arguments behave the same way as band. However over shifts the time of previous periods to be now, tags them with duration that each period was shifted, and merges those shifted periods into a single seriesSet. This is useful for displaying time over time graphs. For example, the same day week over week would be `over("avg:1h-avg:rate:os.cpu{host=ny-bosun01}", "1d", "1w", 4)`.
Over's arguments behave the same way as band. However over shifts the time of previous periods to be now, tags them with duration that each period was shifted, and merges those shifted periods into a single seriesSet, which includes the most recent period. This is useful for displaying time over time graphs. For example, the same day week over week would be `over("avg:1h-avg:rate:os.cpu{host=ny-bosun01}", "1d", "1w", 4)`.

Note: this function wraps a more general version `overQuery(query string, duration string, period string, eduration string, num scalar) seriesSet`, where `eduration` specifies the end duration for the query to stop at, as with `q`. Results are still shifted to end at current time.

### shiftBand(query string, duration string, period string, num scalar) seriesSet
{: .exprFunc}

shiftBand's behaviour is very similar to `over`, however the most recent period is not included in the seriesSet. This function could be useful for anomaly detection when used with `aggr`, to calculate historical distributions to compare against.

### change(query string, startDuration string, endDuration string) numberSet
{: .exprFunc}
Expand Down

0 comments on commit c8d22c5

Please sign in to comment.