-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
wrapped_exporter.go
68 lines (58 loc) · 2.3 KB
/
wrapped_exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/attribute"
)
// wrappedExporter decorates a component so that shutting it down is held back
// until in-flight data processing has completed. The wrapper does not count
// that work on its own: whoever consumes through it has to increment
// consumeWG explicitly, and Shutdown waits on that group.
type wrappedExporter struct {
	component.Component

	consumeWG sync.WaitGroup

	// The attribute sets for both outcomes are stored here so the hot path
	// does not need to allocate new ones.
	endpointAttr, successAttr, failureAttr attribute.Set
}
// newWrappedExporter wraps exp and precomputes the attribute sets tied to
// identifier: one holding only the "endpoint" attribute, and two more that
// additionally carry "success" set to true and to false respectively.
func newWrappedExporter(exp component.Component, identifier string) *wrappedExporter {
	endpoint := attribute.String("endpoint", identifier)

	we := &wrappedExporter{Component: exp}
	we.endpointAttr = attribute.NewSet(endpoint)
	we.successAttr = attribute.NewSet(endpoint, attribute.Bool("success", true))
	we.failureAttr = attribute.NewSet(endpoint, attribute.Bool("success", false))
	return we
}
// Shutdown blocks until consumeWG has drained and only then shuts down the
// wrapped component, returning whatever that call returns. The wait itself
// does not observe ctx; ctx is only passed on to the wrapped component.
func (we *wrappedExporter) Shutdown(ctx context.Context) error {
	we.consumeWG.Wait()

	wrapped := we.Component
	return wrapped.Shutdown(ctx)
}
// ConsumeTraces hands td to the wrapped component when that component is an
// exporter.Traces and returns its result. Otherwise it returns an error that
// names the wrapped component's actual type.
func (we *wrappedExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
	if tracesExp, isTraces := we.Component.(exporter.Traces); isTraces {
		return tracesExp.ConsumeTraces(ctx, td)
	}
	return fmt.Errorf("unable to export traces, unexpected exporter type: expected exporter.Traces but got %T", we.Component)
}
// ConsumeMetrics hands md to the wrapped component when that component is an
// exporter.Metrics and returns its result. Otherwise it returns an error that
// names the wrapped component's actual type.
func (we *wrappedExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
	if metricsExp, isMetrics := we.Component.(exporter.Metrics); isMetrics {
		return metricsExp.ConsumeMetrics(ctx, md)
	}
	return fmt.Errorf("unable to export metrics, unexpected exporter type: expected exporter.Metrics but got %T", we.Component)
}
// ConsumeLogs hands ld to the wrapped component when that component is an
// exporter.Logs and returns its result. Otherwise it returns an error that
// names the wrapped component's actual type.
func (we *wrappedExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
	if logsExp, isLogs := we.Component.(exporter.Logs); isLogs {
		return logsExp.ConsumeLogs(ctx, ld)
	}
	return fmt.Errorf("unable to export logs, unexpected exporter type: expected exporter.Logs but got %T", we.Component)
}