Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add panic counter to observability middleware #1871

Merged
merged 9 commits into from
Jan 22, 2020
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
concrete types.
However, we expect that in practice, peer lists are used as either peer.List,
peer.Chooser, or for the private introspection interface.
- Observability middleware now emits metrics for panics that occur on the stack of an inbound call handler.

## [1.42.1] - 2019-11-27 (Gobble)
### Fixed
Expand Down
12 changes: 12 additions & 0 deletions internal/observability/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ func (c call) EndWithAppError(err error, isApplicationError bool) {
c.endStats(elapsed, err, isApplicationError)
}

// EndWithPanic ends the call with additional panic metrics
func (c call) EndWithPanic(err error) {
c.edge.panics.Inc()
c.EndWithAppError(err, true)
}

func (c call) endLogs(elapsed time.Duration, err error, isApplicationError bool) {
var ce *zapcore.CheckedEntry
if err == nil && !isApplicationError {
Expand Down Expand Up @@ -203,6 +209,12 @@ func (c call) EndStream(err error) {
c.emitStreamError(err)
}

// EndStreamWithPanic ends the stream call with additional panic metrics
func (c call) EndStreamWithPanic(err error) {
c.edge.panics.Inc()
c.EndStream(err)
}

// This function resembles EndStats for unary calls. However, we do not special
// case application errors and it does not measure failure latencies as those
// measurements are irrelevant for streams.
Expand Down
16 changes: 13 additions & 3 deletions internal/observability/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,33 @@ type fakeAck struct{}
func (a fakeAck) String() string { return "" }

type fakeHandler struct {
err error
applicationErr bool
handleStream func(*transport.ServerStream)
err error
applicationErr bool
applicationPanic bool
handleStream func(*transport.ServerStream)
}

func (h fakeHandler) Handle(_ context.Context, _ *transport.Request, rw transport.ResponseWriter) error {
if h.applicationPanic {
panic("application panicked")
}
if h.applicationErr {
rw.SetApplicationError()
}
return h.err
}

func (h fakeHandler) HandleOneway(context.Context, *transport.Request) error {
if h.applicationPanic {
panic("application panicked")
}
return h.err
}

func (h fakeHandler) HandleStream(stream *transport.ServerStream) error {
if h.applicationPanic {
panic("application panicked")
}
if h.handleStream != nil {
h.handleStream(stream)
}
Expand Down
10 changes: 10 additions & 0 deletions internal/observability/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type edge struct {

calls *metrics.Counter
successes *metrics.Counter
panics *metrics.Counter
callerFailures *metrics.CounterVector
serverFailures *metrics.CounterVector

Expand Down Expand Up @@ -206,6 +207,14 @@ func newEdge(logger *zap.Logger, meter *metrics.Scope, req *transport.Request, d
if err != nil {
logger.Error("Failed to create successes counter.", zap.Error(err))
}
panics, err := meter.Counter(metrics.Spec{
Name: "panics",
Help: "Number of RPCs failed because of panic.",
ConstTags: tags,
})
if err != nil {
logger.Error("Failed to create panics counter.", zap.Error(err))
}
callerFailures, err := meter.CounterVector(metrics.Spec{
Name: "caller_failures",
Help: "Number of RPCs failed because of caller error.",
Expand Down Expand Up @@ -371,6 +380,7 @@ func newEdge(logger *zap.Logger, meter *metrics.Scope, req *transport.Request, d
logger: logger,
calls: calls,
successes: successes,
panics: panics,
callerFailures: callerFailures,
serverFailures: serverFailures,
latencies: latencies,
Expand Down
26 changes: 26 additions & 0 deletions internal/observability/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package observability

import (
"context"
"fmt"
"sync"

"go.uber.org/net/metrics"
Expand Down Expand Up @@ -137,6 +138,8 @@ func applyLogLevelsConfig(dst *levels, src *DirectionalLevelsConfig) {
// Handle implements middleware.UnaryInbound.
func (m *Middleware) Handle(ctx context.Context, req *transport.Request, w transport.ResponseWriter, h transport.UnaryHandler) error {
call := m.graph.begin(ctx, transport.Unary, _directionInbound, req)
defer m.handlePanicForCall(call, transport.Unary)

wrappedWriter := newWriter(w)
err := h.Handle(ctx, req, wrappedWriter)
call.EndWithAppError(err, wrappedWriter.isApplicationError)
Expand Down Expand Up @@ -176,6 +179,8 @@ func (m *Middleware) CallOneway(ctx context.Context, req *transport.Request, out
// HandleStream implements middleware.StreamInbound.
func (m *Middleware) HandleStream(serverStream *transport.ServerStream, h transport.StreamHandler) error {
call := m.graph.begin(serverStream.Context(), transport.Streaming, _directionInbound, serverStream.Request().Meta.ToRequest())
defer m.handlePanicForCall(call, transport.Streaming)

call.EndStreamHandshake()
err := h.HandleStream(call.WrapServerStream(serverStream))
call.EndStream(err)
Expand All @@ -192,3 +197,24 @@ func (m *Middleware) CallStream(ctx context.Context, request *transport.StreamRe
}
return call.WrapClientStream(clientStream), nil
}

// handlePanicForCall checks for a panic without actually recovering from it
// it must be called in defer otherwise recover will act as a no-op
// The only action this method takes is to emit panic metrics
func (m *Middleware) handlePanicForCall(call call, transportType transport.Type) {
// We only want to emit panic metrics without actually recovering from it
// Actual recovery from a panic happens at top of the stack in transport's Handler Invoker
// As this middleware is the one and only one with Metrics responsibility, we just panic again after
// checking for panic without actually recovering from it
if e := recover(); e != nil {
err := fmt.Errorf("panic: %v", e)

// Emit only the panic metrics
if transportType == transport.Streaming {
call.EndStreamWithPanic(err)
} else {
call.EndWithPanic(err)
}
panic(e)
}
}
166 changes: 165 additions & 1 deletion internal/observability/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,7 @@ func TestMiddlewareMetrics(t *testing.T) {
free()
assert.Equal(t, int64(tt.wantCalls), edge.calls.Load())
assert.Equal(t, int64(tt.wantSuccesses), edge.successes.Load())
assert.Equal(t, int64(0), edge.panics.Load())
for tagName, val := range tt.wantCallerFailures {
assert.Equal(t, int64(val), edge.callerFailures.MustGet(_error, tagName).Load())
}
Expand Down Expand Up @@ -996,6 +997,7 @@ func TestMiddlewareSuccessSnapshot(t *testing.T) {
want := &metrics.RootSnapshot{
Counters: []metrics.Snapshot{
{Name: "calls", Tags: tags, Value: 1},
{Name: "panics", Tags: tags, Value: 0},
{Name: "successes", Tags: tags, Value: 1},
},
Histograms: []metrics.HistogramSnapshot{
Expand Down Expand Up @@ -1075,6 +1077,7 @@ func TestMiddlewareFailureSnapshot(t *testing.T) {
want := &metrics.RootSnapshot{
Counters: []metrics.Snapshot{
{Name: "calls", Tags: tags, Value: 1},
{Name: "panics", Tags: tags, Value: 0},
{Name: "server_failures", Tags: errorTags, Value: 1},
{Name: "successes", Tags: tags, Value: 0},
},
Expand Down Expand Up @@ -1185,6 +1188,7 @@ func TestApplicationErrorSnapShot(t *testing.T) {
Counters: []metrics.Snapshot{
{Name: "caller_failures", Tags: errorTags, Value: 1},
{Name: "calls", Tags: tags, Value: 1},
{Name: "panics", Tags: tags, Value: 0},
{Name: "successes", Tags: tags, Value: 0},
},
Histograms: []metrics.HistogramSnapshot{
Expand All @@ -1211,6 +1215,159 @@ func TestApplicationErrorSnapShot(t *testing.T) {
}
}

func TestStreamingInboundApplicationPanics(t *testing.T) {
var err error
root := metrics.New()
scope := root.Scope()
mw := NewMiddleware(Config{
Logger: zap.NewNop(),
Scope: scope,
ContextExtractor: NewNopContextExtractor(),
})
newTags := func(direction directionName, withErr string) metrics.Tags {
tags := metrics.Tags{
"dest": "service",
"direction": string(direction),
"encoding": "raw",
"procedure": "procedure",
"routing_delegate": "rd",
"routing_key": "rk",
"rpc_type": transport.Unary.String(),
"source": "caller",
"transport": "unknown",
}
if withErr != "" {
tags["error"] = withErr
}
return tags
}
tags := newTags(_directionInbound, "")
errTags := newTags(_directionInbound, "application_error")

t.Run("Test panic in Handle", func(t *testing.T) {
// As our fake handler is mocked to panic in the call, test that the invocation panics
assert.Panics(t, func() {
err = mw.Handle(
context.Background(),
&transport.Request{
Caller: "caller",
Service: "service",
Transport: "",
Encoding: "raw",
Procedure: "procedure",
ShardKey: "sk",
RoutingKey: "rk",
RoutingDelegate: "rd",
},
&transporttest.FakeResponseWriter{},
fakeHandler{applicationPanic: true},
)
})
require.NoError(t, err)

want := &metrics.RootSnapshot{
Counters: []metrics.Snapshot{
{Name: "caller_failures", Tags: errTags, Value: 1},
{Name: "calls", Tags: tags, Value: 1},
{Name: "panics", Tags: tags, Value: 1},
{Name: "successes", Tags: tags, Value: 0},
},
Histograms: []metrics.HistogramSnapshot{
{
Name: "caller_failure_latency_ms",
Tags: tags,
Unit: time.Millisecond,
Values: []int64{1},
},
{
Name: "server_failure_latency_ms",
Tags: tags,
Unit: time.Millisecond,
},
{
Name: "success_latency_ms",
Tags: tags,
Unit: time.Millisecond,
},
},
}
assert.Equal(t, want, root.Snapshot(), "unexpected metrics snapshot")
})
}

func TestUnaryInboundApplicationPanics(t *testing.T) {
root := metrics.New()
scope := root.Scope()
mw := NewMiddleware(Config{
Logger: zap.NewNop(),
Scope: scope,
ContextExtractor: NewNopContextExtractor(),
})
stream, err := transport.NewServerStream(&fakeStream{
request: &transport.StreamRequest{
Meta: &transport.RequestMeta{
Caller: "caller",
Service: "service",
Transport: "",
Encoding: "raw",
Procedure: "procedure",
ShardKey: "sk",
RoutingKey: "rk",
RoutingDelegate: "rd",
},
},
})
require.NoError(t, err)
newTags := func(direction directionName, withErr string) metrics.Tags {
tags := metrics.Tags{
"dest": "service",
"direction": string(direction),
"encoding": "raw",
"procedure": "procedure",
"routing_delegate": "rd",
"routing_key": "rk",
"rpc_type": transport.Streaming.String(),
"source": "caller",
"transport": "unknown",
}
if withErr != "" {
tags["error"] = withErr
}
return tags
}
tags := newTags(_directionInbound, "")
errTags := newTags(_directionInbound, "unknown_internal_yarpc")

t.Run("Test panic in HandleStream", func(t *testing.T) {
// As our fake handler is mocked to panic in the call, test that the invocation panics
assert.Panics(t, func() {
err = mw.HandleStream(stream, &fakeHandler{applicationPanic: true})
})
require.NoError(t, err)

want := &metrics.RootSnapshot{
Counters: []metrics.Snapshot{
{Name: "calls", Tags: tags, Value: 1},
{Name: "panics", Tags: tags, Value: 1},
{Name: "server_failures", Tags: errTags, Value: 1},
{Name: "stream_receive_successes", Tags: tags, Value: 0},
{Name: "stream_receives", Tags: tags, Value: 0},
{Name: "stream_send_successes", Tags: tags, Value: 0},
{Name: "stream_sends", Tags: tags, Value: 0},
{Name: "successes", Tags: tags, Value: 1},
},
Gauges: []metrics.Snapshot{
{Name: "streams_active", Tags: tags, Value: 0},
},
Histograms: []metrics.HistogramSnapshot{
{Name: "stream_duration_ms", Tags: tags, Unit: time.Millisecond, Values: []int64{1}},
},
}
assert.Equal(t, want, root.Snapshot(), "unexpected metrics snapshot")
})

}

func TestStreamingMetrics(t *testing.T) {
defer stubTime()()

Expand Down Expand Up @@ -1272,6 +1429,7 @@ func TestStreamingMetrics(t *testing.T) {
want := &metrics.RootSnapshot{
Counters: []metrics.Snapshot{
{Name: "calls", Tags: tags, Value: 1},
{Name: "panics", Tags: tags, Value: 0},
{Name: "stream_receive_successes", Tags: tags, Value: 1},
{Name: "stream_receives", Tags: tags, Value: 1},
{Name: "stream_send_successes", Tags: tags, Value: 1},
Expand Down Expand Up @@ -1336,11 +1494,13 @@ func TestStreamingMetrics(t *testing.T) {
if statusFault(yarpcerrors.FromError(tt.err)) == clientFault {
counters = append(counters,
metrics.Snapshot{Name: "caller_failures", Tags: errTags, Value: 1},
metrics.Snapshot{Name: "calls", Tags: successTags, Value: 1})
metrics.Snapshot{Name: "calls", Tags: successTags, Value: 1},
metrics.Snapshot{Name: "panics", Tags: successTags, Value: 0})

} else {
counters = append(counters,
metrics.Snapshot{Name: "calls", Tags: successTags, Value: 1},
metrics.Snapshot{Name: "panics", Tags: successTags, Value: 0},
metrics.Snapshot{Name: "server_failures", Tags: errTags, Value: 1})
}

Expand Down Expand Up @@ -1400,6 +1560,7 @@ func TestStreamingMetrics(t *testing.T) {
want := &metrics.RootSnapshot{
Counters: []metrics.Snapshot{
{Name: "calls", Tags: successTags, Value: 1},
{Name: "panics", Tags: successTags, Value: 0},
{Name: "stream_receive_failures", Tags: errTags, Value: 1},
{Name: "stream_receive_successes", Tags: successTags, Value: 0},
{Name: "stream_receives", Tags: successTags, Value: 1},
Expand Down Expand Up @@ -1442,6 +1603,7 @@ func TestStreamingMetrics(t *testing.T) {
want := &metrics.RootSnapshot{
Counters: []metrics.Snapshot{
{Name: "calls", Tags: tags, Value: 1},
{Name: "panics", Tags: tags, Value: 0},
{Name: "stream_receive_successes", Tags: tags, Value: 1},
{Name: "stream_receives", Tags: tags, Value: 1},
{Name: "stream_send_successes", Tags: tags, Value: 1},
Expand Down Expand Up @@ -1480,6 +1642,7 @@ func TestStreamingMetrics(t *testing.T) {
// into tags()
Counters: []metrics.Snapshot{
{Name: "calls", Tags: successTags, Value: 1},
{Name: "panics", Tags: successTags, Value: 0},
{Name: "server_failures", Tags: errTags, Value: 1},
{Name: "stream_receive_successes", Tags: successTags, Value: 0},
{Name: "stream_receives", Tags: successTags, Value: 0},
Expand Down Expand Up @@ -1533,6 +1696,7 @@ func TestStreamingMetrics(t *testing.T) {
want := &metrics.RootSnapshot{
Counters: []metrics.Snapshot{
{Name: "calls", Tags: successTags, Value: 1},
{Name: "panics", Tags: successTags, Value: 0},
{Name: "server_failures", Tags: errTags, Value: 1},
{Name: "stream_receive_failures", Tags: errTags, Value: 1},
{Name: "stream_receive_successes", Tags: successTags, Value: 0},
Expand Down