Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Commit

Permalink
Ability to extend and configure desired sink to report lag metrics, a…
Browse files Browse the repository at this point in the history
…dding support to push lag metrics into InfluxDB as well (#157)

* Changes to support pushing lag metrics into InfluxDB

Merge 3e2f0bfaa3e646c27cb4158c0b547a2fcdd8fdea

Merge

Cosmetics

Syntax

Removing files

Refactored dependencies

Create db test

Removed unused import

Remove unused

Syntax

Cosmetics

Syntax

Removing files

Refactored dependencies

Create db test

Removed unused import

Changes to support pushing lag metrics into InfluxDB

Removed libraryDependencies from build.sbt

* Ignore sending Infinite data for InfluxDB

* Addressed review comments to report MetricSink/EndPointSink name and use ScalaTest eventually instead of a sleep.

* Addressed latest review comments on InfluxDBPusherSink tests

* Update src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala

* Updated README for influxDB reporter support with related configurations

* Fixed InfluxDB config to be lowercase as per review comments.

Co-authored-by: Sean Glover <[email protected]>
  • Loading branch information
hariprasad-k and seglo authored Dec 17, 2020
1 parent 465437b commit 998d58d
Show file tree
Hide file tree
Showing 19 changed files with 459 additions and 128 deletions.
35 changes: 21 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,9 @@ defaults defined in the project itself. See [`reference.conf`](./src/main/resou

### Reporters

It is possible to report (either or both):
It is possible to report (either one, multiple or all):

- to influxdb via the config `kafka-lag-exporter.reporters.influxdb`
- to graphite via the config `kafka-lag-exporter.reporters.graphite`
- as prometheus via the config `kafka-lag-exporter.reporters.prometheus`

Expand All @@ -240,19 +241,25 @@ See section below for more information.

General Configuration (`kafka-lag-exporter{}`)

| Key | Default | Description |
|-----------------------------|--------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| `reporters.prometheus.port` | `8000` | The port to run the Prometheus endpoint on |
| `reporters.graphite.host` | None | The graphite host to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.port` | None | The graphite port to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.prefix` | None | The graphite metric prefix (if not set, prefix will be empty) |
| `poll-interval` | `30 seconds` | How often to poll Kafka for latest and group offsets |
| `lookup-table-size` | `60` | The maximum window size of the look up table **per partition** |
| `client-group-id` | `kafkalagexporter` | Consumer group id of kafka-lag-exporter's client connections |
| `kafka-client-timeout` | `10 seconds` | Connection timeout when making API calls to Kafka |
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
| `watchers` | `{}` | Settings for Kafka cluster "watchers" used for auto-discovery. |
| `metric-whitelist` | `[".*"]` | Regex of metrics to be exposed via Prometheus endpoint. Eg. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |
| Key | Default | Description |
|-------------------------------|----------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| `reporters.prometheus.port` | `8000` | The port to run the Prometheus endpoint on |
| `reporters.graphite.host` | None | The graphite host to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.port` | None | The graphite port to send metrics to (if not set, will not output to graphite) |
| `reporters.graphite.prefix` | None | The graphite metric prefix (if not set, prefix will be empty) |
| `reporters.influxdb.endpoint` | None | The influxdb host to send metrics to (if not set, will not output to influxdb) |
| `reporters.influxdb.port` | None | The influxdb port to send metrics to (if not set, will not output to influxdb) |
| `reporters.influxdb.database` | `kafka_lag_exporter` | The influxdb database to send metrics to |
| `reporters.influxdb.username` | None | The influxdb username to connect (if not set, username will be empty) |
| `reporters.influxdb.password` | None | The influxdb password to connect (if not set, password will be empty) |
| `reporters.influxdb.async` | `true` | Flag to enable influxdb async **non-blocking** write mode to send metrics |
| `poll-interval` | `30 seconds` | How often to poll Kafka for latest and group offsets |
| `lookup-table-size` | `60` | The maximum window size of the look up table **per partition** |
| `client-group-id` | `kafkalagexporter` | Consumer group id of kafka-lag-exporter's client connections |
| `kafka-client-timeout` | `10 seconds` | Connection timeout when making API calls to Kafka |
| `clusters` | `[]` | A statically defined list of Kafka connection details. This list is optional if you choose to use the Strimzi auto-discovery feature |
| `watchers` | `{}` | Settings for Kafka cluster "watchers" used for auto-discovery. |
| `metric-whitelist` | `[".*"]` | Regex of metrics to be exposed via Prometheus endpoint. Eg. `[".*_max_lag.*", "kafka_partition_latest_offset"]` |

Kafka Cluster Connection Details (`kafka-lag-exporter.clusters[]`)

Expand Down
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ lazy val kafkaLagExporter =
AkkaSlf4j,
AkkaStreams,
AkkaStreamsProtobuf,
AkkaInfluxDB,
Fabric8Model,
Fabric8Client,
Prometheus,
Expand All @@ -38,6 +39,7 @@ lazy val kafkaLagExporter =
AkkaStreamsTestKit,
AlpakkaKafkaTestKit,
Testcontainers,
EmbedInflux,
AkkaHttp
),
dockerRepository := Option(System.getenv("DOCKER_REPOSITORY")).orElse(None),
Expand Down Expand Up @@ -211,4 +213,4 @@ lazy val packageJavaApp = ReleaseStep(
val ref = extracted.get(thisProjectRef)
extracted.runAggregated(packageBin in Universal in ref, st)
}
)
)
32 changes: 17 additions & 15 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,21 @@ object Dependencies {
private val log4jExclusionRule = ExclusionRule("log4j")
private val slf4jExclusionRule = ExclusionRule("org.slf4j")

val LightbendConfig = "com.typesafe" % "config" % "1.3.2"
val Kafka = "org.apache.kafka" %% "kafka" % Version.Kafka excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Akka = "com.typesafe.akka" %% "akka-actor" % Version.Akka
val AkkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val AkkaStreams = "com.typesafe.akka" %% "akka-stream" % Version.Akka
val AkkaStreamsProtobuf = "com.typesafe.akka" %% "akka-protobuf" % Version.Akka
val Logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
val Prometheus = "io.prometheus" % "simpleclient" % Version.Prometheus
val PrometheusHotSpot = "io.prometheus" % "simpleclient_hotspot" % Version.Prometheus
val PrometheusHttpServer = "io.prometheus" % "simpleclient_httpserver" % Version.Prometheus
val Fabric8Model = "io.fabric8" % "kubernetes-model" % Version.Fabric8
val Fabric8Client = "io.fabric8" % "kubernetes-client" % Version.Fabric8
val ScalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"
val LightbendConfig = "com.typesafe" % "config" % "1.3.2"
val Kafka = "org.apache.kafka" %% "kafka" % Version.Kafka excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Akka = "com.typesafe.akka" %% "akka-actor" % Version.Akka
val AkkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val AkkaStreams = "com.typesafe.akka" %% "akka-stream" % Version.Akka
val AkkaStreamsProtobuf = "com.typesafe.akka" %% "akka-protobuf" % Version.Akka
val AkkaInfluxDB = "com.lightbend.akka" %% "akka-stream-alpakka-influxdb" % "1.1.2"
val Logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
val Prometheus = "io.prometheus" % "simpleclient" % Version.Prometheus
val PrometheusHotSpot = "io.prometheus" % "simpleclient_hotspot" % Version.Prometheus
val PrometheusHttpServer = "io.prometheus" % "simpleclient_httpserver" % Version.Prometheus
val Fabric8Model = "io.fabric8" % "kubernetes-model" % Version.Fabric8
val Fabric8Client = "io.fabric8" % "kubernetes-client" % Version.Fabric8
val ScalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"

/* Test */
val ScalaTest = "org.scalatest" %% "scalatest" % "3.0.5" % Test
Expand All @@ -44,5 +45,6 @@ object Dependencies {
val MockitoScala = "org.mockito" %% "mockito-scala" % "1.0.8" % Test
val AlpakkaKafkaTestKit = "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "2.0.4" % Test excludeAll(jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Testcontainers = "org.testcontainers" % "kafka" % "1.14.3" % Test
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11" % Test
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11"
val EmbedInflux = "com.github.fsanaulla" %% "scalatest-embedinflux" % "0.2.3" % Test
}
55 changes: 25 additions & 30 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@ import scala.util.Try
object AppConfig {
def apply(config: Config): AppConfig = {
val c = config.getConfig("kafka-lag-exporter")
val graphiteConfig: Option[GraphiteConfig] = (
for (host <- Try(c.getString("reporters.graphite.host"));
port <- Try(c.getInt("reporters.graphite.port")),
) yield GraphiteConfig(
host, port, Try(c.getString("reporters.graphite.prefix")).toOption)).toOption
val pollInterval = c.getDuration("poll-interval").toScala
val lookupTableSize = c.getInt("lookup-table-size")
val prometheusPortLegacy = Try(c.getInt("port")).toOption
val prometheusPortNew = Try(c.getInt("reporters.prometheus.port")).toOption
val prometheusConfig = (prometheusPortNew orElse prometheusPortLegacy).map { port => PrometheusConfig(port) }

val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList

val sinks = if (c.hasPath("sinks"))
c.getStringList("sinks").asScala.toList
else
List("PrometheusEndpointSink")

val sinkConfigs : List[SinkConfig] = sinks.flatMap { sink =>
sink match {
case "PrometheusEndpointSink" => Some(new PrometheusEndpointSinkConfig(sink, metricWhitelist, c))
case "InfluxDBPusherSink" => Some(new InfluxDBPusherSinkConfig(sink, metricWhitelist, c))
case "GraphiteEndpointSink" => Some(new GraphiteEndpointConfig(sink, metricWhitelist, c))
case _ => None
}
}

val clientGroupId = c.getString("client-group-id")
val kafkaClientTimeout = c.getDuration("kafka-client-timeout").toScala
val clusters = c.getConfigList("clusters").asScala.toList.map { clusterConfig =>
Expand Down Expand Up @@ -73,8 +82,8 @@ object AppConfig {
)
}
val strimziWatcher = c.getString("watchers.strimzi").toBoolean
val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList
AppConfig(pollInterval, lookupTableSize, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher, metricWhitelist, prometheusConfig, graphiteConfig)

AppConfig(pollInterval, lookupTableSize, sinkConfigs, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher)
}

// Copied from Alpakka Kafka
Expand Down Expand Up @@ -127,36 +136,22 @@ final case class KafkaCluster(name: String, bootstrapBrokers: String,
""".stripMargin
}
}
final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean,
metricWhitelist: List[String],
prometheusConfig: Option[PrometheusConfig],
graphiteConfig: Option[GraphiteConfig]) {

final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, sinkConfigs: List[SinkConfig], clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) {
override def toString(): String = {
val graphiteString =
graphiteConfig.map { graphite => s"""
|Graphite:
| host: ${graphite.host}
| port: ${graphite.port}
| prefix: ${graphite.prefix}
""".stripMargin }.getOrElse("")
val prometheusString =
prometheusConfig.map { prometheus => s"""
|Prometheus:
| port: ${prometheus.port}
""".stripMargin }.getOrElse("")
val clusterString =
if (clusters.isEmpty)
" (none)"
else clusters.map(_.toString).mkString("\n")
val sinksString = sinkConfigs.mkString("")
s"""
|Poll interval: $pollInterval
|Lookup table size: $lookupTableSize
|Metrics whitelist: [${metricWhitelist.mkString(", ")}]
|Metrics whitelist: [${sinkConfigs.head.metricWhitelist.mkString(", ")}]
|Admin client consumer group id: $clientGroupId
|Kafka client timeout: $clientTimeout
|$prometheusString
|$graphiteString
|$sinksString
|Statically defined Clusters:
|$clusterString
|Watchers:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ abstract class EndpointSink (clusterGlobalLabels: ClusterGlobalLabels) extends M
globalLabelNames.map(l => globalLabelValuesForCluster.getOrElse(l, ""))
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter

import com.typesafe.config.Config
import scala.util.Try

class GraphiteEndpointConfig(sinkType: String, metricWhitelist: List[String], config: Config) extends SinkConfig(sinkType, metricWhitelist, config)
{
val port: Int = config.getInt("reporters.graphite.port")
val host: String = config.getString("reporters.graphite.host")
val prefix: Option[String] = Try(config.getString("reporters.graphite.prefix")).toOption

override def toString(): String = {
s"""
|Graphite:
| host: ${host}
| port: ${port}
| prefix: ${prefix}
"""
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ import scala.util.Try

object GraphiteEndpointSink {

def apply(metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
graphiteConfig: Option[GraphiteConfig]): MetricsSink = {
Try(new GraphiteEndpointSink(metricWhitelist, clusterGlobalLabels, graphiteConfig))
def apply(graphiteConfig: GraphiteEndpointConfig, clusterGlobalLabels: ClusterGlobalLabels
): MetricsSink = {
Try(new GraphiteEndpointSink(graphiteConfig, clusterGlobalLabels))
.fold(t => throw new Exception("Could not create Graphite Endpoint", t), sink => sink)
}
}

class GraphiteEndpointSink private(metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
graphiteConfig: Option[GraphiteConfig]) extends EndpointSink(clusterGlobalLabels) {
def graphitePush(graphiteConfig: GraphiteConfig, metricName: String, metricValue: Double): Unit = {
class GraphiteEndpointSink private(graphiteConfig: GraphiteEndpointConfig, clusterGlobalLabels:
ClusterGlobalLabels) extends EndpointSink(clusterGlobalLabels) {
def graphitePush(graphiteConfig: GraphiteEndpointConfig, metricName: String, metricValue: Double): Unit = {
Try(new Socket(graphiteConfig.host, graphiteConfig.port)) match {
case Success(socket) =>
Try(new PrintWriter(socket.getOutputStream)) match {
Expand All @@ -44,23 +44,19 @@ class GraphiteEndpointSink private(metricWhitelist: List[String], clusterGlobalL
*
* @example { label1=value1, label2=value2, name=myName } => "value1.value2.myName"
*
*/
*/
def metricNameToGraphiteMetricName(metricValue: MetricValue): String = {
(getGlobalLabelValuesOrDefault(metricValue.clusterName) ++ metricValue.labels
).map( x => x.replaceAll("\\.", "_")).mkString(".") + "." + metricValue.definition.name;
}

override def report(m: MetricValue): Unit = {
if (metricWhitelist.exists(m.definition.name.matches)) {
graphiteConfig.foreach { conf =>
graphitePush(conf, metricNameToGraphiteMetricName(m), m.value);
if (graphiteConfig.metricWhitelist.exists(m.definition.name.matches)) {
graphitePush(graphiteConfig, metricNameToGraphiteMetricName(m), m.value);
}
}
}

override def remove(m: RemoveMetric): Unit = {
}


}

Loading

0 comments on commit 998d58d

Please sign in to comment.