Skip to content

Commit

Permalink
Adding opencensus metrics
Browse files Browse the repository at this point in the history
This PR creates an initial structure where the eventlistener sink is able
to record metrics about its operation to a recorder that operates based off
the same config structure that the triggers controller uses.

The metrics recorded in this PR are the histogram of HTTP request duration
and the count of resources created by this eventlistener, categorized
by resource kind.
  • Loading branch information
jmcshane committed May 4, 2021
1 parent 4fc2343 commit 354c3db
Show file tree
Hide file tree
Showing 19 changed files with 784 additions and 186 deletions.
35 changes: 32 additions & 3 deletions cmd/eventlistenersink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,17 @@ import (
"github.com/tektoncd/triggers/pkg/client/informers/externalversions"
triggerLogging "github.com/tektoncd/triggers/pkg/logging"
"github.com/tektoncd/triggers/pkg/sink"
"github.com/tektoncd/triggers/pkg/system"
"go.uber.org/zap"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
cminformer "knative.dev/pkg/configmap/informer"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/profiling"
"knative.dev/pkg/signals"
)

Expand All @@ -57,7 +63,7 @@ func main() {
if err != nil {
log.Fatalf("Failed to get in cluster config: %v", err)
}

ctx, startInformers := injection.EnableInjectionOrDie(ctx, clusterConfig)
kubeClient, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
log.Fatalf("Failed to get the Kubernetes client set: %v", err)
Expand All @@ -68,9 +74,20 @@ func main() {
log.Fatalf("Failed to get the dynamic client: %v", err)
}
dynamicCS := dynamicClientset.New(tekton.WithClient(dynamicClient))
configMapWatcher := cminformer.NewInformedWatcher(kubeClient, system.GetNamespace())

logger := triggerLogging.ConfigureLogging(EventListenerLogKey, ConfigName, ctx.Done(), kubeClient)
logger := triggerLogging.ConfigureLogging(EventListenerLogKey, ConfigName, ctx.Done(), kubeClient, configMapWatcher)
ctx = logging.WithLogger(ctx, logger)

profilingHandler := profiling.NewHandler(logger, false)
profilingServer := profiling.NewServer(profilingHandler)
metrics.MemStatsOrDie(ctx)

sharedmain.WatchObservabilityConfigOrDie(ctx, configMapWatcher, profilingHandler, logger, EventListenerLogKey)
logger.Info("Starting configuration manager...")
if err := configMapWatcher.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
}
defer func() {
err := logger.Sync()
if err != nil {
Expand Down Expand Up @@ -98,6 +115,10 @@ func main() {
30*time.Second)
}

recorder, err := sink.NewRecorder()
if err != nil {
logger.Fatal(err)
}
// Create EventListener Sink
r := sink.Sink{
KubeClientSet: kubeClient,
Expand All @@ -108,6 +129,7 @@ func main() {
EventListenerName: sinkArgs.ElName,
EventListenerNamespace: sinkArgs.ElNamespace,
Logger: logger,
Recorder: recorder,
Auth: sink.DefaultAuthOverride{},
// Register all the listers we'll need
EventListenerLister: factory.Triggers().V1alpha1().EventListeners().Lister(),
Expand All @@ -118,6 +140,7 @@ func main() {
ClusterInterceptorLister: factory.Triggers().V1alpha1().ClusterInterceptors().Lister(),
}

startInformers()
// Start and sync the informers before we start taking traffic
withTimeout, cancel := context.WithTimeout(ctx, cacheSyncTimeout)
defer cancel()
Expand All @@ -134,7 +157,9 @@ func main() {
logger.Infof("Listen and serve on port %s", sinkArgs.Port)
mux := http.NewServeMux()
eventHandler := http.HandlerFunc(r.HandleEvent)
mux.Handle("/", r.IsValidPayload(eventHandler))
metricsRecorder := &sink.MetricsHandler{Handler: r.IsValidPayload(eventHandler)}

mux.HandleFunc("/", http.HandlerFunc(metricsRecorder.Intercept(r.NewMetricsRecorderInterceptor())))

// For handling Liveness Probe
// TODO(dibyom): Liveness, metrics etc. should be on a separate port
Expand All @@ -161,4 +186,8 @@ func main() {
logger.Fatalf("failed to start eventlistener sink: %v", err)
}
}
err = profilingServer.Shutdown(context.Background())
if err != nil {
logger.Fatalf("failed to shutdown profiling server: %v", err)
}
}
16 changes: 16 additions & 0 deletions docs/eventlisteners.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,22 @@ To access your `EventListener` logs, query your cluster for Pods whose `eventlis
kubectl get pods --selector eventlistener=my-eventlistener
```

## Configuring metrics for `EventListeners`

The following `EventListener` metrics are available on the `eventlistener` Service on port `9090`.

| Name | Type | Labels/Tags | Status |
| ---------- | ----------- | ----------- | ----------- |
| `eventlistener_triggered_resources` | Counter | `kind`=&lt;kind&gt; | experimental |
| `eventlistener_http_duration_seconds_[bucket, sum, count]` | Histogram | `status`=&lt;status&gt; <br> | experimental |

Several kinds of exporters can be configured for an `EventListener`, including Prometheus, Google Stackdriver, and many others.
You can configure metrics using the [`config-observability-triggers` config map](../config/config-observability.yaml) in the `EventListener` namespaces.
There is a `config-observability-triggers` configmap in the `tekton-pipelines` namespace that can be configured for the operation of the Triggers
webhook and controller components.

See [the Knative documentation](https://github.com/knative/pkg/blob/main/metrics/README.md) for more information about available exporters and configuration values.

## Exposing an `EventListener` outside of the cluster

By default, `ClusterIP` services such as `EventListeners` are only accessible within the cluster on which they are running.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/tektoncd/plumbing v0.0.0-20210420200944-17170d5e7bc9
github.com/tidwall/gjson v1.3.5 // indirect
github.com/tidwall/sjson v1.0.4
go.opencensus.io v0.23.0
go.uber.org/zap v1.16.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/genproto v0.0.0-20201211151036-40ec1c210f7a
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-containerregistry v0.4.1-0.20210128200529-19c2b639fab1 h1:o2ykCuuhHeUwtzNg89pH2hi+821aqjLWkaREVR3ziTQ=
Expand Down Expand Up @@ -815,6 +816,8 @@ go.opencensus.io v0.22.4-0.20200608061201-1901b56b9515/go.mod h1:yxeiOL68Rb0Xd1d
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
8 changes: 1 addition & 7 deletions pkg/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"flag"
"log"

"github.com/tektoncd/triggers/pkg/system"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/configmap"
Expand All @@ -30,7 +29,7 @@ import (
)

// Configure logging
func ConfigureLogging(logKeyString, configName string, stopCh <-chan struct{}, kubeClient *kubernetes.Clientset) *zap.SugaredLogger {
func ConfigureLogging(logKeyString, configName string, stopCh <-chan struct{}, kubeClient *kubernetes.Clientset, configMapWatcher *cminformer.InformedWatcher) *zap.SugaredLogger {
flag.Parse()
cm, err := configmap.Load("/etc/config-logging")
if err != nil {
Expand All @@ -46,11 +45,6 @@ func ConfigureLogging(logKeyString, configName string, stopCh <-chan struct{}, k

logger.Infof("Starting the Configuration %v", logKeyString)

// Watch the logging config map and dynamically update logging levels.
configMapWatcher := cminformer.NewInformedWatcher(kubeClient, system.GetNamespace())
configMapWatcher.Watch(configName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logKeyString))
if err = configMapWatcher.Start(stopCh); err != nil {
logger.Fatalf("failed to start configuration manager: %v", err)
}
return logger
}
63 changes: 61 additions & 2 deletions pkg/reconciler/v1alpha1/eventlistener/eventlistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
corev1lister "k8s.io/client-go/listers/core/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/ptr"
pkgreconciler "knative.dev/pkg/reconciler"
)
Expand All @@ -64,11 +65,15 @@ const (
eventListenerServiceTLSPortName = "https-listener"
// eventListenerContainerPort defines the port exposed by the EventListener Container
eventListenerContainerPort = 8000
// eventListenerMetricsPort defines the port exposed by the EventListener metrics endpoint
eventListenerMetricsPort = 9090
// GeneratedResourcePrefix is the name prefix for resources generated in the
// EventListener reconciler
GeneratedResourcePrefix = "el"

defaultConfig = `{"level": "info","development": false,"sampling": {"initial": 100,"thereafter": 100},"outputPaths": ["stdout"],"errorOutputPaths": ["stderr"],"encoding": "json","encoderConfig": {"timeKey": "ts","levelKey": "level","nameKey": "logger","callerKey": "caller","messageKey": "msg","stacktraceKey": "stacktrace","lineEnding": "","levelEncoder": "","timeEncoder": "iso8601","durationEncoder": "","callerEncoder": ""}}`

triggersMetricsDomain = "tekton.dev/triggers"
)

// Reconciler implements controller.Reconciler for Configuration resources.
Expand Down Expand Up @@ -183,8 +188,7 @@ func (r *Reconciler) reconcileService(ctx context.Context, logger *zap.SugaredLo
Spec: corev1.ServiceSpec{
Selector: GenerateResourceLabels(el.Name, r.config.StaticResourceLabels),
Type: serviceType,
Ports: []corev1.ServicePort{servicePort},
},
Ports: []corev1.ServicePort{servicePort}},
}
existingService, err := r.serviceLister.Services(el.Namespace).Get(el.Status.Configuration.GeneratedResourceName)
switch {
Expand Down Expand Up @@ -246,12 +250,29 @@ func (r *Reconciler) reconcileLoggingConfig(ctx context.Context, logger *zap.Sug
return nil
}

// reconcileObservabilityConfig makes sure the observability ConfigMap (named by
// metrics.ConfigMapName()) exists in the EventListener's namespace, creating the
// default one from defaultObservabilityConfigMap when the lister reports it missing.
//
// Lookup failures other than NotFound, and create failures other than
// AlreadyExists, are logged and returned to the caller.
func (r *Reconciler) reconcileObservabilityConfig(ctx context.Context, logger *zap.SugaredLogger, el *v1alpha1.EventListener) error {
	_, err := r.configmapLister.ConfigMaps(el.Namespace).Get(metrics.ConfigMapName())
	switch {
	case err == nil:
		// Already present; nothing to do.
		return nil
	case !errors.IsNotFound(err):
		logger.Errorf("Error retrieving ConfigMap %q: %s", metrics.ConfigMapName(), err)
		return err
	}

	// The existence check above goes through the lister while the create goes
	// straight to the API server. If the lister lags behind (presumably it is
	// backed by an informer cache), the ConfigMap may already exist; that is
	// the desired end state, so AlreadyExists is not treated as a failure.
	_, err = r.KubeClientSet.CoreV1().ConfigMaps(el.Namespace).Create(ctx, defaultObservabilityConfigMap(), metav1.CreateOptions{})
	if err != nil && !errors.IsAlreadyExists(err) {
		logger.Errorf("Failed to create observability config: %s. EventListener won't start.", err)
		return err
	}
	return nil
}

func (r *Reconciler) reconcileDeployment(ctx context.Context, logger *zap.SugaredLogger, el *v1alpha1.EventListener) error {
// check logging config, create if it doesn't exist
if err := r.reconcileLoggingConfig(ctx, logger, el); err != nil {
logger.Error(err)
return err
}
if err := r.reconcileObservabilityConfig(ctx, logger, el); err != nil {
logger.Error(err)
return err
}

container := getContainer(el, r.config, nil)
container.VolumeMounts = []corev1.VolumeMount{{
Expand All @@ -265,6 +286,14 @@ func (r *Reconciler) reconcileDeployment(ctx context.Context, logger *zap.Sugare
FieldPath: "metadata.namespace",
}},
})
container.Env = append(container.Env, corev1.EnvVar{
Name: "CONFIG_OBSERVABILITY_NAME",
Value: metrics.ConfigMapName(),
})
container.Env = append(container.Env, corev1.EnvVar{
Name: "METRICS_DOMAIN",
Value: triggersMetricsDomain,
})
container = addCertsForSecureConnection(container, r.config)

deployment := getDeployment(el, r.config)
Expand Down Expand Up @@ -423,6 +452,14 @@ func (r *Reconciler) reconcileCustomObject(ctx context.Context, logger *zap.Suga
// https://github.com/knative/serving/blob/master/pkg/apis/config/features.go#L48
Value: el.Namespace,
})
container.Env = append(container.Env, corev1.EnvVar{
Name: "CONFIG_OBSERVABILITY_NAME",
Value: metrics.ConfigMapName(),
})
container.Env = append(container.Env, corev1.EnvVar{
Name: "METRICS_DOMAIN",
Value: triggersMetricsDomain,
})

podlabels := mergeMaps(el.Labels, GenerateResourceLabels(el.Name, r.config.StaticResourceLabels))

Expand Down Expand Up @@ -656,6 +693,15 @@ func getDeployment(el *v1alpha1.EventListener, c Config) *appsv1.Deployment {
FieldPath: "metadata.namespace",
}},
})
container.Env = append(container.Env, corev1.EnvVar{
Name: "CONFIG_OBSERVABILITY_NAME",
Value: metrics.ConfigMapName(),
})
container.Env = append(container.Env, corev1.EnvVar{
Name: "METRICS_DOMAIN",
Value: triggersMetricsDomain,
})

container = addCertsForSecureConnection(container, c)
for _, v := range container.Env {
// If TLS related env are set then mount secret volume which will be used while starting the eventlistener.
Expand Down Expand Up @@ -804,6 +850,9 @@ func getContainer(el *v1alpha1.EventListener, c Config, pod *duckv1.WithPod) cor
Ports: []corev1.ContainerPort{{
ContainerPort: int32(eventListenerContainerPort),
Protocol: corev1.ProtocolTCP,
}, {
ContainerPort: int32(eventListenerMetricsPort),
Protocol: corev1.ProtocolTCP,
}},
Resources: resources,
Args: []string{
Expand Down Expand Up @@ -927,3 +976,13 @@ func mergeMaps(m1, m2 map[string]string) map[string]string {
}
return merged
}

// defaultObservabilityConfigMap returns the placeholder observability ConfigMap.
// It is named by metrics.ConfigMapName() and carries a single "_example" entry;
// no namespace is set on the returned object.
func defaultObservabilityConfigMap() *corev1.ConfigMap {
	cm := &corev1.ConfigMap{}
	cm.Name = metrics.ConfigMapName()
	//TODO: Better nonempty config
	cm.Data = map[string]string{
		"_example": "See tekton-pipelines namespace for valid values",
	}
	return cm
}
Loading

0 comments on commit 354c3db

Please sign in to comment.