Skip to content
This repository has been archived by the owner on Mar 17, 2024. It is now read-only.

Ability to extend and configure the desired sink for reporting lag metrics, adding support for pushing lag metrics into InfluxDB as well #157

Merged
Merged 8 commits
Dec 17, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object MetricsReporter {
case (context, Stop(sender)) =>
Behaviors.stopped { () =>
metricsSink.stop()
context.log.info("Gracefully stopped metrics sink")
context.log.info(s"Gracefully stopped $metricsSink")
sender ! KafkaClusterManager.Done
}
case (context, m) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.github.fsanaulla.scalatest.embedinflux.EmbeddedInfluxDB
import org.scalatest.concurrent.{Eventually, IntegrationPatience}
import org.scalatest.{Matchers, TryValues}
import org.scalatest.time.{Seconds, Millis , Span}
import sys.process._

class InfluxDBPusherSinkTest extends fixture.FreeSpec with Matchers
Expand All @@ -32,37 +33,37 @@ class InfluxDBPusherSinkTest extends fixture.FreeSpec with Matchers
test(Fixture(properties, port))
}

/** Runs an InfluxQL query against the embedded InfluxDB HTTP API via `curl`
  * and returns the raw response body.
  *
  * @param url   base URL of the InfluxDB instance, e.g. "http://localhost:8086"
  * @param query the InfluxQL query string to execute
  * @return the raw stdout produced by curl (the JSON response body)
  */
def doQuery(url: String, query: String): String = {
  val database = "kafka_lag_exporter"
  // --data-urlencode handles escaping of the query string; -G forces a GET
  // so the parameters are sent as the query string rather than a POST body.
  val cmd = Seq("curl", "-G", s"$url/query", "--data-urlencode", s"db=$database", "--data-urlencode", s"q=$query")
  // The last expression is the return value; an explicit `return` is
  // non-idiomatic Scala (it compiles to a non-local return in lambdas).
  cmd.!!
}
seglo marked this conversation as resolved.
Show resolved Hide resolved

"InfluxDBPusherSinkImpl should" - {

  "create database" in { fixture =>
    // Constructing the sink is expected to create the target database.
    val sink = InfluxDBPusherSink(new InfluxDBPusherSinkConfig("InfluxDBPusherSink", List("kafka_consumergroup_group_max_lag"), ConfigFactory.parseMap(mapAsJavaMap(fixture.properties))), Map("cluster" -> Map.empty))
    val port = fixture.port
    val url = s"http://localhost:$port"
    val query = "SHOW DATABASES"
    // Database creation is asynchronous: poll with `eventually` instead of
    // sleeping a fixed interval, so the test is both faster and less flaky.
    eventually(timeout(Span(5, Seconds)), interval(Span(500, Millis))) {
      doQuery(url, query) should include("kafka_lag_exporter")
    }
  }

  "report metrics which match the regex" in { fixture =>
    val sink = InfluxDBPusherSink(new InfluxDBPusherSinkConfig("InfluxDBPusherSink", List("kafka_consumergroup_group_max_lag"), ConfigFactory.parseMap(mapAsJavaMap(fixture.properties))), Map("cluster" -> Map.empty))
    // Only "kafka_consumergroup_group_max_lag" is whitelisted above, so the
    // offset-lag metric must be written and the time-lag metric must not be.
    sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupOffsetLagMetric, "cluster_test", "group_test", 100))
    sink.report(Metrics.GroupValueMessage(Metrics.MaxGroupTimeLagMetric, "cluster_test", "group_test", 101))

    val port = fixture.port
    val url = s"http://localhost:$port"
    val whitelistQuery = "SELECT * FROM kafka_consumergroup_group_max_lag"
    val blacklistQuery = "SELECT * FROM kafka_consumergroup_group_max_lag_seconds"

    // Poll until the whitelisted metric shows up; writes are asynchronous.
    eventually(timeout(Span(5, Seconds)), interval(Span(500, Millis))) {
      doQuery(url, whitelistQuery) should (include("cluster_test") and include("group_test") and include("100"))
    }
    // By the time the whitelisted metric is visible, the non-whitelisted one
    // would also have been flushed if it were ever written — assert absence.
    eventually(timeout(Span(5, Seconds)), interval(Span(500, Millis))) {
      doQuery(url, blacklistQuery) should (not include ("cluster_test") and not include ("group_test") and not include ("101"))
    }
  }
}
}