Skip to content

Commit

Permalink
Change max query size for GetMetricData API to 500 and add RecentlyAc…
Browse files Browse the repository at this point in the history
…tive config (#33105)
  • Loading branch information
kaiyan-sheng authored Oct 11, 2022
1 parent 1032646 commit 35cab79
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- in module/windows/perfmon, changed collection method of the second counter value required to create a displayable value {pull}32305[32305]
- Fix and improve AWS metric period calculation to avoid zero-length intervals {pull}32724[32724]
- Add missing cluster metadata to k8s module metricsets {pull}32979[32979] {pull}33032[33032]
- Change max query size for GetMetricData API to 500 and add RecentlyActive for ListMetrics API call {pull}33105[33105]
- Add GCP CloudSQL region filter {pull}32943[32943]
- Fix logstash cgroup mappings {pull}33131[33131]
- Remove unused `elasticsearch.node_stats.indices.bulk.avg_time.bytes` mapping {pull}33263[33263]
Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (m *MetricSet) getCloudWatchBillingMetrics(
endTime time.Time) []mb.Event {
var events []mb.Event
namespace := "AWS/Billing"
listMetricsOutput, err := aws.GetListMetricsOutput(namespace, regionName, svcCloudwatch)
listMetricsOutput, err := aws.GetListMetricsOutput(namespace, regionName, m.Period, svcCloudwatch)
if err != nil {
m.Logger().Error(err.Error())
return nil
Expand Down
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
for namespace, namespaceDetails := range namespaceDetailTotal {
m.logger.Debugf("Collected metrics from namespace %s", namespace)

listMetricsOutput, err := aws.GetListMetricsOutput(namespace, regionName, svcCloudwatch)
listMetricsOutput, err := aws.GetListMetricsOutput(namespace, regionName, m.Period, svcCloudwatch)
if err != nil {
m.logger.Info(err.Error())
continue
Expand Down
52 changes: 32 additions & 20 deletions x-pack/metricbeat/module/aws/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,23 @@ func GetStartTimeEndTime(now time.Time, period time.Duration, latency time.Durat
// GetListMetricsOutput function gets listMetrics results from cloudwatch for each region, restricted to the given namespace unless namespace is "*".
// ListMetrics Cloudwatch API is used to list the specified metrics. The returned metrics can be used with GetMetricData
// to obtain statistical data.
func GetListMetricsOutput(namespace string, regionName string, svcCloudwatch cloudwatch.ListMetricsAPIClient) ([]types.Metric, error) {
// Note: We are not using Dimensions and MetricName in ListMetricsInput because with that we will have to make one ListMetrics
// API call per metric name and set of dimensions. This will increase API cost.
func GetListMetricsOutput(namespace string, regionName string, period time.Duration, svcCloudwatch cloudwatch.ListMetricsAPIClient) ([]types.Metric, error) {
var metricsTotal []types.Metric
var nextToken *string

listMetricsInput := &cloudwatch.ListMetricsInput{
NextToken: nextToken,
}

// Setting RecentlyActive to PT3H filters the results to show only metrics that have had
// data points published in the past three hours. It is applied only when the collection
// period is three hours or less.
// Please see https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_ListMetrics.html for more details.
if period <= time.Hour*3 {
listMetricsInput.RecentlyActive = types.RecentlyActivePt3h
}

if namespace != "*" {
listMetricsInput.Namespace = &namespace
}
Expand All @@ -67,14 +76,14 @@ func GetListMetricsOutput(namespace string, regionName string, svcCloudwatch clo

// GetMetricDataResults function uses MetricDataQueries to get metric data output.
func GetMetricDataResults(metricDataQueries []types.MetricDataQuery, svc cloudwatch.GetMetricDataAPIClient, startTime time.Time, endTime time.Time) ([]types.MetricDataResult, error) {
maxQuerySize := 100
maxNumberOfMetricsRetrieved := 500
getMetricDataOutput := &cloudwatch.GetMetricDataOutput{NextToken: nil}

// Split metricDataQueries into smaller slices that length no longer than 100.
// 100 is defined in maxQuerySize.
// To avoid ValidationError: The collection MetricDataQueries must not have a size greater than 100.
for i := 0; i < len(metricDataQueries); i += maxQuerySize {
metricDataQueriesPartial := metricDataQueries[i:int(math.Min(float64(i+maxQuerySize), float64(len(metricDataQueries))))]
// Split metricDataQueries into smaller slices, each no longer than 500 queries.
// 500 is defined in maxNumberOfMetricsRetrieved.
// This avoids ValidationError: The collection MetricDataQueries must not have a size greater than 500.
for i := 0; i < len(metricDataQueries); i += maxNumberOfMetricsRetrieved {
metricDataQueriesPartial := metricDataQueries[i:int(math.Min(float64(i+maxNumberOfMetricsRetrieved), float64(len(metricDataQueries))))]
if len(metricDataQueriesPartial) == 0 {
return getMetricDataOutput.MetricDataResults, nil
}
Expand Down Expand Up @@ -111,19 +120,22 @@ func CheckTimestampInArray(timestamp time.Time, timestampArray []time.Time) (boo

// FindTimestamp function checks MetricDataResults and finds the timestamp to collect metrics from.
// For example, MetricDataResults might look like:
// metricDataResults = [{
// Id: "sqs0",
// Label: "testName SentMessageSize",
// StatusCode: Complete,
// Timestamps: [2019-03-11 17:45:00 +0000 UTC],
// Values: [981]
// } {
// Id: "sqs1",
// Label: "testName NumberOfMessagesSent",
// StatusCode: Complete,
// Timestamps: [2019-03-11 17:45:00 +0000 UTC,2019-03-11 17:40:00 +0000 UTC],
// Values: [0.5,0]
// }]
//
// metricDataResults = [{
// Id: "sqs0",
// Label: "testName SentMessageSize",
// StatusCode: Complete,
// Timestamps: [2019-03-11 17:45:00 +0000 UTC],
// Values: [981]
// } {
//
// Id: "sqs1",
// Label: "testName NumberOfMessagesSent",
// StatusCode: Complete,
// Timestamps: [2019-03-11 17:45:00 +0000 UTC,2019-03-11 17:40:00 +0000 UTC],
// Values: [0.5,0]
// }]
//
// In this case, we are collecting values for both metrics from timestamp 2019-03-11 17:45:00 +0000 UTC.
func FindTimestamp(getMetricDataResults []types.MetricDataResult) time.Time {
timestamp := time.Time{}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/aws/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (m *MockResourceGroupsTaggingClient) GetResources(_ context.Context, _ *res

func TestGetListMetricsOutput(t *testing.T) {
svcCloudwatch := &MockCloudWatchClient{}
listMetricsOutput, err := GetListMetricsOutput("AWS/EC2", "us-west-1", svcCloudwatch)
listMetricsOutput, err := GetListMetricsOutput("AWS/EC2", "us-west-1", time.Minute*5, svcCloudwatch)
assert.NoError(t, err)
assert.Equal(t, 1, len(listMetricsOutput))
assert.Equal(t, namespace, *listMetricsOutput[0].Namespace)
Expand All @@ -149,7 +149,7 @@ func TestGetListMetricsOutput(t *testing.T) {

func TestGetListMetricsOutputWithWildcard(t *testing.T) {
svcCloudwatch := &MockCloudWatchClient{}
listMetricsOutput, err := GetListMetricsOutput("*", "us-west-1", svcCloudwatch)
listMetricsOutput, err := GetListMetricsOutput("*", "us-west-1", time.Minute*5, svcCloudwatch)
assert.NoError(t, err)
assert.Equal(t, 1, len(listMetricsOutput))
assert.Equal(t, namespace, *listMetricsOutput[0].Namespace)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/azure/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
// Currently, the usage period is the start/end time (00:00:00->23:59:59 UTC) of the day before the reference time.
//
// For example, if the reference time is 2007-01-09 09:41:00Z, the usage period is:
// 2007-01-08 00:00:00Z -> 2007-01-08 23:59:59Z
//
// 2007-01-08 00:00:00Z -> 2007-01-08 23:59:59Z
func usageIntervalFrom(reference time.Time) (time.Time, time.Time) {
beginningOfDay := reference.UTC().Truncate(24 * time.Hour).Add((-24) * time.Hour)
endOfDay := beginningOfDay.Add(time.Hour * 24).Add(time.Second * (-1))
Expand All @@ -127,8 +127,8 @@ func usageIntervalFrom(reference time.Time) (time.Time, time.Time) {
// reference time.
//
// For example, if the reference time is 2007-01-09 09:41:00Z, the forecast period is:
// 2007-01-01T00:00:00Z -> 2007-01-31:59:59Z
//
// 2007-01-01T00:00:00Z -> 2007-01-31T23:59:59Z
func forecastIntervalFrom(reference time.Time) (time.Time, time.Time) {
referenceUTC := reference.UTC()
beginningOfMonth := time.Date(referenceUTC.Year(), referenceUTC.Month(), 1, 0, 0, 0, 0, time.UTC)
Expand Down

0 comments on commit 35cab79

Please sign in to comment.