Skip to content

Commit

Permalink
Fix race condition in otlpexporter tests (#5908)
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdandrutu authored Aug 15, 2022
1 parent ea5b75d commit 1f2fa1b
Showing 1 changed file with 22 additions and 16 deletions.
38 changes: 22 additions & 16 deletions exporter/otlpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,23 @@ type mockReceiver struct {
totalItems *atomic.Int32
mux sync.Mutex
metadata metadata.MD
exportError error
}

func (r *mockReceiver) GetMetadata() metadata.MD {
func (r *mockReceiver) getMetadata() metadata.MD {
r.mux.Lock()
defer r.mux.Unlock()
return r.metadata
}

func (r *mockReceiver) setExportError(err error) {
r.mux.Lock()
defer r.mux.Unlock()
r.exportError = err
}

type mockTracesReceiver struct {
mockReceiver
exportError error
lastRequest ptrace.Traces
}

Expand All @@ -78,7 +84,7 @@ func (r *mockTracesReceiver) Export(ctx context.Context, req ptraceotlp.Request)
return ptraceotlp.NewResponse(), r.exportError
}

func (r *mockTracesReceiver) GetLastRequest() ptrace.Traces {
func (r *mockTracesReceiver) getLastRequest() ptrace.Traces {
r.mux.Lock()
defer r.mux.Unlock()
return r.lastRequest
Expand Down Expand Up @@ -130,10 +136,10 @@ func (r *mockLogsReceiver) Export(ctx context.Context, req plogotlp.Request) (pl
defer r.mux.Unlock()
r.lastRequest = ld
r.metadata, _ = metadata.FromIncomingContext(ctx)
return plogotlp.NewResponse(), nil
return plogotlp.NewResponse(), r.exportError
}

func (r *mockLogsReceiver) GetLastRequest() plog.Logs {
func (r *mockLogsReceiver) getLastRequest() plog.Logs {
r.mux.Lock()
defer r.mux.Unlock()
return r.lastRequest
Expand Down Expand Up @@ -170,10 +176,10 @@ func (r *mockMetricsReceiver) Export(ctx context.Context, req pmetricotlp.Reques
defer r.mux.Unlock()
r.lastRequest = md
r.metadata, _ = metadata.FromIncomingContext(ctx)
return pmetricotlp.NewResponse(), nil
return pmetricotlp.NewResponse(), r.exportError
}

func (r *mockMetricsReceiver) GetLastRequest() pmetric.Metrics {
func (r *mockMetricsReceiver) getLastRequest() pmetric.Metrics {
r.mux.Lock()
defer r.mux.Unlock()
return r.lastRequest
Expand Down Expand Up @@ -262,9 +268,9 @@ func TestSendTraces(t *testing.T) {
// Verify received span.
assert.EqualValues(t, 2, rcv.totalItems.Load())
assert.EqualValues(t, 2, rcv.requestCount.Load())
assert.EqualValues(t, td, rcv.GetLastRequest())
assert.EqualValues(t, td, rcv.getLastRequest())

md := rcv.GetMetadata()
md := rcv.getMetadata()
require.EqualValues(t, md.Get("header"), expectedHeader)
require.Equal(t, len(md.Get("User-Agent")), 1)
require.Contains(t, md.Get("User-Agent")[0], "Collector/1.2.3test")
Expand Down Expand Up @@ -408,9 +414,9 @@ func TestSendMetrics(t *testing.T) {
// Verify received metrics.
assert.EqualValues(t, 2, rcv.requestCount.Load())
assert.EqualValues(t, 4, rcv.totalItems.Load())
assert.EqualValues(t, md, rcv.GetLastRequest())
assert.EqualValues(t, md, rcv.getLastRequest())

mdata := rcv.GetMetadata()
mdata := rcv.getMetadata()
require.EqualValues(t, mdata.Get("header"), expectedHeader)
require.Equal(t, len(mdata.Get("User-Agent")), 1)
require.Contains(t, mdata.Get("User-Agent")[0], "Collector/1.2.3test")
Expand Down Expand Up @@ -532,7 +538,7 @@ func TestSendTracesOnResourceExhaustion(t *testing.T) {
ln, err := net.Listen("tcp", "localhost:")
require.NoError(t, err)
rcv, _ := otlpTracesReceiverOnGRPCServer(ln, false)
rcv.exportError = status.Error(codes.ResourceExhausted, "resource exhausted")
rcv.setExportError(status.Error(codes.ResourceExhausted, "resource exhausted"))
defer rcv.srv.GracefulStop()

factory := NewFactory()
Expand Down Expand Up @@ -571,7 +577,7 @@ func TestSendTracesOnResourceExhaustion(t *testing.T) {
st, _ = st.WithDetails(&errdetails.RetryInfo{
RetryDelay: durationpb.New(100 * time.Millisecond),
})
rcv.exportError = st.Err()
rcv.setExportError(st.Err())

assert.NoError(t, exp.ConsumeTraces(context.Background(), td))

Expand Down Expand Up @@ -601,7 +607,7 @@ func startServerAndMakeRequest(t *testing.T, exp component.TracesExporter, td pt

// Verify received span.
assert.EqualValues(t, 2, rcv.totalItems.Load())
assert.EqualValues(t, expectedData, rcv.GetLastRequest())
assert.EqualValues(t, expectedData, rcv.getLastRequest())
}

func TestSendLogData(t *testing.T) {
Expand Down Expand Up @@ -664,9 +670,9 @@ func TestSendLogData(t *testing.T) {
// Verify received logs.
assert.EqualValues(t, 2, rcv.requestCount.Load())
assert.EqualValues(t, 2, rcv.totalItems.Load())
assert.EqualValues(t, ld, rcv.GetLastRequest())
assert.EqualValues(t, ld, rcv.getLastRequest())

md := rcv.GetMetadata()
md := rcv.getMetadata()
require.Equal(t, len(md.Get("User-Agent")), 1)
require.Contains(t, md.Get("User-Agent")[0], "Collector/1.2.3test")
}

0 comments on commit 1f2fa1b

Please sign in to comment.