Skip to content

Commit

Permalink
Merge pull request #290 from lphuberdeau/feature-loadbalancer
Browse files Browse the repository at this point in the history
Add  option to make the self label uniform when using a load balancer
  • Loading branch information
kbudde authored Jul 21, 2022
2 parents 1e8fc07 + 1b06d6a commit 2231dcf
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 28 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Environment variable|default|description
RABBIT_URL | <http://127.0.0.1:15672>| url to rabbitMQ management plugin (must start with http(s)://)
RABBIT_USER | guest | username for rabbitMQ management plugin. User needs monitoring tag!
RABBIT_PASSWORD | guest | password for rabbitMQ management plugin
RABBIT_CONNECTION | direct | direct or loadbalancer, strips the self label when loadbalancer
RABBIT_USER_FILE| | location of file with username (useful for docker secrets)
RABBIT_PASSWORD_FILE | | location of file with password (useful for docker secrets)
PUBLISH_PORT | 9419 | Listening port for the exporter
Expand Down
20 changes: 20 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
RabbitURL: "http://127.0.0.1:15672",
RabbitUsername: "guest",
RabbitPassword: "guest",
RabbitConnection: "direct",
PublishPort: "9419",
PublishAddr: "",
OutputFormat: "TTY", //JSON
Expand All @@ -43,6 +44,7 @@ type rabbitExporterConfig struct {
RabbitURL string `json:"rabbit_url"`
RabbitUsername string `json:"rabbit_user"`
RabbitPassword string `json:"rabbit_pass"`
RabbitConnection string `json:"rabbit_connection"`
PublishPort string `json:"publish_port"`
PublishAddr string `json:"publish_addr"`
OutputFormat string `json:"output_format"`
Expand Down Expand Up @@ -117,6 +119,14 @@ func initConfig() {
}
}

if connection := os.Getenv("RABBIT_CONNECTION"); connection != "" {
if valid, _ := regexp.MatchString("(direct|loadbalancer)", connection); valid {
config.RabbitConnection = connection
} else {
panic(fmt.Errorf("rabbit connection must be direct or loadbalancer"))
}
}

var user string
var pass string

Expand Down Expand Up @@ -251,3 +261,13 @@ func isCapEnabled(config rabbitExporterConfig, cap rabbitCapability) bool {
exists, enabled := config.RabbitCapabilities[cap]
return exists && enabled
}

func selfLabel(config rabbitExporterConfig, isSelf bool) string {
if config.RabbitConnection == "loadbalancer" {
return "lb"
} else if isSelf {
return "1"
} else {
return "0"
}
}
36 changes: 36 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,39 @@ func TestConfig_EnabledExporters(t *testing.T) {
t.Errorf("Invalid Exporters list. diff\n%v", diff)
}
}

func TestConfig_RabbitConnection_Default(t *testing.T) {
defer os.Unsetenv("RABBIT_CONNECTION")

os.Unsetenv("RABBIT_CONNECTION")
initConfig()

if config.RabbitConnection != "direct" {
t.Errorf("RabbitConnection unspecified. It should default to direct. expected=%v,got=%v", "direct", config.RabbitConnection)
}
}

func TestConfig_RabbitConnection_LoadBalaner(t *testing.T) {
newValue := "loadbalancer"
defer os.Unsetenv("RABBIT_CONNECTION")

os.Setenv("RABBIT_CONNECTION", newValue)
initConfig()

if config.RabbitConnection != newValue {
t.Errorf("RabbitConnection specified. It should be modified. expected=%v,got=%v", newValue, config.RabbitConnection)
}
}

func TestConfig_RabbitConnection_Invalid(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("initConfig should panic on invalid rabbit connection config")
}
}()
newValue := "invalid"
defer os.Unsetenv("RABBIT_CONNECTION")

os.Setenv("RABBIT_CONNECTION", newValue)
initConfig()
}
10 changes: 2 additions & 8 deletions exporter_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,14 @@ func (e exporterConnections) Collect(ctx context.Context, ch chan<- prometheus.M
for key, gauge := range e.connectionMetricsG {
for _, connD := range rabbitConnectionResponses {
if value, ok := connD.metrics[key]; ok {
self := "0"
if connD.labels["node"] == selfNode {
self = "1"
}
self := selfLabel(config, connD.labels["node"] == selfNode)
gauge.WithLabelValues(cluster, connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"], self).Add(value)
}
}
}

for _, connD := range rabbitConnectionResponses {
self := "0"
if connD.labels["node"] == selfNode {
self = "1"
}
self := selfLabel(config, connD.labels["node"] == selfNode)
e.stateMetric.WithLabelValues(cluster, connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"], connD.labels["state"], self).Add(1)
}

Expand Down
5 changes: 1 addition & 4 deletions exporter_federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ func (e exporterFederation) Collect(ctx context.Context, ch chan<- prometheus.Me
}

for _, federation := range federationData {
self := "0"
if federation.labels["node"] == selfNode {
self = "1"
}
self := selfLabel(config, federation.labels["node"] == selfNode)
e.stateMetric.WithLabelValues(cluster, federation.labels["vhost"], federation.labels["node"], federation.labels["queue"], federation.labels["exchange"], self, federation.labels["status"]).Set(1)
}

Expand Down
5 changes: 1 addition & 4 deletions exporter_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,7 @@ func (e exporterMemory) Collect(ctx context.Context, ch chan<- prometheus.Metric
}

for _, node := range nodeData {
self := "0"
if node.labels["name"] == selfNode {
self = "1"
}
self := selfLabel(config, node.labels["name"] == selfNode)
rabbitMemoryResponses, err := getMetricMap(config, fmt.Sprintf("nodes/%s/memory", node.labels["name"]))
if err != nil {
return err
Expand Down
5 changes: 1 addition & 4 deletions exporter_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ func (e exporterNode) Collect(ctx context.Context, ch chan<- prometheus.Metric)
for key, gauge := range e.nodeMetricsGauge {
for _, node := range nodeData {
if value, ok := node.metrics[key]; ok {
self := "0"
if node.labels["name"] == selfNode {
self = "1"
}
self := selfLabel(config, node.labels["name"] == selfNode)
gauge.WithLabelValues(cluster, node.labels["name"], self).Set(value)
}
}
Expand Down
5 changes: 1 addition & 4 deletions exporter_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,7 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
continue
}

self := "0"
if queue.labels["node"] == selfNode {
self = "1"
}
self := selfLabel(config, queue.labels["node"] == selfNode)
labelValues := []string{cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self}

for key, gaugevec := range e.queueMetricsGauge {
Expand Down
5 changes: 1 addition & 4 deletions exporter_shovel.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ func (e exporterShovel) Collect(ctx context.Context, ch chan<- prometheus.Metric
}

for _, shovel := range shovelData {
self := "0"
if shovel.labels["node"] == selfNode {
self = "1"
}
self := selfLabel(config, shovel.labels["node"] == selfNode)
e.stateMetric.WithLabelValues(cluster, shovel.labels["vhost"], shovel.labels["name"], shovel.labels["type"], self, shovel.labels["state"]).Set(1)
}

Expand Down
17 changes: 17 additions & 0 deletions exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,23 @@ func TestResetMetricsOnRabbitFailure(t *testing.T) {
dontExpectSubstring(t, body, `rabbitmq_connection_received_packets{cluster="my-rabbit@ae74c041248b",node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="1",user="rmq_oms",vhost="/"}`)
})

t.Run("RabbitMQ is using loadbalancer -> self is always 1", func(t *testing.T) {
rabbitUP = true
rabbitQueuesUp = true
config.RabbitConnection = "loadbalancer"
req, _ := http.NewRequest("GET", "", nil)
w := httptest.NewRecorder()
promhttp.Handler().ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("Home page didn't return %v", http.StatusOK)
}
body := w.Body.String()
t.Log(body)

// queue
expectSubstring(t, body, `rabbitmq_queue_messages_ready{cluster="my-rabbit@ae74c041248b",durable="true",policy="ha-2",queue="myQueue2",self="lb",vhost="/"} 25`)
})

}

func TestExporter(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func main() {
"PUBLISH_PORT": config.PublishPort,
"RABBIT_URL": config.RabbitURL,
"RABBIT_USER": config.RabbitUsername,
"RABBIT_CONNECTION": config.RabbitConnection,
"OUTPUT_FORMAT": config.OutputFormat,
"RABBIT_CAPABILITIES": formatCapabilities(config.RabbitCapabilities),
"RABBIT_EXPORTERS": config.EnabledExporters,
Expand Down

0 comments on commit 2231dcf

Please sign in to comment.