Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #529: Partition API requests to CloudWatch into separate concur… #540

Merged
merged 3 commits into from
Jul 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 60 additions & 14 deletions metrics/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,42 @@ import (
"github.com/go-kit/kit/metrics/generic"
)

const (
maxConcurrentRequests = 20
)

// CloudWatch receives metrics observations and forwards them to CloudWatch.
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
//
// To regularly report metrics to CloudWatch, use the WriteLoop helper method.
type CloudWatch struct {
mtx sync.RWMutex
namespace string
svc cloudwatchiface.CloudWatchAPI
counters map[string]*counter
gauges map[string]*gauge
histograms map[string]*histogram
logger log.Logger
mtx sync.RWMutex
sem chan struct{}
namespace string
numConcurrentRequests int
svc cloudwatchiface.CloudWatchAPI
counters map[string]*counter
gauges map[string]*gauge
histograms map[string]*histogram
logger log.Logger
}

// New returns a CloudWatch object that may be used to create metrics.
// Namespace is applied to all created metrics and maps to the CloudWatch namespace.
// NumConcurrent sets the number of simultaneous requests to Amazon.
// A good default value is 10 and the maximum is 20.
// Callers must ensure that regular calls to Send are performed, either
// manually or with one of the helper methods.
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch {
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch {
if numConcurrent > maxConcurrentRequests {
numConcurrent = maxConcurrentRequests
}

return &CloudWatch{
namespace: namespace,
sem: make(chan struct{}, numConcurrent),
namespace: namespace,
numConcurrentRequests: numConcurrent,
svc: svc,
counters: map[string]*counter{},
gauges: map[string]*gauge{},
Expand Down Expand Up @@ -133,11 +147,43 @@ func (cw *CloudWatch) Send() error {
}
}

_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
Namespace: aws.String(cw.namespace),
MetricData: datums,
})
return err
var batches [][]*cloudwatch.MetricDatum
for len(datums) > 0 {
var batch []*cloudwatch.MetricDatum
lim := min(len(datums), maxConcurrentRequests)
batch, datums = datums[:lim], datums[lim:]
batches = append(batches, batch)
}

var errors = make(chan error, len(batches))
for _, batch := range batches {
go func(batch []*cloudwatch.MetricDatum) {
cw.sem <- struct{}{}
defer func() {
<-cw.sem
}()
_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
Namespace: aws.String(cw.namespace),
MetricData: batch,
})
errors <- err
}(batch)
}
var firstErr error
for i := 0; i < cap(errors); i++ {
if err := <-errors; err != nil && firstErr != nil {
firstErr = err
}
}

return firstErr
}

func min(a, b int) int {
if a < b {
return a
}
return b
}

// counter is a CloudWatch counter metric.
Expand Down
42 changes: 39 additions & 3 deletions metrics/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package cloudwatch
import (
"errors"
"fmt"
"strconv"
"sync"
"testing"

"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/teststat"
)

Expand Down Expand Up @@ -64,7 +66,7 @@ func TestCounter(t *testing.T) {
namespace, name := "abc", "def"
label, value := "label", "value"
svc := newMockCloudWatch()
cw := New(namespace, svc, log.NewNopLogger())
cw := New(namespace, svc, 10, log.NewNopLogger())
counter := cw.NewCounter(name).With(label, value)
valuef := func() float64 {
err := cw.Send()
Expand All @@ -83,11 +85,45 @@ func TestCounter(t *testing.T) {
}
}

func TestCounterLowSendConcurrency(t *testing.T) {
namespace := "abc"
var names, labels, values []string
for i := 1; i <= 45; i++ {
num := strconv.Itoa(i)
names = append(names, "name"+num)
labels = append(labels, "label"+num)
values = append(values, "value"+num)
}
svc := newMockCloudWatch()
cw := New(namespace, svc, 2, log.NewNopLogger())

counters := make(map[string]metrics.Counter)
var wants []float64
for i, name := range names {
counters[name] = cw.NewCounter(name).With(labels[i], values[i])
wants = append(wants, teststat.FillCounter(counters[name]))
}

err := cw.Send()
if err != nil {
t.Fatal(err)
}

for i, name := range names {
if svc.valuesReceived[name] != wants[i] {
t.Fatal("want %f, have %f", wants[i], svc.valuesReceived[name])
}
if err := testDimensions(svc, name, labels[i], values[i]); err != nil {
t.Fatal(err)
}
}
}

func TestGauge(t *testing.T) {
namespace, name := "abc", "def"
label, value := "label", "value"
svc := newMockCloudWatch()
cw := New(namespace, svc, log.NewNopLogger())
cw := New(namespace, svc, 10, log.NewNopLogger())
gauge := cw.NewGauge(name).With(label, value)
valuef := func() float64 {
err := cw.Send()
Expand All @@ -110,7 +146,7 @@ func TestHistogram(t *testing.T) {
namespace, name := "abc", "def"
label, value := "label", "value"
svc := newMockCloudWatch()
cw := New(namespace, svc, log.NewNopLogger())
cw := New(namespace, svc, 10, log.NewNopLogger())
histogram := cw.NewHistogram(name, 50).With(label, value)
n50 := fmt.Sprintf("%s_50", name)
n90 := fmt.Sprintf("%s_90", name)
Expand Down
17 changes: 11 additions & 6 deletions metrics/teststat/teststat.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ import (
// TestCounter puts some deltas through the counter, and then calls the value
// func to check that the counter has the correct final value.
func TestCounter(counter metrics.Counter, value func() float64) error {
want := FillCounter(counter)
if have := value(); want != have {
return fmt.Errorf("want %f, have %f", want, have)
}

return nil
}

// FillCounter puts some deltas through the counter and returns the total value.
func FillCounter(counter metrics.Counter) float64 {
a := rand.Perm(100)
n := rand.Intn(len(a))

Expand All @@ -23,12 +33,7 @@ func TestCounter(counter metrics.Counter, value func() float64) error {
counter.Add(f)
want += f
}

if have := value(); want != have {
return fmt.Errorf("want %f, have %f", want, have)
}

return nil
return want
}

// TestGauge puts some values through the gauge, and then calls the value func
Expand Down