Skip to content

Commit

Permalink
Add internal data -> OC compatibility shims to processors and exporters
Browse files Browse the repository at this point in the history
In order to start migrating components from OC to internal data, all parts of the pipeline should be able to automatically convert back to OC if downstream component doesn't work with internal data structure yet
  • Loading branch information
dmitryax committed Mar 19, 2020
1 parent 04068d2 commit a161f2d
Show file tree
Hide file tree
Showing 15 changed files with 302 additions and 59 deletions.
8 changes: 4 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ type Factories struct {
Receivers map[string]receiver.BaseFactory

// Processors maps processor type names in the config to the respective factory.
Processors map[string]processor.Factory
Processors map[string]processor.BaseFactory

// Exporters maps exporter type names in the config to the respective factory.
Exporters map[string]exporter.Factory
Exporters map[string]exporter.BaseFactory

// Extensions maps extension type names in the config to the respective factory.
Extensions map[string]extension.Factory
Expand Down Expand Up @@ -383,7 +383,7 @@ func loadReceivers(v *viper.Viper, factories map[string]receiver.BaseFactory) (c
return receivers, nil
}

func loadExporters(v *viper.Viper, factories map[string]exporter.Factory) (configmodels.Exporters, error) {
func loadExporters(v *viper.Viper, factories map[string]exporter.BaseFactory) (configmodels.Exporters, error) {
// Get the list of all "exporters" sub vipers from config source.
subViper := v.Sub(exportersKeyName)

Expand Down Expand Up @@ -451,7 +451,7 @@ func loadExporters(v *viper.Viper, factories map[string]exporter.Factory) (confi
return exporters, nil
}

func loadProcessors(v *viper.Viper, factories map[string]processor.Factory) (configmodels.Processors, error) {
func loadProcessors(v *viper.Viper, factories map[string]processor.BaseFactory) (configmodels.Processors, error) {
// Get the list of all "processors" sub vipers from config source.
subViper := v.Sub(processorsKeyName)

Expand Down
4 changes: 2 additions & 2 deletions defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestDefaultComponents(t *testing.T) {
"otlp": &otlpreceiver.Factory{},
"vmmetrics": &vmmetricsreceiver.Factory{},
}
expectedProcessors := map[string]processor.Factory{
expectedProcessors := map[string]processor.BaseFactory{
"attributes": &attributesprocessor.Factory{},
"queued_retry": &queuedprocessor.Factory{},
"batch": &batchprocessor.Factory{},
Expand All @@ -74,7 +74,7 @@ func TestDefaultComponents(t *testing.T) {
"probabilistic_sampler": &probabilisticsamplerprocessor.Factory{},
"span": &spanprocessor.Factory{},
}
expectedExporters := map[string]exporter.Factory{
expectedExporters := map[string]exporter.BaseFactory{
"opencensus": &opencensusexporter.Factory{},
"prometheus": &prometheusexporter.Factory{},
"logging": &loggingexporter.Factory{},
Expand Down
16 changes: 13 additions & 3 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,36 @@ type Exporter interface {
component.Component
}

// BaseTraceExporter defines a common interface for TraceExporter and TraceExporterV2
type BaseTraceExporter interface {
Exporter
}

// TraceExporter is a TraceConsumer that is also an Exporter.
type TraceExporter interface {
consumer.TraceConsumer
Exporter
BaseTraceExporter
}

// TraceExporterV2 is an TraceConsumerV2 that is also an Exporter.
type TraceExporterV2 interface {
consumer.TraceConsumerV2
BaseTraceExporter
}

// BaseMetricsExporter defines a common interface for MetricsExporter and MetricsExporterV2
type BaseMetricsExporter interface {
Exporter
}

// MetricsExporter is a MetricsConsumer that is also an Exporter.
type MetricsExporter interface {
consumer.MetricsConsumer
Exporter
BaseMetricsExporter
}

// MetricsExporterV2 is a MetricsConsumerV2 that is also an Exporter.
type MetricsExporterV2 interface {
consumer.MetricsConsumerV2
Exporter
BaseMetricsExporter
}
4 changes: 2 additions & 2 deletions exporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type FactoryV2 interface {
// Build takes a list of exporter factories and returns a map of type map[string]Factory
// with factory type as keys. It returns a non-nil error when more than one factories
// have the same type.
func Build(factories ...Factory) (map[string]Factory, error) {
fMap := map[string]Factory{}
func Build(factories ...BaseFactory) (map[string]BaseFactory, error) {
fMap := map[string]BaseFactory{}
for _, f := range factories {
if _, ok := fMap[f.Type()]; ok {
return fMap, fmt.Errorf("duplicate exporter factory %q", f.Type())
Expand Down
10 changes: 5 additions & 5 deletions exporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,25 @@ func (f *TestFactory) CreateMetricsExporter(logger *zap.Logger, cfg configmodels

func TestFactoriesBuilder(t *testing.T) {
type testCase struct {
in []Factory
out map[string]Factory
in []BaseFactory
out map[string]BaseFactory
err bool
}

testCases := []testCase{
{
in: []Factory{
in: []BaseFactory{
&TestFactory{"exp1"},
&TestFactory{"exp2"},
},
out: map[string]Factory{
out: map[string]BaseFactory{
"exp1": &TestFactory{"exp1"},
"exp2": &TestFactory{"exp2"},
},
err: false,
},
{
in: []Factory{
in: []BaseFactory{
&TestFactory{"exp1"},
&TestFactory{"exp1"},
},
Expand Down
24 changes: 24 additions & 0 deletions processor/cloningfanoutconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ import (
// clones of data before fanning out, which ensures each consumer gets their
// own copy of data and is free to modify it.

// CreateMetricsCloningFanOutConnector is a placeholder function for now.
// It supposed to create an old type connector or a new type connector based on type of provided metrics consumer.
func CreateMetricsCloningFanOutConnector(baseMetricsConsumers []consumer.BaseMetricsConsumer) consumer.MetricsConsumer {
// TODO: CreateMetricsCloningFanOutConnector doesn't support new type of consumers
// until internal data structure provides Clone method.
metricsConsumers := make([]consumer.MetricsConsumer, len(baseMetricsConsumers))
for _, baseMetricsConsumer := range baseMetricsConsumers {
metricsConsumers = append(metricsConsumers, baseMetricsConsumer.(consumer.MetricsConsumer))
}
return NewMetricsCloningFanOutConnector(metricsConsumers)
}

// NewMetricsCloningFanOutConnector wraps multiple metrics consumers in a single one.
func NewMetricsCloningFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer {
return metricsCloningFanOutConnector(mcs)
Expand Down Expand Up @@ -66,6 +78,18 @@ func (mfc metricsCloningFanOutConnector) ConsumeMetricsData(ctx context.Context,
return oterr.CombineErrors(errs)
}

// CreateTraceCloningFanOutConnector is a placeholder function for now.
// It supposed to create an old type connector or a new type connector based on type of provided trace consumer.
func CreateTraceCloningFanOutConnector(baseTraceConsumers []consumer.BaseTraceConsumer) consumer.TraceConsumer {
// TODO: CreateTraceCloningFanOutConnector doesn't support new type of consumers
// until internal data structure provides Clone method
traceConsumers := make([]consumer.TraceConsumer, len(baseTraceConsumers))
for _, baseTraceConsumer := range baseTraceConsumers {
traceConsumers = append(traceConsumers, baseTraceConsumer.(consumer.TraceConsumer))
}
return NewTraceCloningFanOutConnector(traceConsumers)
}

// NewTraceCloningFanOutConnector wraps multiple trace consumers in a single one.
func NewTraceCloningFanOutConnector(tcs []consumer.TraceConsumer) consumer.TraceConsumer {
return traceCloningFanOutConnector(tcs)
Expand Down
4 changes: 2 additions & 2 deletions processor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type FactoryV2 interface {
// Build takes a list of processor factories and returns a map of type map[string]Factory
// with factory type as keys. It returns a non-nil error when more than one factories
// have the same type.
func Build(factories ...Factory) (map[string]Factory, error) {
fMap := map[string]Factory{}
func Build(factories ...BaseFactory) (map[string]BaseFactory, error) {
fMap := map[string]BaseFactory{}
for _, f := range factories {
if _, ok := fMap[f.Type()]; ok {
return fMap, fmt.Errorf("duplicate processor factory %q", f.Type())
Expand Down
10 changes: 5 additions & 5 deletions processor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,25 @@ func (f *TestFactory) CreateMetricsProcessor(

func TestFactoriesBuilder(t *testing.T) {
type testCase struct {
in []Factory
out map[string]Factory
in []BaseFactory
out map[string]BaseFactory
err bool
}

testCases := []testCase{
{
in: []Factory{
in: []BaseFactory{
&TestFactory{"p1"},
&TestFactory{"p2"},
},
out: map[string]Factory{
out: map[string]BaseFactory{
"p1": &TestFactory{"p1"},
"p2": &TestFactory{"p2"},
},
err: false,
},
{
in: []Factory{
in: []BaseFactory{
&TestFactory{"p1"},
&TestFactory{"p1"},
},
Expand Down
89 changes: 89 additions & 0 deletions processor/fanoutconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,37 @@ import (

"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/internal/data"
"github.com/open-telemetry/opentelemetry-collector/oterr"
)

// This file contains implementations of Trace/Metrics connectors
// that fan out the data to multiple other consumers.

// CreateMetricsFanOutConnector creates a connector based on provided type of trace consumer.
// If any of the wrapped metrics consumers are of the new type, use metricsFanOutConnectorV2,
// otherwise use the old type connector.
func CreateMetricsFanOutConnector(mcs []consumer.BaseMetricsConsumer) consumer.BaseMetricsConsumer {
metricsConsumers := make([]consumer.MetricsConsumer, 0, len(mcs))
metricsConsumersV2 := make([]consumer.MetricsConsumerV2, 0, len(mcs))
anyMetricsConsumersV2 := false
for _, mc := range mcs {
metricsConsumer := mc.(consumer.MetricsConsumer)
metricsConsumers = append(metricsConsumers, metricsConsumer)
if metricsConsumerV2, ok := mc.(consumer.MetricsConsumerV2); ok {
anyMetricsConsumersV2 = true
metricsConsumersV2 = append(metricsConsumersV2, metricsConsumerV2)
} else {
metricsConsumersV2 = append(metricsConsumersV2, consumer.NewInternalToOCMetricsConverter(metricsConsumer))
}
}

if anyMetricsConsumersV2 {
return NewMetricsFanOutConnectorV2(metricsConsumersV2)
}
return NewMetricsFanOutConnector(metricsConsumers)
}

// NewMetricsFanOutConnector wraps multiple metrics consumers in a single one.
func NewMetricsFanOutConnector(mcs []consumer.MetricsConsumer) consumer.MetricsConsumer {
return metricsFanOutConnector(mcs)
Expand All @@ -45,6 +70,50 @@ func (mfc metricsFanOutConnector) ConsumeMetricsData(ctx context.Context, md con
return oterr.CombineErrors(errs)
}

// NewMetricsFanOutConnectorV2 wraps multiple new type metrics consumers in a single one.
func NewMetricsFanOutConnectorV2(mcs []consumer.MetricsConsumerV2) consumer.MetricsConsumerV2 {
return metricsFanOutConnectorV2(mcs)
}

type metricsFanOutConnectorV2 []consumer.MetricsConsumerV2

var _ consumer.MetricsConsumerV2 = (*metricsFanOutConnectorV2)(nil)

// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one.
func (mfc metricsFanOutConnectorV2) ConsumeMetrics(ctx context.Context, md data.MetricData) error {
var errs []error
for _, mc := range mfc {
if err := mc.ConsumeMetrics(ctx, md); err != nil {
errs = append(errs, err)
}
}
return oterr.CombineErrors(errs)
}

// CreateTraceFanOutConnector wraps multiple trace consumers in a single one.
// If any of the wrapped trace consumers are of the new type, use traceFanOutConnectorV2,
// otherwise use the old type connector
func CreateTraceFanOutConnector(tcs []consumer.BaseTraceConsumer) consumer.BaseTraceConsumer {
traceConsumers := make([]consumer.TraceConsumer, 0, len(tcs))
traceConsumersV2 := make([]consumer.TraceConsumerV2, 0, len(tcs))
anyTraceConsumersV2 := false
for _, tc := range tcs {
traceConsumer := tc.(consumer.TraceConsumer)
traceConsumers = append(traceConsumers, traceConsumer)
if traceConsumerV2, ok := tc.(consumer.TraceConsumerV2); ok {
anyTraceConsumersV2 = true
traceConsumersV2 = append(traceConsumersV2, traceConsumerV2)
} else {
traceConsumersV2 = append(traceConsumersV2, consumer.NewInternalToOCTraceConverter(traceConsumer))
}
}

if anyTraceConsumersV2 {
return NewTraceFanOutConnectorV2(traceConsumersV2)
}
return NewTraceFanOutConnector(traceConsumers)
}

// NewTraceFanOutConnector wraps multiple trace consumers in a single one.
func NewTraceFanOutConnector(tcs []consumer.TraceConsumer) consumer.TraceConsumer {
return traceFanOutConnector(tcs)
Expand All @@ -64,3 +133,23 @@ func (tfc traceFanOutConnector) ConsumeTraceData(ctx context.Context, td consume
}
return oterr.CombineErrors(errs)
}

// NewTraceFanOutConnectorV2 wraps multiple new type trace consumers in a single one.
func NewTraceFanOutConnectorV2(tcs []consumer.TraceConsumerV2) consumer.TraceConsumerV2 {
return traceFanOutConnectorV2(tcs)
}

type traceFanOutConnectorV2 []consumer.TraceConsumerV2

var _ consumer.TraceConsumerV2 = (*traceFanOutConnectorV2)(nil)

// ConsumeTrace exports the span data to all trace consumers wrapped by the current one.
func (tfc traceFanOutConnectorV2) ConsumeTrace(ctx context.Context, td data.TraceData) error {
var errs []error
for _, tc := range tfc {
if err := tc.ConsumeTrace(ctx, td); err != nil {
errs = append(errs, err)
}
}
return oterr.CombineErrors(errs)
}
18 changes: 14 additions & 4 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,38 @@ type Processor interface {
GetCapabilities() Capabilities
}

// BaseTraceProcessor is a common interface for TraceProcessor and TraceProcessorV2
type BaseTraceProcessor interface {
Processor
}

// BaseMetricsProcessor is a common interface for MetricsProcessor and MetricsProcessorV2
type BaseMetricsProcessor interface {
Processor
}

// TraceProcessor composes TraceConsumer with some additional processor-specific functions.
type TraceProcessor interface {
consumer.TraceConsumer
Processor
BaseTraceProcessor
}

// MetricsProcessor composes MetricsConsumer with some additional processor-specific functions.
type MetricsProcessor interface {
consumer.MetricsConsumer
Processor
BaseMetricsProcessor
}

// TraceProcessorV2 composes TraceConsumerV2 with some additional processor-specific functions.
type TraceProcessorV2 interface {
consumer.TraceConsumerV2
Processor
BaseTraceProcessor
}

// MetricsProcessorV2 composes MetricsConsumerV2 with some additional processor-specific functions.
type MetricsProcessorV2 interface {
consumer.MetricsConsumerV2
Processor
BaseMetricsProcessor
}

type DualTypeProcessor interface {
Expand Down
Loading

0 comments on commit a161f2d

Please sign in to comment.