This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Ability to extend and configure desired sink to report lag metrics, adding support to push lag metrics into InfluxDB as well #157

Merged · 8 commits · Dec 17, 2020
4 changes: 3 additions & 1 deletion build.sbt
@@ -25,6 +25,7 @@ lazy val kafkaLagExporter =
AkkaSlf4j,
AkkaStreams,
AkkaStreamsProtobuf,
AkkaInfluxDB,
Fabric8Model,
Fabric8Client,
Prometheus,
@@ -38,6 +39,7 @@ lazy val kafkaLagExporter =
AkkaStreamsTestKit,
AlpakkaKafkaTestKit,
Testcontainers,
EmbedInflux,
AkkaHttp
),
dockerRepository := Option(System.getenv("DOCKER_REPOSITORY")).orElse(None),
@@ -211,4 +213,4 @@ lazy val packageJavaApp = ReleaseStep(
val ref = extracted.get(thisProjectRef)
extracted.runAggregated(packageBin in Universal in ref, st)
}
)
)
32 changes: 17 additions & 15 deletions project/Dependencies.scala
@@ -22,20 +22,21 @@ object Dependencies {
private val log4jExclusionRule = ExclusionRule("log4j")
private val slf4jExclusionRule = ExclusionRule("org.slf4j")

val LightbendConfig = "com.typesafe" % "config" % "1.3.2"
val Kafka = "org.apache.kafka" %% "kafka" % Version.Kafka excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Akka = "com.typesafe.akka" %% "akka-actor" % Version.Akka
val AkkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val AkkaStreams = "com.typesafe.akka" %% "akka-stream" % Version.Akka
val AkkaStreamsProtobuf = "com.typesafe.akka" %% "akka-protobuf" % Version.Akka
val Logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
val Prometheus = "io.prometheus" % "simpleclient" % Version.Prometheus
val PrometheusHotSpot = "io.prometheus" % "simpleclient_hotspot" % Version.Prometheus
val PrometheusHttpServer = "io.prometheus" % "simpleclient_httpserver" % Version.Prometheus
val Fabric8Model = "io.fabric8" % "kubernetes-model" % Version.Fabric8
val Fabric8Client = "io.fabric8" % "kubernetes-client" % Version.Fabric8
val ScalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"
val LightbendConfig = "com.typesafe" % "config" % "1.3.2"
val Kafka = "org.apache.kafka" %% "kafka" % Version.Kafka excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Akka = "com.typesafe.akka" %% "akka-actor" % Version.Akka
val AkkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val AkkaStreams = "com.typesafe.akka" %% "akka-stream" % Version.Akka
val AkkaStreamsProtobuf = "com.typesafe.akka" %% "akka-protobuf" % Version.Akka
val AkkaInfluxDB = "com.lightbend.akka" %% "akka-stream-alpakka-influxdb" % "1.1.2"
val Logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
val Prometheus = "io.prometheus" % "simpleclient" % Version.Prometheus
val PrometheusHotSpot = "io.prometheus" % "simpleclient_hotspot" % Version.Prometheus
val PrometheusHttpServer = "io.prometheus" % "simpleclient_httpserver" % Version.Prometheus
val Fabric8Model = "io.fabric8" % "kubernetes-model" % Version.Fabric8
val Fabric8Client = "io.fabric8" % "kubernetes-client" % Version.Fabric8
val ScalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"

/* Test */
val ScalaTest = "org.scalatest" %% "scalatest" % "3.0.5" % Test
@@ -44,5 +45,6 @@ object Dependencies {
val MockitoScala = "org.mockito" %% "mockito-scala" % "1.0.8" % Test
val AlpakkaKafkaTestKit = "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "2.0.4" % Test excludeAll(jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Testcontainers = "org.testcontainers" % "kafka" % "1.14.3" % Test
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11" % Test
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11"
val EmbedInflux = "com.github.fsanaulla" %% "scalatest-embedinflux" % "0.2.3" % Test
}
55 changes: 25 additions & 30 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
@@ -18,16 +18,25 @@ import scala.util.Try
object AppConfig {
def apply(config: Config): AppConfig = {
val c = config.getConfig("kafka-lag-exporter")
val graphiteConfig: Option[GraphiteConfig] = (
for (host <- Try(c.getString("reporters.graphite.host"));
port <- Try(c.getInt("reporters.graphite.port"))
) yield GraphiteConfig(
host, port, Try(c.getString("reporters.graphite.prefix")).toOption)).toOption
val pollInterval = c.getDuration("poll-interval").toScala
val lookupTableSize = c.getInt("lookup-table-size")
val prometheusPortLegacy = Try(c.getInt("port")).toOption
val prometheusPortNew = Try(c.getInt("reporters.prometheus.port")).toOption
val prometheusConfig = (prometheusPortNew orElse prometheusPortLegacy).map { port => PrometheusConfig(port) }

val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList

val sinks = if (c.hasPath("sinks"))
c.getStringList("sinks").asScala.toList
else
List("PrometheusEndpointSink")

val sinkConfigs : List[SinkConfig] = sinks.flatMap { sink =>
sink match {
case "PrometheusEndpointSink" => Some(new PrometheusEndpointSinkConfig(sink, metricWhitelist, c))
case "InfluxDBPusherSink" => Some(new InfluxDBPusherSinkConfig(sink, metricWhitelist, c))
case "GraphicEndpointSink" => Some(new GraphiteEndpointConfig(sink, metricWhitelist, c))
case _ => None
}
}

val clientGroupId = c.getString("client-group-id")
val kafkaClientTimeout = c.getDuration("kafka-client-timeout").toScala
val clusters = c.getConfigList("clusters").asScala.toList.map { clusterConfig =>
@@ -73,8 +82,8 @@
)
}
val strimziWatcher = c.getString("watchers.strimzi").toBoolean
val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList
AppConfig(pollInterval, lookupTableSize, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher, metricWhitelist, prometheusConfig, graphiteConfig)

AppConfig(pollInterval, lookupTableSize, sinkConfigs, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher)
}

// Copied from Alpakka Kafka
@@ -127,36 +136,22 @@ final case class KafkaCluster(name: String, bootstrapBrokers: String,
""".stripMargin
}
}
final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean,
metricWhitelist: List[String],
prometheusConfig: Option[PrometheusConfig],
graphiteConfig: Option[GraphiteConfig]) {

final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, sinkConfigs: List[SinkConfig], clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) {
override def toString(): String = {
val graphiteString =
graphiteConfig.map { graphite => s"""
|Graphite:
| host: ${graphite.host}
| port: ${graphite.port}
| prefix: ${graphite.prefix}
""".stripMargin }.getOrElse("")
val prometheusString =
prometheusConfig.map { prometheus => s"""
|Prometheus:
| port: ${prometheus.port}
""".stripMargin }.getOrElse("")
val clusterString =
if (clusters.isEmpty)
" (none)"
else clusters.map(_.toString).mkString("\n")
val sinksString = sinkConfigs.mkString("")
s"""
|Poll interval: $pollInterval
|Lookup table size: $lookupTableSize
|Metrics whitelist: [${metricWhitelist.mkString(", ")}]
|Metrics whitelist: [${sinkConfigs.head.metricWhitelist.mkString(", ")}]
|Admin client consumer group id: $clientGroupId
|Kafka client timeout: $clientTimeout
|$prometheusString
|$graphiteString
|$sinksString
|Statically defined Clusters:
|$clusterString
|Watchers:
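
For reference, a minimal application.conf sketch showing how a deployment might select sinks after this change. The sinks list, metric-whitelist, and the reporters.prometheus / reporters.graphite keys appear in the code above; the reporters.influxdb.* key names are assumptions inferred from the fields InfluxDBPusherSink reads later in this diff, not confirmed here.

kafka-lag-exporter {
  # Optional: which sinks to run; the code above defaults to ["PrometheusEndpointSink"] when absent.
  sinks = ["PrometheusEndpointSink", "GraphiteEndpointSink", "InfluxDBPusherSink"]
  metric-whitelist = [".*"]
  reporters {
    prometheus {
      port = 8000
    }
    graphite {
      host = "graphite.example.com"
      port = 2003
      prefix = "kafka-lag"          # optional
    }
    influxdb {                      # assumed key names, see note above
      endpoint = "http://influxdb.example.com"
      port = 8086
      database = "kafka_lag_exporter"
      async = true
    }
  }
}
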
@@ -25,4 +25,3 @@ abstract class EndpointSink (clusterGlobalLabels: ClusterGlobalLabels) extends M
globalLabelNames.map(l => globalLabelValuesForCluster.getOrElse(l, ""))
}
}

This file was deleted.

@@ -0,0 +1,25 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter

import com.typesafe.config.Config
import scala.util.Try

class GraphiteEndpointConfig(sinkType: String, metricWhitelist: List[String], config: Config) extends SinkConfig(sinkType, metricWhitelist, config)
{
val port: Int = config.getInt("reporters.graphite.port")
val host: String = config.getString("reporters.graphite.host")
val prefix: Option[String] = Try(config.getString("reporters.graphite.prefix")).toOption

override def toString(): String = {
s"""
|Graphite:
| host: ${host}
| port: ${port}
| prefix: ${prefix}
"""
}

}
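
The SinkConfig base class that GraphiteEndpointConfig (and the other per-sink configs) extend is not shown in this excerpt; below is a minimal sketch of its assumed shape, inferred from the constructor calls in AppConfig and from accesses such as sinkConfig.metricWhitelist.

package com.lightbend.kafkalagexporter

import com.typesafe.config.Config

// Assumed shape only: declaring the parameters as vals is what makes
// metricWhitelist readable on every SinkConfig instance, which
// AppConfig.toString and the individual sinks rely on.
abstract class SinkConfig(val sinkType: String,
                          val metricWhitelist: List[String],
                          val config: Config)
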
@@ -14,16 +14,16 @@ import scala.util.Try

object GraphiteEndpointSink {

def apply(metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
graphiteConfig: Option[GraphiteConfig]): MetricsSink = {
Try(new GraphiteEndpointSink(metricWhitelist, clusterGlobalLabels, graphiteConfig))
def apply(graphiteConfig: GraphiteEndpointConfig, clusterGlobalLabels: ClusterGlobalLabels
): MetricsSink = {
Try(new GraphiteEndpointSink(graphiteConfig, clusterGlobalLabels))
.fold(t => throw new Exception("Could not create Graphite Endpoint", t), sink => sink)
}
}

class GraphiteEndpointSink private(metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
graphiteConfig: Option[GraphiteConfig]) extends EndpointSink(clusterGlobalLabels) {
def graphitePush(graphiteConfig: GraphiteConfig, metricName: String, metricValue: Double): Unit = {
class GraphiteEndpointSink private(graphiteConfig: GraphiteEndpointConfig, clusterGlobalLabels:
ClusterGlobalLabels) extends EndpointSink(clusterGlobalLabels) {
def graphitePush(graphiteConfig: GraphiteEndpointConfig, metricName: String, metricValue: Double): Unit = {
Try(new Socket(graphiteConfig.host, graphiteConfig.port)) match {
case Success(socket) =>
Try(new PrintWriter(socket.getOutputStream)) match {
@@ -44,23 +44,19 @@ class GraphiteEndpointSink private(metricWhitelist: List[String], clusterGlobalL
*
* @example { label1=value1, label2=value2, name=myName } => "value1.value2.myName"
*
*/
*/
def metricNameToGraphiteMetricName(metricValue: MetricValue): String = {
(getGlobalLabelValuesOrDefault(metricValue.clusterName) ++ metricValue.labels
).map( x => x.replaceAll("\\.", "_")).mkString(".") + "." + metricValue.definition.name;
}

override def report(m: MetricValue): Unit = {
if (metricWhitelist.exists(m.definition.name.matches)) {
graphiteConfig.foreach { conf =>
graphitePush(conf, metricNameToGraphiteMetricName(m), m.value);
if (graphiteConfig.metricWhitelist.exists(m.definition.name.matches)) {
graphitePush(graphiteConfig, metricNameToGraphiteMetricName(m), m.value);
}
}
}

override def remove(m: RemoveMetric): Unit = {
}


}
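
As a quick illustration of metricNameToGraphiteMetricName and of the plaintext line graphitePush would emit (Graphite's plaintext protocol is "<path> <value> <epoch-seconds>"), here is a small sketch with hypothetical label values; the metric name is one of the exporter's lag metrics, everything else is made up for the example.

// Hypothetical values: one lag metric with a cluster label "kafka.prod",
// group "my-group", topic "events", partition "0".
val labelValues = List("kafka.prod", "my-group", "events", "0")
val metricName  = "kafka_consumergroup_group_lag"

// Dots inside label values become underscores so they don't add path segments,
// then the values and the metric name are joined with '.' (mirrors the method above).
val graphitePath = labelValues.map(_.replaceAll("\\.", "_")).mkString(".") + "." + metricName
// graphitePath == "kafka_prod.my-group.events.0.kafka_consumergroup_group_lag"

// A Graphite plaintext line then looks like: "<path> <value> <epoch-seconds>\n"
val line = s"$graphitePath 42.0 ${System.currentTimeMillis() / 1000}\n"
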

143 changes: 143 additions & 0 deletions src/main/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSink.scala
@@ -0,0 +1,143 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter

import com.lightbend.kafkalagexporter.MetricsSink._
import com.lightbend.kafkalagexporter.EndpointSink.ClusterGlobalLabels
import org.influxdb.{InfluxDB, InfluxDBFactory, BatchOptions}
import org.influxdb.InfluxDB.ConsistencyLevel
import org.influxdb.dto.{Query, Point, BatchPoints}
import java.io.IOException
import java.util.function.Consumer
import java.util.function.BiConsumer
import org.influxdb.dto.QueryResult
import java.lang.Iterable
import com.typesafe.scalalogging.Logger

import scala.util.Try

object InfluxDBPusherSink
{
def apply(sinkConfig: InfluxDBPusherSinkConfig, clusterGlobalLabels: ClusterGlobalLabels): MetricsSink =
{
Try(new InfluxDBPusherSink(sinkConfig, clusterGlobalLabels))
.fold(t => throw new IOException("Could not create Influx DB Pusher Sink", t), sink => sink)
}
}

class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGlobalLabels: ClusterGlobalLabels) extends EndpointSink(clusterGlobalLabels) {

val logger = Logger("InfluxDBPusherSink")
val influxDB = connect()
createDatabase()
enableBatching()

override def report(m: MetricValue): Unit = {
if (sinkConfig.metricWhitelist.exists(m.definition.name.matches)) {
if (!m.value.isNaN && !m.value.isInfinite) {
write(m)
}
}
}

def write(m: MetricValue): Unit = {
try {
val point = buildPoint(m)
if (sinkConfig.async)
writeAsync(point)
else
writeSync(point)
} catch {
case t: Throwable =>
handlingFailure(t)
}
}

def writeAsync(point: Point): Unit = {
influxDB.write(point)
}

def writeSync(point: Point): Unit = {
val batchPoints = BatchPoints
.database(sinkConfig.database)
.consistency(ConsistencyLevel.ALL)
.build()

batchPoints.point(point)
influxDB.write(batchPoints)
}

def buildPoint(m: MetricValue): Point = {
val point = Point.measurement(m.definition.name)
val fields = m.definition.labels zip m.labels
fields.foreach { field => point.tag(field._1, field._2) }
point.addField("value", m.value)
return point.build()
}

override def remove(m: RemoveMetric): Unit = {
if (sinkConfig.metricWhitelist.exists(m.definition.name.matches))
logger.warn("Remove is not supported by InfluxDBPusherSink")
}

def enableBatching(): Unit = {
if (sinkConfig.async) {
influxDB.setDatabase(sinkConfig.database)
influxDB.enableBatch(BatchOptions.DEFAULTS.exceptionHandler(createExceptionHandler()))
}
}

def connect(): InfluxDB =
{
val url = sinkConfig.endpoint + ":" + sinkConfig.port
if (!sinkConfig.username.isEmpty) return InfluxDBFactory.connect(url, sinkConfig.username, sinkConfig.password)
else return InfluxDBFactory.connect(url)
}

def createDatabase() =
{
influxDB.query(new Query("CREATE DATABASE " + sinkConfig.database, sinkConfig.database), successQueryHandler(), failQueryHandler())
}

def successQueryHandler(): Consumer[QueryResult] =
{
return new Consumer[QueryResult] {
override def accept(result:QueryResult): Unit = {
logger.info(result.toString())
}
}
}

def failQueryHandler(): Consumer[Throwable] =
{
return new Consumer[Throwable] {
override def accept(throwable:Throwable): Unit = {
handlingFailure(throwable)
}
}
}

def createExceptionHandler(): BiConsumer[Iterable[Point], Throwable] =
{
return new BiConsumer[Iterable[Point], Throwable] {
override def accept(failedPoints:Iterable[Point], throwable:Throwable): Unit = {
handlingFailure(throwable)
}
}
}

def handlingFailure(t: Throwable): Unit = {
logger.error("Unrecoverable exception, will stop ", t)
stop()
throw t
}

override def stop(): Unit = {
if (influxDB.isBatchEnabled()) {
influxDB.disableBatch()
}
influxDB.close()
}
}
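
InfluxDBPusherSinkConfig itself is not part of this excerpt; the sketch below shows the shape implied by the fields the sink reads above (endpoint, port, database, username, password, async, metricWhitelist). The reporters.influxdb.* paths and the defaults are assumptions for illustration, not taken from the PR.

package com.lightbend.kafkalagexporter

import com.typesafe.config.Config
import scala.util.Try

// Assumed sketch only: field names mirror what InfluxDBPusherSink reads;
// config paths and defaults are illustrative guesses.
class InfluxDBPusherSinkConfig(sinkType: String, metricWhitelist: List[String], config: Config)
  extends SinkConfig(sinkType, metricWhitelist, config) {
  val endpoint: String = config.getString("reporters.influxdb.endpoint") // e.g. "http://influxdb.example.com"
  val port: Int        = config.getInt("reporters.influxdb.port")        // e.g. 8086
  val database: String = Try(config.getString("reporters.influxdb.database")).getOrElse("kafka_lag_exporter")
  val username: String = Try(config.getString("reporters.influxdb.username")).getOrElse("")
  val password: String = Try(config.getString("reporters.influxdb.password")).getOrElse("")
  val async: Boolean   = Try(config.getBoolean("reporters.influxdb.async")).getOrElse(true)

  override def toString(): String =
    s"""
       |InfluxDB:
       |  endpoint: $endpoint
       |  port: $port
       |  database: $database
       |  async: $async
     """.stripMargin
}
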