From b81d26c5af69ab1a6b9e464ca8c0ce37352afae8 Mon Sep 17 00:00:00 2001 From: MarcWort <113890636+MarcWort@users.noreply.github.com> Date: Tue, 7 May 2024 10:43:28 +0200 Subject: [PATCH 1/2] feat: Add safe_wal_size to replication_slot Signed-off-by: MarcWort <113890636+MarcWort@users.noreply.github.com> --- collector/pg_replication_slot.go | 22 ++++++++++++++++++++-- collector/pg_replication_slot_test.go | 18 ++++++++++-------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index 7f1ba003e..c9fb68815 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -63,6 +63,15 @@ var ( "whether the replication slot is active or not", []string{"slot_name", "slot_type"}, nil, ) + pgReplicationSlotSafeWal = prometheus.NewDesc( + prometheus.BuildFQName( + namespace, + replicationSlotSubsystem, + "safe_wal_size_bytes", + ), + "number of bytes that can be written to WAL such that this slot is not in danger of getting in state lost", + []string{"slot_name", "slot_type"}, nil, + ) pgReplicationSlotQuery = `SELECT slot_name, @@ -73,7 +82,8 @@ var ( pg_current_wal_lsn() - '0/0' END AS current_wal_lsn, COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn, - active + active, + safe_wal_size FROM pg_replication_slots;` ) @@ -92,7 +102,8 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance var walLSN sql.NullFloat64 var flushLSN sql.NullFloat64 var isActive sql.NullBool - if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive); err != nil { + var safeWalSize sql.NullInt64 + if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize); err != nil { return err } @@ -131,6 +142,13 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance pgReplicationSlotIsActiveDesc, prometheus.GaugeValue, isActiveValue, slotNameLabel, slotTypeLabel, ) + + if safeWalSize.Valid { + ch <- prometheus.MustNewConstMetric( + pgReplicationSlotSafeWal, + prometheus.GaugeValue, float64(safeWalSize.Int64), slotNameLabel, slotTypeLabel, + ) + } } return rows.Err() } diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index 212050c46..ec0bee135 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -31,9 +31,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 5, 3, true) + AddRow("test_slot", "physical", 5, 3, true, 323906992) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -50,6 +50,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 5, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 323906992, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -72,9 +73,9 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 6, 12, false) + AddRow("test_slot", "physical", 6, 12, false, -4000) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -90,6 +91,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { expected := []MetricResult{ {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: -4000, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -113,9 +115,9 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 6, 12, nil) + AddRow("test_slot", "physical", 6, 12, nil, nil) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -153,9 +155,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} rows := sqlmock.NewRows(columns). - AddRow(nil, nil, nil, nil, true) + AddRow(nil, nil, nil, nil, true, nil) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) From 7525c2ab9df3df17e5d8bff90f7a5bfe4c2a1fc7 Mon Sep 17 00:00:00 2001 From: MarcWort <113890636+MarcWort@users.noreply.github.com> Date: Tue, 7 May 2024 11:00:21 +0200 Subject: [PATCH 2/2] feat: Add wal_status to replication_slot Signed-off-by: MarcWort <113890636+MarcWort@users.noreply.github.com> --- collector/pg_replication_slot.go | 22 ++++++++++++++++++++-- collector/pg_replication_slot_test.go | 19 +++++++++++-------- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/collector/pg_replication_slot.go b/collector/pg_replication_slot.go index c9fb68815..1d29f8498 100644 --- a/collector/pg_replication_slot.go +++ b/collector/pg_replication_slot.go @@ -72,6 +72,15 @@ var ( "number of bytes that can be written to WAL such that this slot is not in danger of getting in state lost", []string{"slot_name", "slot_type"}, nil, ) + pgReplicationSlotWalStatus = prometheus.NewDesc( + prometheus.BuildFQName( + namespace, + replicationSlotSubsystem, + "wal_status", + ), + "availability of WAL files claimed by this slot", + []string{"slot_name", "slot_type", "wal_status"}, nil, + ) pgReplicationSlotQuery = `SELECT slot_name, @@ -83,7 +92,8 @@ var ( END AS current_wal_lsn, COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn, active, - safe_wal_size + safe_wal_size, + wal_status FROM pg_replication_slots;` ) @@ -103,7 +113,8 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance var flushLSN sql.NullFloat64 var isActive sql.NullBool var safeWalSize sql.NullInt64 - if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize); err != nil { + var walStatus sql.NullString + if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize, &walStatus); err != nil { return err } @@ -149,6 +160,13 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance prometheus.GaugeValue, float64(safeWalSize.Int64), slotNameLabel, slotTypeLabel, ) } + + if walStatus.Valid { + ch <- prometheus.MustNewConstMetric( + pgReplicationSlotWalStatus, + prometheus.GaugeValue, 1, slotNameLabel, slotTypeLabel, walStatus.String, + ) + } } return rows.Err() } diff --git a/collector/pg_replication_slot_test.go b/collector/pg_replication_slot_test.go index ec0bee135..174743ac3 100644 --- a/collector/pg_replication_slot_test.go +++ b/collector/pg_replication_slot_test.go @@ -31,9 +31,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 5, 3, true, 323906992) + AddRow("test_slot", "physical", 5, 3, true, 323906992, "reserved") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -51,6 +51,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) { {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 323906992, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "reserved"}, value: 1, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -73,9 +74,9 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 6, 12, false, -4000) + AddRow("test_slot", "physical", 6, 12, false, -4000, "extended") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -92,6 +93,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) { {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: -4000, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "extended"}, value: 1, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -115,9 +117,9 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow("test_slot", "physical", 6, 12, nil, nil) + AddRow("test_slot", "physical", 6, 12, nil, nil, "lost") mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric) @@ -133,6 +135,7 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) { expected := []MetricResult{ {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE}, {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "lost"}, value: 1, metricType: dto.MetricType_GAUGE}, } convey.Convey("Metrics comparison", t, func() { @@ -155,9 +158,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) { inst := &instance{db: db} - columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"} + columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"} rows := sqlmock.NewRows(columns). - AddRow(nil, nil, nil, nil, true, nil) + AddRow(nil, nil, nil, nil, true, nil, nil) mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows) ch := make(chan prometheus.Metric)