Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Ability to extend and configure desired sink to report lag metrics, adding support to push lag metrics into InfluxDB as well #157

Merged
merged 8 commits into from
Dec 17, 2020
35 changes: 21 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,9 @@ defaults defined in the project itself. See [`reference.conf`](./src/main/resou

### Reporters

It is possible to report (either or both):
It is possible to report (either one, multiple or all):

- to influxdb via the config `kafka-lag-exporter.reporters.influxdb`
- to graphite via the config `kafka-lag-exporter.reporters.graphite`
- as prometheus via the config `kafka-lag-exporter.reporters.prometheus`

Expand All @@ -240,19 +241,25 @@ See section below for more information.

General Configuration (`kafka-lag-exporter{}`)

| Key | Default | Description |
|-----------------------------|--------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| `reporters.prometheus.port` | `8000` | The port to run the Prometheus endpoint on |
| `reporters.graphite.host` | None | The graphite host to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.port` | None | The graphite port to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.prefix` | None | The graphite metric prefix (if not set, prefix will be empty) |
| `poll-interval` | `30 seconds` | How often to poll Kafka for latest and group offsets |
| `lookup-table-size` | `60` | The maximum window size of the look up table **per partition** |
| `client-group-id` | `kafkalagexporter` | Consumer group id of kafka-lag-exporter's client connections |
| `kafka-client-timeout` | `10 seconds` | Connection timeout when making API calls to Kafka |
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
| `watchers` | `{}` | Settings for Kafka cluster "watchers" used for auto-discovery. |
| `metric-whitelist` | `[".*"]` | Regex of metrics to be exposed via Prometheus endpoint. Eg. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |
| Key | Default | Description |
|-------------------------------|----------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| `reporters.prometheus.port` | `8000` | The port to run the Prometheus endpoint on |
| `reporters.graphite.host` | None | The graphite host to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.port` | None | The graphite port to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.prefix` | None | The graphite metric prefix (if not set, prefix will be empty) |
| `reporters.influxdb.endpoint` | None | The influxdb host to send metrics to (if not set, will not output to influxdb) |
| `reporters.influxdb.port` | None | The influxdb port to send metrics to (if not set, will not output to influxdb) |
| `reporters.influxdb.database` | `kafka_lag_exporter` | The influxdb database to send metrics to |
| `reporters.influxdb.username` | None | The influxdb username to connect (if not set, username will be empty) |
| `reporters.influxdb.password` | None | The influxdb password to connect (if not set, password will be empty) |
| `reporters.influxdb.async` | `true` | Flag to enable influxdb async **non-blocking** write mode to send metrics |
| `poll-interval` | `30 seconds` | How often to poll Kafka for latest and group offsets |
| `lookup-table-size` | `60` | The maximum window size of the look up table **per partition** |
| `client-group-id` | `kafkalagexporter` | Consumer group id of kafka-lag-exporter's client connections |
| `kafka-client-timeout` | `10 seconds` | Connection timeout when making API calls to Kafka |
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
| `watchers` | `{}` | Settings for Kafka cluster "watchers" used for auto-discovery. |
| `metric-whitelist` | `[".*"]` | Regex of metrics to be exposed via Prometheus endpoint. Eg. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |

Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)

Expand Down
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ lazy val kafkaLagExporter =
AkkaSlf4j,
AkkaStreams,
AkkaStreamsProtobuf,
AkkaInfluxDB,
Fabric8Model,
Fabric8Client,
Prometheus,
Expand All @@ -38,6 +39,7 @@ lazy val kafkaLagExporter =
AkkaStreamsTestKit,
AlpakkaKafkaTestKit,
Testcontainers,
EmbedInflux,
AkkaHttp
),
dockerRepository := Option(System.getenv("DOCKER_REPOSITORY")).orElse(None),
Expand Down Expand Up @@ -211,4 +213,4 @@ lazy val packageJavaApp = ReleaseStep(
val ref = extracted.get(thisProjectRef)
extracted.runAggregated(packageBin in Universal in ref, st)
}
)
)
32 changes: 17 additions & 15 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,21 @@ object Dependencies {
private val log4jExclusionRule = ExclusionRule("log4j")
private val slf4jExclusionRule = ExclusionRule("org.slf4j")

val LightbendConfig = "com.typesafe" % "config" % "1.3.2"
val Kafka = "org.apache.kafka" %% "kafka" % Version.Kafka excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Akka = "com.typesafe.akka" %% "akka-actor" % Version.Akka
val AkkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val AkkaStreams = "com.typesafe.akka" %% "akka-stream" % Version.Akka
val AkkaStreamsProtobuf = "com.typesafe.akka" %% "akka-protobuf" % Version.Akka
val Logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
val Prometheus = "io.prometheus" % "simpleclient" % Version.Prometheus
val PrometheusHotSpot = "io.prometheus" % "simpleclient_hotspot" % Version.Prometheus
val PrometheusHttpServer = "io.prometheus" % "simpleclient_httpserver" % Version.Prometheus
val Fabric8Model = "io.fabric8" % "kubernetes-model" % Version.Fabric8
val Fabric8Client = "io.fabric8" % "kubernetes-client" % Version.Fabric8
val ScalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"
val LightbendConfig = "com.typesafe" % "config" % "1.3.2"
val Kafka = "org.apache.kafka" %% "kafka" % Version.Kafka excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Akka = "com.typesafe.akka" %% "akka-actor" % Version.Akka
val AkkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val AkkaStreams = "com.typesafe.akka" %% "akka-stream" % Version.Akka
val AkkaStreamsProtobuf = "com.typesafe.akka" %% "akka-protobuf" % Version.Akka
val AkkaInfluxDB = "com.lightbend.akka" %% "akka-stream-alpakka-influxdb" % "1.1.2"
val Logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
val Prometheus = "io.prometheus" % "simpleclient" % Version.Prometheus
val PrometheusHotSpot = "io.prometheus" % "simpleclient_hotspot" % Version.Prometheus
val PrometheusHttpServer = "io.prometheus" % "simpleclient_httpserver" % Version.Prometheus
val Fabric8Model = "io.fabric8" % "kubernetes-model" % Version.Fabric8
val Fabric8Client = "io.fabric8" % "kubernetes-client" % Version.Fabric8
val ScalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"

/* Test */
val ScalaTest = "org.scalatest" %% "scalatest" % "3.0.5" % Test
Expand All @@ -44,5 +45,6 @@ object Dependencies {
val MockitoScala = "org.mockito" %% "mockito-scala" % "1.0.8" % Test
val AlpakkaKafkaTestKit = "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "2.0.4" % Test excludeAll(jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Testcontainers = "org.testcontainers" % "kafka" % "1.14.3" % Test
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11" % Test
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11"
val EmbedInflux = "com.github.fsanaulla" %% "scalatest-embedinflux" % "0.2.3" % Test
}
55 changes: 25 additions & 30 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@ import scala.util.Try
object AppConfig {
def apply(config: Config): AppConfig = {
val c = config.getConfig("kafka-lag-exporter")
val graphiteConfig: Option[GraphiteConfig] = (
for (host <- Try(c.getString("reporters.graphite.host"));
port <- Try(c.getInt("reporters.graphite.port")),
) yield GraphiteConfig(
host, port, Try(c.getString("reporters.graphite.prefix")).toOption)).toOption
val pollInterval = c.getDuration("poll-interval").toScala
val lookupTableSize = c.getInt("lookup-table-size")
val prometheusPortLegacy = Try(c.getInt("port")).toOption
val prometheusPortNew = Try(c.getInt("reporters.prometheus.port")).toOption
val prometheusConfig = (prometheusPortNew orElse prometheusPortLegacy).map { port => PrometheusConfig(port) }

val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList

val sinks = if (c.hasPath("sinks"))
c.getStringList("sinks").asScala.toList
else
List("PrometheusEndpointSink")

val sinkConfigs : List[SinkConfig] = sinks.flatMap { sink =>
sink match {
case "PrometheusEndpointSink" => Some(new PrometheusEndpointSinkConfig(sink, metricWhitelist, c))
case "InfluxDBPusherSink" => Some(new InfluxDBPusherSinkConfig(sink, metricWhitelist, c))
case "GraphiteEndpointSink" => Some(new GraphiteEndpointConfig(sink, metricWhitelist, c))
case _ => None
}
}

val clientGroupId = c.getString("client-group-id")
val kafkaClientTimeout = c.getDuration("kafka-client-timeout").toScala
val clusters = c.getConfigList("clusters").asScala.toList.map { clusterConfig =>
Expand Down Expand Up @@ -73,8 +82,8 @@ object AppConfig {
)
}
val strimziWatcher = c.getString("watchers.strimzi").toBoolean
val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList
AppConfig(pollInterval, lookupTableSize, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher, metricWhitelist, prometheusConfig, graphiteConfig)

AppConfig(pollInterval, lookupTableSize, sinkConfigs, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher)
}

// Copied from Alpakka Kafka
Expand Down Expand Up @@ -127,36 +136,22 @@ final case class KafkaCluster(name: String, bootstrapBrokers: String,
""".stripMargin
}
}
final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean,
metricWhitelist: List[String],
prometheusConfig: Option[PrometheusConfig],
graphiteConfig: Option[GraphiteConfig]) {

final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, sinkConfigs: List[SinkConfig], clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) {
override def toString(): String = {
val graphiteString =
graphiteConfig.map { graphite => s"""
|Graphite:
| host: ${graphite.host}
| port: ${graphite.port}
| prefix: ${graphite.prefix}
""".stripMargin }.getOrElse("")
val prometheusString =
prometheusConfig.map { prometheus => s"""
|Prometheus:
| port: ${prometheus.port}
""".stripMargin }.getOrElse("")
val clusterString =
if (clusters.isEmpty)
" (none)"
else clusters.map(_.toString).mkString("\n")
val sinksString = sinkConfigs.mkString("")
s"""
|Poll interval: $pollInterval
|Lookup table size: $lookupTableSize
|Metrics whitelist: [${metricWhitelist.mkString(", ")}]
|Metrics whitelist: [${sinkConfigs.head.metricWhitelist.mkString(", ")}]
|Admin client consumer group id: $clientGroupId
|Kafka client timeout: $clientTimeout
|$prometheusString
|$graphiteString
hariprasad-k marked this conversation as resolved.
Show resolved Hide resolved
|$sinksString
|Statically defined Clusters:
|$clusterString
|Watchers:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ abstract class EndpointSink (clusterGlobalLabels: ClusterGlobalLabels) extends M
globalLabelNames.map(l => globalLabelValuesForCluster.getOrElse(l, ""))
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter

import com.typesafe.config.Config
import scala.util.Try

class GraphiteEndpointConfig(sinkType: String, metricWhitelist: List[String], config: Config) extends SinkConfig(sinkType, metricWhitelist, config)
{
val port: Int = config.getInt("reporters.graphite.port")
val host: String = config.getString("reporters.graphite.host")
val prefix: Option[String] = Try(config.getString("reporters.graphite.prefix")).toOption

override def toString(): String = {
s"""
|Graphite:
| host: ${host}
| port: ${port}
| prefix: ${prefix}
"""
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ import scala.util.Try

object GraphiteEndpointSink {

def apply(metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
graphiteConfig: Option[GraphiteConfig]): MetricsSink = {
Try(new GraphiteEndpointSink(metricWhitelist, clusterGlobalLabels, graphiteConfig))
def apply(graphiteConfig: GraphiteEndpointConfig, clusterGlobalLabels: ClusterGlobalLabels
): MetricsSink = {
Try(new GraphiteEndpointSink(graphiteConfig, clusterGlobalLabels))
.fold(t => throw new Exception("Could not create Graphite Endpoint", t), sink => sink)
}
}

class GraphiteEndpointSink private(metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
graphiteConfig: Option[GraphiteConfig]) extends EndpointSink(clusterGlobalLabels) {
def graphitePush(graphiteConfig: GraphiteConfig, metricName: String, metricValue: Double): Unit = {
class GraphiteEndpointSink private(graphiteConfig: GraphiteEndpointConfig, clusterGlobalLabels:
ClusterGlobalLabels) extends EndpointSink(clusterGlobalLabels) {
def graphitePush(graphiteConfig: GraphiteEndpointConfig, metricName: String, metricValue: Double): Unit = {
Try(new Socket(graphiteConfig.host, graphiteConfig.port)) match {
case Success(socket) =>
Try(new PrintWriter(socket.getOutputStream)) match {
Expand All @@ -44,23 +44,19 @@ class GraphiteEndpointSink private(metricWhitelist: List[String], clusterGlobalL
*
* @example { label1=value1, label2=value2, name=myName } => "value1.value2.myName"
*
*/
*/
def metricNameToGraphiteMetricName(metricValue: MetricValue): String = {
(getGlobalLabelValuesOrDefault(metricValue.clusterName) ++ metricValue.labels
).map( x => x.replaceAll("\\.", "_")).mkString(".") + "." + metricValue.definition.name;
}

override def report(m: MetricValue): Unit = {
if (metricWhitelist.exists(m.definition.name.matches)) {
graphiteConfig.foreach { conf =>
graphitePush(conf, metricNameToGraphiteMetricName(m), m.value);
if (graphiteConfig.metricWhitelist.exists(m.definition.name.matches)) {
graphitePush(graphiteConfig, metricNameToGraphiteMetricName(m), m.value);
}
}
}

override def remove(m: RemoveMetric): Unit = {
}


}

Loading