Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into mvg/readd-filter
Browse files Browse the repository at this point in the history
  • Loading branch information
MadVikingGod committed Nov 3, 2022
2 parents ec165d6 + b5b6852 commit f97f1af
Show file tree
Hide file tree
Showing 38 changed files with 815 additions and 195 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- The `WithView` `Option` is added to the `go.opentelemetry.io/otel/sdk/metric` package.
This option is used to configure the view(s) a `MeterProvider` will use for all `Reader`s that are registered with it. (#3387)
- Add Instrumentation Scope and Version as info metric and label in Prometheus exporter.
This can be disabled using the `WithoutScopeInfo()` option added to that package.(#3273, #3357)

### Changed

- The `"go.opentelemetry.io/otel/sdk/metric".WithReader` option no longer accepts views to associate with the `Reader`.
Instead, views are now registered directly with the `MeterProvider` via the new `WithView` option.
The views registered with the `MeterProvider` apply to all `Reader`s. (#3387)
- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/sdk/metric".Exporter` interface. (#3260)
- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/exporters/otlp/otlpmetric".Client` interface. (#3260)
- The `WithTemporalitySelector` and `WithAggregationSelector` `ReaderOption`s have been changed to `ManualReaderOption`s in the `go.opentelemetry.io/otel/sdk/metric` package. (#3260)
- The periodic reader in the `go.opentelemetry.io/otel/sdk/metric` package now uses the temporality and aggregation selectors from its configured exporter instead of accepting them as options. (#3260)

### Fixed

Expand All @@ -27,6 +33,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Asynchronous counters (`Counter` and `UpDownCounter`) from the metric SDK now produce delta sums when configured with delta temporality. (#3398)
- Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340)
- Reenabled Attribute Filters in the Metric SDK. (#3396)
- Do not report empty partial-success responses in the `go.opentelemetry.io/otel/exporters/otlp` exporters. (#3438, #3432)

## [1.11.1/0.33.0] 2022-10-19

Expand Down
34 changes: 15 additions & 19 deletions exporters/otlp/internal/partialsuccess.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,14 @@ package internal // import "go.opentelemetry.io/otel/exporters/otlp/internal"

import "fmt"

// PartialSuccessDropKind indicates the kind of partial success error
// received by an OTLP exporter, which corresponds with the signal
// being exported.
type PartialSuccessDropKind string

const (
// TracingPartialSuccess indicates that some spans were rejected.
TracingPartialSuccess PartialSuccessDropKind = "spans"

// MetricsPartialSuccess indicates that some metric data points were rejected.
MetricsPartialSuccess PartialSuccessDropKind = "metric data points"
)

// PartialSuccess represents the underlying error for all handling
// OTLP partial success messages. Use `errors.Is(err,
// PartialSuccess{})` to test whether an error passed to the OTel
// error handler belongs to this category.
type PartialSuccess struct {
ErrorMessage string
RejectedItems int64
RejectedKind PartialSuccessDropKind
RejectedKind string
}

var _ error = PartialSuccess{}
Expand All @@ -56,13 +43,22 @@ func (ps PartialSuccess) Is(err error) bool {
return ok
}

// PartialSuccessToError produces an error suitable for passing to
// `otel.Handle()` out of the fields in a partial success response,
// independent of which signal produced the outcome.
func PartialSuccessToError(kind PartialSuccessDropKind, itemsRejected int64, errorMessage string) error {
// TracePartialSuccessError returns an error describing a partial success
// response for the trace signal.
func TracePartialSuccessError(itemsRejected int64, errorMessage string) error {
return PartialSuccess{
ErrorMessage: errorMessage,
RejectedItems: itemsRejected,
RejectedKind: "spans",
}
}

// MetricPartialSuccessError returns an error describing a partial success
// response for the metric signal.
func MetricPartialSuccessError(itemsRejected int64, errorMessage string) error {
return PartialSuccess{
ErrorMessage: errorMessage,
RejectedItems: itemsRejected,
RejectedKind: kind,
RejectedKind: "metric data points",
}
}
9 changes: 4 additions & 5 deletions exporters/otlp/internal/partialsuccess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ func requireErrorString(t *testing.T, expect string, err error) {
}

func TestPartialSuccessFormat(t *testing.T) {
requireErrorString(t, "empty message (0 metric data points rejected)", PartialSuccessToError(MetricsPartialSuccess, 0, ""))
requireErrorString(t, "help help (0 metric data points rejected)", PartialSuccessToError(MetricsPartialSuccess, 0, "help help"))
requireErrorString(t, "what happened (10 metric data points rejected)", PartialSuccessToError(MetricsPartialSuccess, 10, "what happened"))
requireErrorString(t, "what happened (15 spans rejected)", PartialSuccessToError(TracingPartialSuccess, 15, "what happened"))
requireErrorString(t, "empty message (7 log records rejected)", PartialSuccessToError("log records", 7, ""))
requireErrorString(t, "empty message (0 metric data points rejected)", MetricPartialSuccessError(0, ""))
requireErrorString(t, "help help (0 metric data points rejected)", MetricPartialSuccessError(0, "help help"))
requireErrorString(t, "what happened (10 metric data points rejected)", MetricPartialSuccessError(10, "what happened"))
requireErrorString(t, "what happened (15 spans rejected)", TracePartialSuccessError(15, "what happened"))
}
9 changes: 9 additions & 0 deletions exporters/otlp/otlpmetric/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,20 @@ package otlpmetric // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric
import (
"context"

"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

// Client handles the transmission of OTLP data to an OTLP receiving endpoint.
type Client interface {
// Temporality returns the Temporality to use for an instrument kind.
Temporality(view.InstrumentKind) metricdata.Temporality

// Aggregation returns the Aggregation to use for an instrument kind.
Aggregation(view.InstrumentKind) aggregation.Aggregation

// UploadMetrics transmits metric data to an OTLP receiver.
//
// All retry logic must be handled by UploadMetrics alone, the Exporter
Expand Down
34 changes: 32 additions & 2 deletions exporters/otlp/otlpmetric/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

Expand All @@ -34,6 +36,20 @@ type exporter struct {
shutdownOnce sync.Once
}

// Temporality returns the Temporality to use for an instrument kind.
func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality {
e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.Temporality(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.Aggregation(k)
}

// Export transforms and transmits metric data to an OTLP receiver.
func (e *exporter) Export(ctx context.Context, rm metricdata.ResourceMetrics) error {
otlpRm, err := transform.ResourceMetrics(rm)
Expand Down Expand Up @@ -68,7 +84,10 @@ func (e *exporter) Shutdown(ctx context.Context) error {
e.shutdownOnce.Do(func() {
e.clientMu.Lock()
client := e.client
e.client = shutdownClient{}
e.client = shutdownClient{
temporalitySelector: client.Temporality,
aggregationSelector: client.Aggregation,
}
e.clientMu.Unlock()
err = client.Shutdown(ctx)
})
Expand All @@ -82,7 +101,10 @@ func New(client Client) metric.Exporter {
return &exporter{client: client}
}

type shutdownClient struct{}
type shutdownClient struct {
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}

func (c shutdownClient) err(ctx context.Context) error {
if err := ctx.Err(); err != nil {
Expand All @@ -91,6 +113,14 @@ func (c shutdownClient) err(ctx context.Context) error {
return errShutdown
}

func (c shutdownClient) Temporality(k view.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}

func (c shutdownClient) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}

func (c shutdownClient) UploadMetrics(ctx context.Context, _ *mpb.ResourceMetrics) error {
return c.err(ctx)
}
Expand Down
11 changes: 11 additions & 0 deletions exporters/otlp/otlpmetric/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

Expand All @@ -31,6 +34,14 @@ type client struct {
n int
}

func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return metric.DefaultTemporalitySelector(k)
}

func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return metric.DefaultAggregationSelector(k)
}

func (c *client) UploadMetrics(context.Context, *mpb.ResourceMetrics) error {
c.n++
return nil
Expand Down
42 changes: 42 additions & 0 deletions exporters/otlp/otlpmetric/internal/oconf/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/view"
)

const (
Expand Down Expand Up @@ -57,6 +61,9 @@ type (

// gRPC configurations
GRPCCredentials credentials.TransportCredentials

TemporalitySelector metric.TemporalitySelector
AggregationSelector metric.AggregationSelector
}

Config struct {
Expand All @@ -82,6 +89,9 @@ func NewHTTPConfig(opts ...HTTPOption) Config {
URLPath: DefaultMetricsPath,
Compression: NoCompression,
Timeout: DefaultTimeout,

TemporalitySelector: metric.DefaultTemporalitySelector,
AggregationSelector: metric.DefaultAggregationSelector,
},
RetryConfig: retry.DefaultConfig,
}
Expand All @@ -102,6 +112,9 @@ func NewGRPCConfig(opts ...GRPCOption) Config {
URLPath: DefaultMetricsPath,
Compression: NoCompression,
Timeout: DefaultTimeout,

TemporalitySelector: metric.DefaultTemporalitySelector,
AggregationSelector: metric.DefaultAggregationSelector,
},
RetryConfig: retry.DefaultConfig,
DialOptions: []grpc.DialOption{grpc.WithUserAgent(internal.GetUserAgentHeader())},
Expand Down Expand Up @@ -313,3 +326,32 @@ func WithTimeout(duration time.Duration) GenericOption {
return cfg
})
}

func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Metrics.TemporalitySelector = selector
return cfg
})
}

func WithAggregationSelector(selector metric.AggregationSelector) GenericOption {
// Deep copy and validate before using.
wrapped := func(ik view.InstrumentKind) aggregation.Aggregation {
a := selector(ik)
cpA := a.Copy()
if err := cpA.Err(); err != nil {
cpA = metric.DefaultAggregationSelector(ik)
global.Error(
err, "using default aggregation instead",
"aggregation", a,
"replacement", cpA,
)
}
return cpA
}

return newGenericOption(func(cfg Config) Config {
cfg.Metrics.AggregationSelector = wrapped
return cfg
})
}
43 changes: 43 additions & 0 deletions exporters/otlp/otlpmetric/internal/oconf/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (

"go.opentelemetry.io/otel/exporters/otlp/internal/envconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
)

const (
Expand Down Expand Up @@ -383,6 +386,38 @@ func TestConfigs(t *testing.T) {
assert.Equal(t, c.Metrics.Timeout, 5*time.Second)
},
},

// Temporality Selector Tests
{
name: "WithTemporalitySelector",
opts: []oconf.GenericOption{
oconf.WithTemporalitySelector(deltaSelector),
},
asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) {
// Function value comparisons are disallowed, test non-default
// behavior of a TemporalitySelector here to ensure our "catch
// all" was set.
var undefinedKind view.InstrumentKind
got := c.Metrics.TemporalitySelector
assert.Equal(t, metricdata.DeltaTemporality, got(undefinedKind))
},
},

// Aggregation Selector Tests
{
name: "WithAggregationSelector",
opts: []oconf.GenericOption{
oconf.WithAggregationSelector(dropSelector),
},
asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) {
// Function value comparisons are disallowed, test non-default
// behavior of a AggregationSelector here to ensure our "catch
// all" was set.
var undefinedKind view.InstrumentKind
got := c.Metrics.AggregationSelector
assert.Equal(t, aggregation.Drop{}, got(undefinedKind))
},
},
}

for _, tt := range tests {
Expand All @@ -406,6 +441,14 @@ func TestConfigs(t *testing.T) {
}
}

func dropSelector(view.InstrumentKind) aggregation.Aggregation {
return aggregation.Drop{}
}

func deltaSelector(view.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
}

func asHTTPOptions(opts []oconf.GenericOption) []oconf.HTTPOption {
converted := make([]oconf.HTTPOption, len(opts))
for i, o := range opts {
Expand Down
12 changes: 12 additions & 0 deletions exporters/otlp/otlpmetric/internal/otest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
"testing"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
cpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
Expand All @@ -27,6 +31,14 @@ type client struct {
storage *Storage
}

func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return metric.DefaultTemporalitySelector(k)
}

func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return metric.DefaultAggregationSelector(k)
}

func (c *client) Collect() *Storage {
return c.storage
}
Expand Down
Loading

0 comments on commit f97f1af

Please sign in to comment.