Skip to content

Commit

Permalink
Fix sdktrace.TraceProvider Shutdown/ForceFlush when no processor regi…
Browse files Browse the repository at this point in the history
…ster

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Oct 11, 2022
1 parent c5ebbc4 commit 6339700
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed

- Use default view if instrument does not match any registered view of a reader. (#3224, #3237)
- `sdktrace.TraceProvider.Shutdown` and `sdktrace.TraceProvider.ForceFlush` to not return error when no processor register. (#3268)

## [0.32.1] Metric SDK (Alpha) - 2022-09-22

Expand Down
30 changes: 11 additions & 19 deletions sdk/trace/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider {
spanLimits: o.spanLimits,
resource: o.resource,
}

tp.spanProcessors.Store(spanProcessorStates{})
global.Info("TracerProvider created", "config", o)

for _, sp := range o.processors {
Expand Down Expand Up @@ -163,26 +163,20 @@ func (p *TracerProvider) RegisterSpanProcessor(s SpanProcessor) {
p.mu.Lock()
defer p.mu.Unlock()
newSPS := spanProcessorStates{}
if old, ok := p.spanProcessors.Load().(spanProcessorStates); ok {
newSPS = append(newSPS, old...)
}
newSpanSync := &spanProcessorState{
sp: s,
state: &sync.Once{},
}
newSPS = append(newSPS, newSpanSync)
newSPS = append(newSPS, p.spanProcessors.Load().(spanProcessorStates)...)
newSPS = append(newSPS, &spanProcessorState{sp: s, state: &sync.Once{}})
p.spanProcessors.Store(newSPS)
}

// UnregisterSpanProcessor removes the given SpanProcessor from the list of SpanProcessors.
func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
p.mu.Lock()
defer p.mu.Unlock()
spss := spanProcessorStates{}
old, ok := p.spanProcessors.Load().(spanProcessorStates)
if !ok || len(old) == 0 {
old := p.spanProcessors.Load().(spanProcessorStates)
if len(old) == 0 {
return
}
spss := spanProcessorStates{}
spss = append(spss, old...)

// stop the span processor if it is started and remove it from the list
Expand Down Expand Up @@ -213,10 +207,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
// ForceFlush immediately exports all spans that have not yet been exported for
// all the registered span processors.
func (p *TracerProvider) ForceFlush(ctx context.Context) error {
spss, ok := p.spanProcessors.Load().(spanProcessorStates)
if !ok {
return fmt.Errorf("failed to load span processors")
}
spss := p.spanProcessors.Load().(spanProcessorStates)
if len(spss) == 0 {
return nil
}
Expand All @@ -237,10 +228,11 @@ func (p *TracerProvider) ForceFlush(ctx context.Context) error {

// Shutdown shuts down the span processors in the order they were registered.
func (p *TracerProvider) Shutdown(ctx context.Context) error {
spss, ok := p.spanProcessors.Load().(spanProcessorStates)
if !ok {
return fmt.Errorf("failed to load span processors")
spss := p.spanProcessors.Load().(spanProcessorStates)
if len(spss) == 0 {
return nil
}

var retErr error
for _, sps := range spss {
select {
Expand Down
51 changes: 26 additions & 25 deletions sdk/trace/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,41 +28,45 @@ import (
"go.opentelemetry.io/otel/trace"
)

type basicSpanProcesor struct {
running bool
type basicSpanProcessor struct {
flushed bool
closed bool
injectShutdownError error
}

func (t *basicSpanProcesor) Shutdown(context.Context) error {
t.running = false
func (t *basicSpanProcessor) Shutdown(context.Context) error {
t.closed = true
return t.injectShutdownError
}

func (t *basicSpanProcesor) OnStart(context.Context, ReadWriteSpan) {}
func (t *basicSpanProcesor) OnEnd(ReadOnlySpan) {}
func (t *basicSpanProcesor) ForceFlush(context.Context) error {
func (t *basicSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
func (t *basicSpanProcessor) OnEnd(ReadOnlySpan) {}
func (t *basicSpanProcessor) ForceFlush(context.Context) error {
t.flushed = true
return nil
}

func TestForceFlushAndShutdownTraceProviderWithoutProcessor(t *testing.T) {
stp := NewTracerProvider()
assert.NoError(t, stp.ForceFlush(context.Background()))
assert.NoError(t, stp.Shutdown(context.Background()))
}

func TestShutdownTraceProvider(t *testing.T) {
stp := NewTracerProvider()
sp := &basicSpanProcesor{}
sp := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp)

sp.running = true

_ = stp.Shutdown(context.Background())

if sp.running {
t.Errorf("Error shutdown basicSpanProcesor\n")
}
assert.NoError(t, stp.ForceFlush(context.Background()))
assert.True(t, sp.flushed, "error ForceFlush basicSpanProcessor")
assert.NoError(t, stp.Shutdown(context.Background()))
assert.True(t, sp.closed, "error Shutdown basicSpanProcessor")
}

func TestFailedProcessorShutdown(t *testing.T) {
stp := NewTracerProvider()
spErr := errors.New("basic span processor shutdown failure")
sp := &basicSpanProcesor{
running: true,
sp := &basicSpanProcessor{
injectShutdownError: spErr,
}
stp.RegisterSpanProcessor(sp)
Expand All @@ -76,12 +80,10 @@ func TestFailedProcessorsShutdown(t *testing.T) {
stp := NewTracerProvider()
spErr1 := errors.New("basic span processor shutdown failure1")
spErr2 := errors.New("basic span processor shutdown failure2")
sp1 := &basicSpanProcesor{
running: true,
sp1 := &basicSpanProcessor{
injectShutdownError: spErr1,
}
sp2 := &basicSpanProcesor{
running: true,
sp2 := &basicSpanProcessor{
injectShutdownError: spErr2,
}
stp.RegisterSpanProcessor(sp1)
Expand All @@ -90,16 +92,15 @@ func TestFailedProcessorsShutdown(t *testing.T) {
err := stp.Shutdown(context.Background())
assert.Error(t, err)
assert.EqualError(t, err, "basic span processor shutdown failure1; basic span processor shutdown failure2")
assert.False(t, sp1.running)
assert.False(t, sp2.running)
assert.True(t, sp1.closed)
assert.True(t, sp2.closed)
}

func TestFailedProcessorShutdownInUnregister(t *testing.T) {
handler.Reset()
stp := NewTracerProvider()
spErr := errors.New("basic span processor shutdown failure")
sp := &basicSpanProcesor{
running: true,
sp := &basicSpanProcessor{
injectShutdownError: spErr,
}
stp.RegisterSpanProcessor(sp)
Expand Down

0 comments on commit 6339700

Please sign in to comment.