Skip to content

Commit

Permalink
Add support for AWS EMF 1 second Storage Resolution in the AWS EMF Ex…
Browse files Browse the repository at this point in the history
…porter

This change implements the capability for users of the AWS EMF Exporter to specify
which metrics they would like to have sent to CloudWatch with a 1 second Storage
Resolution. The EMF Exporter now explicitly states the Storage Resolution for each
metric as 60 seconds, the previous implicit default, so there is no behavior
change. If the user specifies a metric to have 1 second resolution it will be
sent to CloudWatch EMF with the Storage Resolution set accordingly.

Previously the AWS EMF Exporter sent metric data into CloudWatch without
specifying the storage resolution. CloudWatch would then default to a 60 second
storage resolution, even if metrics are sent more frequently than every 60
seconds. This would confuse users when they try to apply functions like AVG,
SUM, MAX, or MIN to their metrics with a period of 5 seconds. The function would
be applied by CloudWatch to 60 seconds worth of data and produced unexpected
results and confusion for the user. This commit makes this 60 second resolution
explicit in the messages sent to CloudWatch by the EMF Exporter and also gives
the user the option to specify a more granular 1 second resolution per metric
in the configuration file of the AWS EMF Exporter.
  • Loading branch information
jpbarto committed Oct 29, 2024
1 parent df5ba72 commit f32066e
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 170 deletions.
11 changes: 10 additions & 1 deletion exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type MetricDescriptor struct {
// Overwrite set to true means the existing metric descriptor will be overwritten or a new metric descriptor will be created; false means
// the descriptor will only be configured if empty.
Overwrite bool `mapstructure:"overwrite"`
// Set the storage resolution for this metric in AWS Embedded Metric Format (EMF). Valid values are 1 (for 1 second resolution) and 60 (for 60 second resolution).
StorageResolution int `mapstructure:"storage_resolution"`
}

var _ component.Config = (*Config)(nil)
Expand All @@ -123,7 +125,14 @@ func (config *Config) Validate() error {
if descriptor.MetricName == "" {
continue
}
if _, ok := eMFSupportedUnits[descriptor.Unit]; ok {
_, unitOk := eMFSupportedUnits[descriptor.Unit]
stoResErr := cwlogs.ValidateStorageResolution(descriptor.StorageResolution)

// store the descriptor if
// - it has a valid unit and a valid storage resolution
// - it has a valid unit and no specified storage resolution
// - no specified unit but a valid storage resolution
if (unitOk && stoResErr == nil) || (unitOk && descriptor.StorageResolution == 0) || (stoResErr == nil && descriptor.Unit == "") {
validDescriptors = append(validDescriptors, descriptor)
} else {
config.logger.Warn("Dropped unsupported metric desctriptor.", zap.String("unit", descriptor.Unit))
Expand Down
15 changes: 11 additions & 4 deletions exporter/awsemfexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ func TestLoadConfig(t *testing.T) {
OutputDestination: "cloudwatch",
Version: "1",
MetricDescriptors: []MetricDescriptor{{
MetricName: "memcached_current_items",
Unit: "Count",
Overwrite: true,
MetricName: "memcached_current_items",
Unit: "Count",
Overwrite: true,
StorageResolution: 1,
}},
logger: zap.NewNop(),
},
Expand Down Expand Up @@ -127,6 +128,10 @@ func TestConfigValidate(t *testing.T) {
{Unit: "Count", MetricName: "apiserver_total", Overwrite: true},
{Unit: "INVALID", MetricName: "404"},
{Unit: "Megabytes", MetricName: "memory_usage"},
{StorageResolution: 1, MetricName: "saturation"},
{StorageResolution: 10, MetricName: "throughput"},
{Unit: "Count", MetricName: "error_total", StorageResolution: 1},
{Unit: "Count", MetricName: "error_total", StorageResolution: 5, Overwrite: true},
}
cfg := &Config{
AWSSessionSettings: awsutil.AWSSessionSettings{
Expand All @@ -140,10 +145,12 @@ func TestConfigValidate(t *testing.T) {
}
assert.NoError(t, component.ValidateConfig(cfg))

assert.Len(t, cfg.MetricDescriptors, 2)
assert.Len(t, cfg.MetricDescriptors, 4)
assert.Equal(t, []MetricDescriptor{
{Unit: "Count", MetricName: "apiserver_total", Overwrite: true},
{Unit: "Megabytes", MetricName: "memory_usage"},
{StorageResolution: 1, MetricName: "saturation"},
{Unit: "Count", MetricName: "error_total", StorageResolution: 1},
}, cfg.MetricDescriptors)
}

Expand Down
20 changes: 16 additions & 4 deletions exporter/awsemfexporter/grouped_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ type groupedMetric struct {

// metricInfo defines value and unit for OT Metrics
type metricInfo struct {
value any
unit string
value any
unit string
storageResolution int
}

// addToGroupedMetric processes OT metrics and adds them into GroupedMetric buckets
Expand Down Expand Up @@ -78,8 +79,9 @@ func addToGroupedMetric(
}

metric := &metricInfo{
value: dp.value,
unit: translateUnit(pmd, descriptor),
value: dp.value,
unit: translateUnit(pmd, descriptor),
storageResolution: translateResolution(pmd, descriptor),
}

if dp.timestampMs > 0 {
Expand Down Expand Up @@ -186,6 +188,16 @@ func mapGetHelper(labels map[string]string, key string) string {
return ""
}

func translateResolution(metric pmetric.Metric, descriptor map[string]MetricDescriptor) int {
if descriptor, exists := descriptor[metric.Name()]; exists {
if descriptor.StorageResolution == 1 || descriptor.StorageResolution == 60 {
return descriptor.StorageResolution
}
}

return 60
}

func translateUnit(metric pmetric.Metric, descriptor map[string]MetricDescriptor) string {
unit := metric.Unit()
if descriptor, exists := descriptor[metric.Name()]; exists {
Expand Down
21 changes: 13 additions & 8 deletions exporter/awsemfexporter/grouped_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ func TestAddToGroupedMetric(t *testing.T) {
expectedLabels: map[string]string{oTellibDimensionKey: instrumentationLibName, "label1": "value1"},
expectedMetricInfo: map[string]*metricInfo{
"foo": {
value: 0.1,
unit: "Count",
value: 0.1,
unit: "Count",
storageResolution: 60,
},
},
},
Expand All @@ -54,8 +55,9 @@ func TestAddToGroupedMetric(t *testing.T) {
expectedLabels: map[string]string{oTellibDimensionKey: instrumentationLibName, "label1": "value1"},
expectedMetricInfo: map[string]*metricInfo{
"foo": {
value: float64(1),
unit: "Count",
value: float64(1),
unit: "Count",
storageResolution: 60,
},
},
},
Expand All @@ -70,7 +72,8 @@ func TestAddToGroupedMetric(t *testing.T) {
Count: 18,
Sum: 35.0,
},
unit: "Seconds",
unit: "Seconds",
storageResolution: 60,
},
},
},
Expand All @@ -87,7 +90,8 @@ func TestAddToGroupedMetric(t *testing.T) {
Count: 5,
Sum: 15,
},
unit: "Seconds",
unit: "Seconds",
storageResolution: 60,
},
},
},
Expand Down Expand Up @@ -297,8 +301,9 @@ func TestAddToGroupedMetric(t *testing.T) {
assert.Len(t, group.metrics, 1)
expectedMetrics := map[string]*metricInfo{
"int-gauge": {
value: float64(1),
unit: "Count",
value: float64(1),
unit: "Count",
storageResolution: 60,
},
}
assert.Equal(t, expectedMetrics, group.metrics)
Expand Down
24 changes: 16 additions & 8 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type cWMetrics struct {
type cWMeasurement struct {
Namespace string
Dimensions [][]string
Metrics []map[string]string
Metrics []map[string]any
}

type cWMetricStats struct {
Expand Down Expand Up @@ -228,15 +228,19 @@ func groupedMetricToCWMeasurement(groupedMetric *groupedMetric, config *Config)
// Add on rolled-up dimensions
dimensions = append(dimensions, rollupDimensionArray...)

metrics := make([]map[string]string, len(groupedMetric.metrics))
metrics := make([]map[string]any, len(groupedMetric.metrics))
idx = 0
for metricName, metricInfo := range groupedMetric.metrics {
metrics[idx] = map[string]string{
"Name": metricName,
metrics[idx] = map[string]any{
"Name": metricName,
"StorageResolution": 60,
}
if metricInfo.unit != "" {
metrics[idx]["Unit"] = metricInfo.unit
}
if resErr := cwlogs.ValidateStorageResolution(metricInfo.storageResolution); resErr == nil {
metrics[idx]["StorageResolution"] = metricInfo.storageResolution
}
idx++
}

Expand Down Expand Up @@ -278,7 +282,7 @@ func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, conf
// Group metrics by matched metric declarations
type metricDeclarationGroup struct {
metricDeclIdxList []int
metrics []map[string]string
metrics []map[string]any
}

metricDeclGroups := make(map[string]*metricDeclarationGroup)
Expand All @@ -299,19 +303,23 @@ func groupedMetricToCWMeasurementsWithFilters(groupedMetric *groupedMetric, conf
continue
}

metric := map[string]string{
"Name": metricName,
metric := map[string]any{
"Name": metricName,
"StorageResolution": 60,
}
if metricInfo.unit != "" {
metric["Unit"] = metricInfo.unit
}
if resErr := cwlogs.ValidateStorageResolution(metricInfo.storageResolution); resErr == nil {
metric["StorageResolution"] = metricInfo.storageResolution
}
metricDeclKey := fmt.Sprint(metricDeclIdx)
if group, ok := metricDeclGroups[metricDeclKey]; ok {
group.metrics = append(group.metrics, metric)
} else {
metricDeclGroups[metricDeclKey] = &metricDeclarationGroup{
metricDeclIdxList: metricDeclIdx,
metrics: []map[string]string{metric},
metrics: []map[string]any{metric},
}
}
}
Expand Down
Loading

0 comments on commit f32066e

Please sign in to comment.