Skip to content

Commit

Permalink
Merge pull request influxdata#281 from bonitoo-io/feat/v3/query
Browse files Browse the repository at this point in the history
feat(v3): common HTTP handling code + query
  • Loading branch information
vlastahajek authored Apr 26, 2022
2 parents 018dcc7 + 3f89808 commit ad6f3cf
Show file tree
Hide file tree
Showing 11 changed files with 803 additions and 130 deletions.
169 changes: 169 additions & 0 deletions influxclient/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2021 InfluxData, Inc. All rights reserved.
// Use of this source code is governed by MIT
// license that can be found in the LICENSE file.

// Package influxclient provides client for InfluxDB server.
package influxclient

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"mime"
"net/http"
"net/url"
"strings"
)

// Params holds the parameters for creating a new client.
// The only mandatory field is ServerURL. AuthToken is also important
// if authentication was not done outside this client.
type Params struct {
// ServerURL holds the URL of the InfluxDB server to connect to.
// This must be non-empty. E.g. http://localhost:8086
ServerURL string

// AuthToken holds the authorization token for the API.
// This can be obtained through the GUI web browser interface.
AuthToken string

// Organization is name or ID of organization where data (buckets, users, tasks, etc.) belongs to
Organization string

// HTTPClient is used to make API requests.
//
// This can be used to specify a custom TLS configuration
// (TLSClientConfig), a custom request timeout (Timeout),
// or other customization as required.
//
// It HTTPClient is nil, http.DefaultClient will be used.
HTTPClient *http.Client
}

// Client implements an InfluxDB client.
type Client struct {
// Configuration params.
params Params
// Pre-created Authorization HTTP header value.
authorization string
// Cached base server API URL.
apiURL *url.URL
}

// httpParams holds parameters for creating an HTTP request
type httpParams struct {
// URL of server endpoint
endpointURL *url.URL
// Params to be added to URL
queryParams url.Values
// HTTP request method, eg. POST
httpMethod string
// HTTP request headers
headers map[string]string
// HTTP POST/PUT body
body io.Reader
}

// New creates new Client with given Params, where ServerURL and AuthToken are mandatory.
func New(params Params) (*Client, error) {
c := &Client{params: params}
if params.ServerURL == "" {
return nil, errors.New("empty server URL")
}
if c.params.AuthToken != "" {
c.authorization = "Token " + c.params.AuthToken
}
if c.params.HTTPClient == nil {
c.params.HTTPClient = http.DefaultClient
}

serverAddress := params.ServerURL
if !strings.HasSuffix(serverAddress, "/") {
// For subsequent path parts concatenation, url has to end with '/'
serverAddress = params.ServerURL + "/"
}
var err error
// Prepare server API URL
c.apiURL, err = url.Parse(serverAddress + "api/v2/")
if err != nil {
return nil, fmt.Errorf("error parsing server URL: %w", err)
}
return c, nil
}

// makeAPICall issues an HTTP request to InfluxDB server API url according to parameters.
// Additionally, sets Authorization header and User-Agent.
// It returns http.Response or error. Error can be a *ServerError if server responded with error.
func (c *Client) makeAPICall(ctx context.Context, params httpParams) (*http.Response, error) {
// copy URL
urlObj := *params.endpointURL
urlObj.RawQuery = params.queryParams.Encode()

fullURL := urlObj.String()

req, err := http.NewRequestWithContext(ctx, params.httpMethod, fullURL, params.body)
if err != nil {
return nil, fmt.Errorf("error calling %s: %v", fullURL, err)
}
for k, v := range params.headers {
req.Header.Set(k, v)
}
req.Header.Set("User-Agent", userAgent)
if c.authorization != "" {
req.Header.Add("Authorization", c.authorization)
}

resp, err := c.params.HTTPClient.Do(req)
if err != nil {
return nil, fmt.Errorf("error calling %s: %v", fullURL, err)
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, c.resolveHTTPError(resp)
}

return resp, nil
}

// resolveHTTPError parses server error response and returns error with human-readable message
func (c *Client) resolveHTTPError(r *http.Response) error {
// successful status code range
if r.StatusCode >= 200 && r.StatusCode < 300 {
return nil
}

var httpError struct {
ServerError
// Error message of InfluxDB 1 error
Error string `json:"error"`
}

httpError.StatusCode = r.StatusCode

body, err := ioutil.ReadAll(r.Body)
if err != nil {
httpError.Message = fmt.Sprintf("cannot read error response:: %v", err)
}
ctype, _, _ := mime.ParseMediaType(r.Header.Get("Content-Type"))
if ctype == "application/json" {
err := json.Unmarshal(body, &httpError)
if err != nil {
httpError.Message = fmt.Sprintf("cannot decode error response: %v", err)
}
if httpError.Message == "" && httpError.Code == "" {
httpError.Message = httpError.Error
}
}
if httpError.Message == "" {
//TODO: "This could be a large piece of unreadable body; we might be able to do better than this"
if len(body) > 0 {
httpError.Message = string(body)
} else {
httpError.Message = r.Status
}
}

return &httpError.ServerError
}
180 changes: 180 additions & 0 deletions influxclient/client_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright 2021 InfluxData, Inc. All rights reserved.
// Use of this source code is governed by MIT
// license that can be found in the LICENSE file.

package influxclient

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/url"
"reflect"
"time"
)

// dialect defines attributes of Flux query response header.
type dialect struct {
Annotations []string `json:"annotations"`
Delimiter string `json:"delimiter"`
Header bool `json:"header"`
}

// queryBody holds the body for an HTTP query request.
type queryBody struct {
Dialect dialect `json:"dialect"`
Query string `json:"query"`
Type string `json:"type"`
Params interface{} `json:"params,omitempty"`
}

// defaultDialect is queryBody dialect value with all annotations, name header and comma as delimiter
var defaultDialect = dialect{
Annotations: []string{"datatype", "default", "group"},
Delimiter: ",",
Header: true,
}

// Query sends the given Flux query to server and returns QueryResultReader for further parsing result.
// The result must be closed after use.
//
// Flux query can contain reference to parameters, which must be passed via queryParams.
// it can be a struct or map. Param values can be only simple types or time.Time.
// Name of a struct field or a map key (must be a string) will be a param name.
//
// Fields of a struct can be more specified by json annotations:
//
// type Condition struct {
// Start time.Time `json:"start"`
// Field string `json:"field"`
// Value float64 `json:"value"`
// }
//
// cond := Condition {
// "iot_center",
// "Temperature",
// "-10m",
// 23.0,
// }
//
// Parameters are then accessed via the params object:
//
// query:= `from(bucket: "environment")
// |> range(start: time(v: params.start))
// |> filter(fn: (r) => r._measurement == "air")
// |> filter(fn: (r) => r._field == params.field)
// |> filter(fn: (r) => r._value > params.value)`
//
// And used in the call to Query:
//
// result, err := client.Query(ctx, query, cond);
//
// Use QueryResultReader.NextSection() for navigation to the sections in the query result set.
//
// Use QueryResultReader.NextRow() for iterating over rows in the section.
//
// Read the row raw data using QueryResultReader.Row()
// or deserialize data into a struct or a slice via QueryResultReader.Decode:
//
// val := &struct {
// Time time.Time `csv:"_time"`
// Temp float64 `csv:"_value"`
// Sensor string `csv:"sensor"`
// }{}
// err = result.Decode(val)
//
func (c *Client) Query(ctx context.Context, query string, queryParams interface{}) (*QueryResultReader, error) {
if err := checkParamsType(queryParams); err != nil {
return nil, err
}
queryURL, _ := c.apiURL.Parse("query")

q := queryBody{
Dialect: defaultDialect,
Query: query,
Type: "flux",
Params: queryParams,
}
qrJSON, err := json.Marshal(q)
if err != nil {
return nil, err
}
resp, err := c.makeAPICall(ctx, httpParams{
endpointURL: queryURL,
httpMethod: "POST",
headers: map[string]string{"Content-Type": "application/json"},
queryParams: url.Values{"org": []string{c.params.Organization}},
body: bytes.NewReader(qrJSON),
})
if err != nil {
return nil, err
}

return NewQueryResultReader(resp.Body), nil
}

// checkParamsType validates the value is struct with simple type fields
// or a map with key as string and value as a simple type
func checkParamsType(p interface{}) error {
if p == nil {
return nil
}
t := reflect.TypeOf(p)
v := reflect.ValueOf(p)
if t.Kind() == reflect.Ptr {
t = t.Elem()
v = v.Elem()
}
if t.Kind() != reflect.Struct && t.Kind() != reflect.Map {
return fmt.Errorf("cannot use %v as query params", t)
}
switch t.Kind() {
case reflect.Struct:
fields := reflect.VisibleFields(t)
for _, f := range fields {
fv := v.FieldByIndex(f.Index)
t := getFieldType(fv)
if !validParamType(t) {
return fmt.Errorf("cannot use field '%s' of type '%v' as a query param", f.Name, t)
}

}
case reflect.Map:
key := t.Key()
if key.Kind() != reflect.String {
return fmt.Errorf("cannot use map key of type '%v' for query param name", key)
}
for _, k := range v.MapKeys() {
f := v.MapIndex(k)
t := getFieldType(f)
if !validParamType(t) {
return fmt.Errorf("cannot use map value type '%v' as a query param", t)
}
}
}
return nil
}

// getFieldType extracts type of value
func getFieldType(v reflect.Value) reflect.Type {
t := v.Type()
if t.Kind() == reflect.Ptr {
t = t.Elem()
v = v.Elem()
}
if t.Kind() == reflect.Interface && !v.IsNil() {
t = reflect.ValueOf(v.Interface()).Type()
}
return t
}

// timeType is the exact type for the Time
var timeType = reflect.TypeOf(time.Time{})

// validParamType validates that t is primitive type or string or interface
func validParamType(t reflect.Type) bool {
return (t.Kind() > reflect.Invalid && t.Kind() < reflect.Complex64) ||
t.Kind() == reflect.String ||
t == timeType
}
Loading

0 comments on commit ad6f3cf

Please sign in to comment.