Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Commit

Permalink
Changes to support pushing lag metrics into InfluxDB
Browse files Browse the repository at this point in the history
Merge 3e2f0bfaa3e646c27cb4158c0b547a2fcdd8fdea

Merge

Cosmetics

Syntax

Removing files

Refactored dependencies

Create db test

Removed unused import

Remove unused

Syntax

Cosmetics

Syntax

Removing files

Refactored dependencies

Create db test

Removed unused import

Changes to support pushing lag metrics into InfluxDB

Removed libraryDependencies from build.sbt
  • Loading branch information
hariprasad-k committed Nov 3, 2020
1 parent fb0aaaa commit 2841caf
Show file tree
Hide file tree
Showing 18 changed files with 424 additions and 114 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ lazy val kafkaLagExporter =
AkkaSlf4j,
AkkaStreams,
AkkaStreamsProtobuf,
AkkaInfluxDB,
Fabric8Model,
Fabric8Client,
Prometheus,
Expand All @@ -38,6 +39,7 @@ lazy val kafkaLagExporter =
AkkaStreamsTestKit,
AlpakkaKafkaTestKit,
Testcontainers,
EmbedInflux,
AkkaHttp
),
dockerRepository := Option(System.getenv("DOCKER_REPOSITORY")).orElse(None),
Expand Down Expand Up @@ -211,4 +213,4 @@ lazy val packageJavaApp = ReleaseStep(
val ref = extracted.get(thisProjectRef)
extracted.runAggregated(packageBin in Universal in ref, st)
}
)
)
32 changes: 17 additions & 15 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,21 @@ object Dependencies {
private val log4jExclusionRule = ExclusionRule("log4j")
private val slf4jExclusionRule = ExclusionRule("org.slf4j")

val LightbendConfig = "com.typesafe" % "config" % "1.3.2"
val Kafka = "org.apache.kafka" %% "kafka" % Version.Kafka excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Akka = "com.typesafe.akka" %% "akka-actor" % Version.Akka
val AkkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val AkkaStreams = "com.typesafe.akka" %% "akka-stream" % Version.Akka
val AkkaStreamsProtobuf = "com.typesafe.akka" %% "akka-protobuf" % Version.Akka
val Logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
val Prometheus = "io.prometheus" % "simpleclient" % Version.Prometheus
val PrometheusHotSpot = "io.prometheus" % "simpleclient_hotspot" % Version.Prometheus
val PrometheusHttpServer = "io.prometheus" % "simpleclient_httpserver" % Version.Prometheus
val Fabric8Model = "io.fabric8" % "kubernetes-model" % Version.Fabric8
val Fabric8Client = "io.fabric8" % "kubernetes-client" % Version.Fabric8
val ScalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"
val LightbendConfig = "com.typesafe" % "config" % "1.3.2"
val Kafka = "org.apache.kafka" %% "kafka" % Version.Kafka excludeAll (jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Akka = "com.typesafe.akka" %% "akka-actor" % Version.Akka
val AkkaTyped = "com.typesafe.akka" %% "akka-actor-typed" % Version.Akka
val AkkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % Version.Akka
val AkkaStreams = "com.typesafe.akka" %% "akka-stream" % Version.Akka
val AkkaStreamsProtobuf = "com.typesafe.akka" %% "akka-protobuf" % Version.Akka
val AkkaInfluxDB = "com.lightbend.akka" %% "akka-stream-alpakka-influxdb" % "1.1.2"
val Logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
val Prometheus = "io.prometheus" % "simpleclient" % Version.Prometheus
val PrometheusHotSpot = "io.prometheus" % "simpleclient_hotspot" % Version.Prometheus
val PrometheusHttpServer = "io.prometheus" % "simpleclient_httpserver" % Version.Prometheus
val Fabric8Model = "io.fabric8" % "kubernetes-model" % Version.Fabric8
val Fabric8Client = "io.fabric8" % "kubernetes-client" % Version.Fabric8
val ScalaJava8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"

/* Test */
val ScalaTest = "org.scalatest" %% "scalatest" % "3.0.5" % Test
Expand All @@ -44,5 +45,6 @@ object Dependencies {
val MockitoScala = "org.mockito" %% "mockito-scala" % "1.0.8" % Test
val AlpakkaKafkaTestKit = "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "2.0.4" % Test excludeAll(jacksonExclusionRule, log4jExclusionRule, slf4jExclusionRule)
val Testcontainers = "org.testcontainers" % "kafka" % "1.14.3" % Test
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11" % Test
val AkkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11"
val EmbedInflux = "com.github.fsanaulla" %% "scalatest-embedinflux" % "0.2.3" % Test
}
55 changes: 25 additions & 30 deletions src/main/scala/com/lightbend/kafkalagexporter/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@ import scala.util.Try
object AppConfig {
def apply(config: Config): AppConfig = {
val c = config.getConfig("kafka-lag-exporter")
val graphiteConfig: Option[GraphiteConfig] = (
for (host <- Try(c.getString("reporters.graphite.host"));
port <- Try(c.getInt("reporters.graphite.port")),
) yield GraphiteConfig(
host, port, Try(c.getString("reporters.graphite.prefix")).toOption)).toOption
val pollInterval = c.getDuration("poll-interval").toScala
val lookupTableSize = c.getInt("lookup-table-size")
val prometheusPortLegacy = Try(c.getInt("port")).toOption
val prometheusPortNew = Try(c.getInt("reporters.prometheus.port")).toOption
val prometheusConfig = (prometheusPortNew orElse prometheusPortLegacy).map { port => PrometheusConfig(port) }

val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList

val sinks = if (c.hasPath("sinks"))
c.getStringList("sinks").asScala.toList
else
List("PrometheusEndpointSink")

val sinkConfigs : List[SinkConfig] = sinks.flatMap { sink =>
sink match {
case "PrometheusEndpointSink" => Some(new PrometheusEndpointSinkConfig(sink, metricWhitelist, c))
case "InfluxDBPusherSink" => Some(new InfluxDBPusherSinkConfig(sink, metricWhitelist, c))
case "GraphicEndpointSink" => Some(new GraphiteEndpointConfig(sink, metricWhitelist, c))
case _ => None
}
}

val clientGroupId = c.getString("client-group-id")
val kafkaClientTimeout = c.getDuration("kafka-client-timeout").toScala
val clusters = c.getConfigList("clusters").asScala.toList.map { clusterConfig =>
Expand Down Expand Up @@ -73,8 +82,8 @@ object AppConfig {
)
}
val strimziWatcher = c.getString("watchers.strimzi").toBoolean
val metricWhitelist = c.getStringList("metric-whitelist").asScala.toList
AppConfig(pollInterval, lookupTableSize, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher, metricWhitelist, prometheusConfig, graphiteConfig)

AppConfig(pollInterval, lookupTableSize, sinkConfigs, clientGroupId, kafkaClientTimeout, clusters, strimziWatcher)
}

// Copied from Alpakka Kafka
Expand Down Expand Up @@ -127,36 +136,22 @@ final case class KafkaCluster(name: String, bootstrapBrokers: String,
""".stripMargin
}
}
final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean,
metricWhitelist: List[String],
prometheusConfig: Option[PrometheusConfig],
graphiteConfig: Option[GraphiteConfig]) {

final case class AppConfig(pollInterval: FiniteDuration, lookupTableSize: Int, sinkConfigs: List[SinkConfig], clientGroupId: String,
clientTimeout: FiniteDuration, clusters: List[KafkaCluster], strimziWatcher: Boolean) {
override def toString(): String = {
val graphiteString =
graphiteConfig.map { graphite => s"""
|Graphite:
| host: ${graphite.host}
| port: ${graphite.port}
| prefix: ${graphite.prefix}
""".stripMargin }.getOrElse("")
val prometheusString =
prometheusConfig.map { prometheus => s"""
|Prometheus:
| port: ${prometheus.port}
""".stripMargin }.getOrElse("")
val clusterString =
if (clusters.isEmpty)
" (none)"
else clusters.map(_.toString).mkString("\n")
val sinksString = sinkConfigs.mkString("")
s"""
|Poll interval: $pollInterval
|Lookup table size: $lookupTableSize
|Metrics whitelist: [${metricWhitelist.mkString(", ")}]
|Metrics whitelist: [${sinkConfigs.head.metricWhitelist.mkString(", ")}]
|Admin client consumer group id: $clientGroupId
|Kafka client timeout: $clientTimeout
|$prometheusString
|$graphiteString
|$sinksString
|Statically defined Clusters:
|$clusterString
|Watchers:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ abstract class EndpointSink (clusterGlobalLabels: ClusterGlobalLabels) extends M
globalLabelNames.map(l => globalLabelValuesForCluster.getOrElse(l, ""))
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter

import com.typesafe.config.Config
import scala.util.Try

class GraphiteEndpointConfig(sinkType: String, metricWhitelist: List[String], config: Config) extends SinkConfig(sinkType, metricWhitelist, config)
{
val port: Int = config.getInt("reporters.graphite.port")
val host: String = config.getString("reporters.graphite.host")
val prefix: Option[String] = Try(config.getString("reporters.graphite.prefix")).toOption

override def toString(): String = {
s"""
|Graphite:
| host: ${host}
| port: ${port}
| prefix: ${prefix}
"""
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ import scala.util.Try

object GraphiteEndpointSink {

def apply(metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
graphiteConfig: Option[GraphiteConfig]): MetricsSink = {
Try(new GraphiteEndpointSink(metricWhitelist, clusterGlobalLabels, graphiteConfig))
def apply(graphiteConfig: GraphiteEndpointConfig, clusterGlobalLabels: ClusterGlobalLabels
): MetricsSink = {
Try(new GraphiteEndpointSink(graphiteConfig, clusterGlobalLabels))
.fold(t => throw new Exception("Could not create Graphite Endpoint", t), sink => sink)
}
}

class GraphiteEndpointSink private(metricWhitelist: List[String], clusterGlobalLabels: ClusterGlobalLabels,
graphiteConfig: Option[GraphiteConfig]) extends EndpointSink(clusterGlobalLabels) {
def graphitePush(graphiteConfig: GraphiteConfig, metricName: String, metricValue: Double): Unit = {
class GraphiteEndpointSink private(graphiteConfig: GraphiteEndpointConfig, clusterGlobalLabels:
ClusterGlobalLabels) extends EndpointSink(clusterGlobalLabels) {
def graphitePush(graphiteConfig: GraphiteEndpointConfig, metricName: String, metricValue: Double): Unit = {
Try(new Socket(graphiteConfig.host, graphiteConfig.port)) match {
case Success(socket) =>
Try(new PrintWriter(socket.getOutputStream)) match {
Expand All @@ -44,23 +44,19 @@ class GraphiteEndpointSink private(metricWhitelist: List[String], clusterGlobalL
*
* @example { label1=value1, label2=value2, name=myName } => "value1.value2.myName"
*
*/
*/
def metricNameToGraphiteMetricName(metricValue: MetricValue): String = {
(getGlobalLabelValuesOrDefault(metricValue.clusterName) ++ metricValue.labels
).map( x => x.replaceAll("\\.", "_")).mkString(".") + "." + metricValue.definition.name;
}

override def report(m: MetricValue): Unit = {
if (metricWhitelist.exists(m.definition.name.matches)) {
graphiteConfig.foreach { conf =>
graphitePush(conf, metricNameToGraphiteMetricName(m), m.value);
if (graphiteConfig.metricWhitelist.exists(m.definition.name.matches)) {
graphitePush(graphiteConfig, metricNameToGraphiteMetricName(m), m.value);
}
}
}

override def remove(m: RemoveMetric): Unit = {
}


}

143 changes: 143 additions & 0 deletions src/main/scala/com/lightbend/kafkalagexporter/InfluxDBPusherSink.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright (C) 2019 Lightbend Inc. <http://www.lightbend.com>
*/

package com.lightbend.kafkalagexporter

import com.lightbend.kafkalagexporter.MetricsSink._
import com.lightbend.kafkalagexporter.EndpointSink.ClusterGlobalLabels
import org.influxdb.{InfluxDB, InfluxDBFactory, BatchOptions}
import org.influxdb.InfluxDB.ConsistencyLevel
import org.influxdb.dto.{Query, Point, BatchPoints}
import java.io.IOException
import java.util.function.Consumer
import java.util.function.BiConsumer
import org.influxdb.dto.QueryResult
import java.lang.Iterable
import com.typesafe.scalalogging.Logger

import scala.util.Try

object InfluxDBPusherSink
{
def apply(sinkConfig: InfluxDBPusherSinkConfig, clusterGlobalLabels: ClusterGlobalLabels): MetricsSink =
{
Try(new InfluxDBPusherSink(sinkConfig, clusterGlobalLabels))
.fold(t => throw new IOException("Could not create Influx DB Pusher Sink", t), sink => sink)
}
}

class InfluxDBPusherSink private(sinkConfig: InfluxDBPusherSinkConfig, clusterGlobalLabels: ClusterGlobalLabels) extends EndpointSink(clusterGlobalLabels) {

val logger = Logger("InfluxDBPusherSink")
val influxDB = connect()
createDatabase()
enableBatching()

override def report(m: MetricValue): Unit = {
if (sinkConfig.metricWhitelist.exists(m.definition.name.matches)) {
if (!m.value.isNaN) {
write(m)
}
}
}

def write(m: MetricValue): Unit = {
try {
val point = buildPoint(m)
if (sinkConfig.async)
writeAsync(point)
else
writeSync(point)
} catch {
case t: Throwable =>
handlingFailure(t)
}
}

def writeAsync(point: Point): Unit = {
influxDB.write(point)
}

def writeSync(point: Point): Unit = {
val batchPoints = BatchPoints
.database(sinkConfig.database)
.consistency(ConsistencyLevel.ALL)
.build()

batchPoints.point(point)
influxDB.write(batchPoints)
}

def buildPoint(m: MetricValue): Point = {
val point = Point.measurement(m.definition.name)
val fields = m.definition.labels zip m.labels
fields.foreach { field => point.tag(field._1, field._2) }
point.addField("value", m.value)
return point.build()
}

override def remove(m: RemoveMetric): Unit = {
if (sinkConfig.metricWhitelist.exists(m.definition.name.matches))
logger.warn("Remove is not supported by InfluxDBPusherSink")
}

def enableBatching(): Unit = {
if (sinkConfig.async) {
influxDB.setDatabase(sinkConfig.database)
influxDB.enableBatch(BatchOptions.DEFAULTS.exceptionHandler(createExceptionHandler()))
}
}

def connect(): InfluxDB =
{
val url = sinkConfig.endpoint + ":" + sinkConfig.port
if (!sinkConfig.username.isEmpty) return InfluxDBFactory.connect(url, sinkConfig.username, sinkConfig.password)
else return InfluxDBFactory.connect(url)
}

def createDatabase() =
{
influxDB.query(new Query("CREATE DATABASE " + sinkConfig.database, sinkConfig.database), successQueryHandler(), failQueryHandler())
}

def successQueryHandler(): Consumer[QueryResult] =
{
return new Consumer[QueryResult] {
override def accept(result:QueryResult): Unit = {
logger.info(result.toString())
}
}
}

def failQueryHandler(): Consumer[Throwable] =
{
return new Consumer[Throwable] {
override def accept(throwable:Throwable): Unit = {
handlingFailure(throwable)
}
}
}

def createExceptionHandler(): BiConsumer[Iterable[Point], Throwable] =
{
return new BiConsumer[Iterable[Point], Throwable] {
override def accept(failedPoints:Iterable[Point], throwable:Throwable): Unit = {
handlingFailure(throwable)
}
}
}

def handlingFailure(t: Throwable): Unit = {
logger.error("Unrecoverable exception, will stop ", t)
stop()
throw t
}

override def stop(): Unit = {
if (influxDB.isBatchEnabled()) {
influxDB.disableBatch()
}
influxDB.close()
}
}
Loading

0 comments on commit 2841caf

Please sign in to comment.