Skip to content

Commit

Permalink
feat: Add safe_wal_size and wal_status to replication_slot (#1027)
Browse files Browse the repository at this point in the history
* feat: Add safe_wal_size to replication_slot

Signed-off-by: MarcWort <[email protected]>

* feat: Add wal_status to replication_slot

Signed-off-by: MarcWort <[email protected]>

---------

Signed-off-by: MarcWort <[email protected]>
  • Loading branch information
MarcWort authored May 11, 2024
1 parent cc0fd2e commit a4ac0e6
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 10 deletions.
40 changes: 38 additions & 2 deletions collector/pg_replication_slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ var (
"whether the replication slot is active or not",
[]string{"slot_name", "slot_type"}, nil,
)
pgReplicationSlotSafeWal = prometheus.NewDesc(
prometheus.BuildFQName(
namespace,
replicationSlotSubsystem,
"safe_wal_size_bytes",
),
"number of bytes that can be written to WAL such that this slot is not in danger of getting in state lost",
[]string{"slot_name", "slot_type"}, nil,
)
pgReplicationSlotWalStatus = prometheus.NewDesc(
prometheus.BuildFQName(
namespace,
replicationSlotSubsystem,
"wal_status",
),
"availability of WAL files claimed by this slot",
[]string{"slot_name", "slot_type", "wal_status"}, nil,
)

pgReplicationSlotQuery = `SELECT
slot_name,
Expand All @@ -73,7 +91,9 @@ var (
pg_current_wal_lsn() - '0/0'
END AS current_wal_lsn,
COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn,
active
active,
safe_wal_size,
wal_status
FROM pg_replication_slots;`
)

Expand All @@ -92,7 +112,9 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
var walLSN sql.NullFloat64
var flushLSN sql.NullFloat64
var isActive sql.NullBool
if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive); err != nil {
var safeWalSize sql.NullInt64
var walStatus sql.NullString
if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize, &walStatus); err != nil {
return err
}

Expand Down Expand Up @@ -131,6 +153,20 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
pgReplicationSlotIsActiveDesc,
prometheus.GaugeValue, isActiveValue, slotNameLabel, slotTypeLabel,
)

if safeWalSize.Valid {
ch <- prometheus.MustNewConstMetric(
pgReplicationSlotSafeWal,
prometheus.GaugeValue, float64(safeWalSize.Int64), slotNameLabel, slotTypeLabel,
)
}

if walStatus.Valid {
ch <- prometheus.MustNewConstMetric(
pgReplicationSlotWalStatus,
prometheus.GaugeValue, 1, slotNameLabel, slotTypeLabel, walStatus.String,
)
}
}
return rows.Err()
}
21 changes: 13 additions & 8 deletions collector/pg_replication_slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {

inst := &instance{db: db}

columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
rows := sqlmock.NewRows(columns).
AddRow("test_slot", "physical", 5, 3, true)
AddRow("test_slot", "physical", 5, 3, true, 323906992, "reserved")
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
Expand All @@ -50,6 +50,8 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 5, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 323906992, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "reserved"}, value: 1, metricType: dto.MetricType_GAUGE},
}

convey.Convey("Metrics comparison", t, func() {
Expand All @@ -72,9 +74,9 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {

inst := &instance{db: db}

columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
rows := sqlmock.NewRows(columns).
AddRow("test_slot", "physical", 6, 12, false)
AddRow("test_slot", "physical", 6, 12, false, -4000, "extended")
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
Expand All @@ -90,6 +92,8 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
expected := []MetricResult{
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: -4000, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "extended"}, value: 1, metricType: dto.MetricType_GAUGE},
}

convey.Convey("Metrics comparison", t, func() {
Expand All @@ -113,9 +117,9 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {

inst := &instance{db: db}

columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
rows := sqlmock.NewRows(columns).
AddRow("test_slot", "physical", 6, 12, nil)
AddRow("test_slot", "physical", 6, 12, nil, nil, "lost")
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
Expand All @@ -131,6 +135,7 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
expected := []MetricResult{
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical", "wal_status": "lost"}, value: 1, metricType: dto.MetricType_GAUGE},
}

convey.Convey("Metrics comparison", t, func() {
Expand All @@ -153,9 +158,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) {

inst := &instance{db: db}

columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active"}
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil, true)
AddRow(nil, nil, nil, nil, true, nil, nil)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
Expand Down

0 comments on commit a4ac0e6

Please sign in to comment.