Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Prometheus metric to track sql stmts per transaction #29

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ get:
fmt: get
go fmt ./...

tidy:
go mod tidy

# assert that there is no difference after running format
no-diff:
git diff --exit-code
Expand All @@ -25,7 +28,7 @@ test: vet get-ginkgo


# test target which includes the no-diff fail condition
ci-test: fmt no-diff test
ci-test: fmt tidy no-diff test

test-docker:
docker build -f Dockerfile.test .
Expand Down
38 changes: 37 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type managedConn struct {
reset bool
killed bool
mu sync.RWMutex

execStmtsCounter int // count the number of exec calls in a transaction
queryStmtsCounter int // count the number of query calls in a transaction
}

// BeginTx calls the underlying BeginTx method unless the supervising context
Expand All @@ -34,7 +37,12 @@ func (c *managedConn) BeginTx(ctx context.Context, opts driver.TxOptions) (drive
}

if conn, ok := c.conn.(driver.ConnBeginTx); ok {
return conn.BeginTx(ctx, opts)
tx, err := conn.BeginTx(ctx, opts)
if err != nil {
return nil, err
}

return &managedTx{tx: tx, conn: c, ctx: ctx}, nil
}

// same as is defined in go sql package to call Begin method if the TxOptions are default
Expand Down Expand Up @@ -71,6 +79,7 @@ func (c *managedConn) Exec(query string, args []driver.Value) (driver.Result, er
if !ok {
return nil, driver.ErrSkip
}
c.incExecStmtsCounter() //increment the exec counter to keep track of the number of exec calls
return conn.Exec(query, args)
}

Expand All @@ -79,6 +88,7 @@ func (c *managedConn) ExecContext(ctx context.Context, query string, args []driv
if !ok {
return nil, driver.ErrSkip
}
c.incExecStmtsCounter() //increment the exec counter to keep track of the number of exec calls
return conn.ExecContext(ctx, query, args)
}

Expand All @@ -95,6 +105,7 @@ func (c *managedConn) Query(query string, args []driver.Value) (driver.Rows, err
if !ok {
return nil, driver.ErrSkip
}
c.incQueryStmtsCounter() //increment the query counter to keep track of the number of query calls
return conn.Query(query, args)
}

Expand All @@ -103,6 +114,7 @@ func (c *managedConn) QueryContext(ctx context.Context, query string, args []dri
if !ok {
return nil, driver.ErrSkip
}
c.incQueryStmtsCounter() //increment the query counter to keep track of the number of query calls
return conn.QueryContext(ctx, query, args)
}

Expand Down Expand Up @@ -184,3 +196,27 @@ func (c *managedConn) GetKill() bool {
defer c.mu.RUnlock()
return c.killed
}

func (c *managedConn) incExecStmtsCounter() {
c.mu.Lock()
defer c.mu.Unlock()
c.execStmtsCounter++
}

func (c *managedConn) resetExecStmtsCounter() {
c.mu.Lock()
defer c.mu.Unlock()
c.execStmtsCounter = 0
}

func (c *managedConn) incQueryStmtsCounter() {
c.mu.Lock()
defer c.mu.Unlock()
c.queryStmtsCounter++
}

func (c *managedConn) resetQueryStmtsCounter() {
c.mu.Lock()
defer c.mu.Unlock()
c.queryStmtsCounter = 0
}
202 changes: 201 additions & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package hotload

import (
"context"
"database/sql/driver"
"io"
"strings"
"sync"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"sync"
"github.com/prometheus/client_golang/prometheus/testutil"
)

var _ = Describe("managedConn", func() {
Expand Down Expand Up @@ -34,3 +40,197 @@ var _ = Describe("managedConn", func() {
Consistently(readLockAcquired).Should(BeFalse())
})
})

/**** Mocks for Prometheus Metrics ****/

type mockDriverConn struct{}

type mockTx struct{}

func (mockTx) Commit() error {
return nil
}

func (mockTx) Rollback() error {
return nil
}

func (mockDriverConn) Prepare(query string) (driver.Stmt, error) {
return nil, nil
}

func (mockDriverConn) Begin() (driver.Tx, error) {
return mockTx{}, nil
}

func (mockDriverConn) Close() error {
return nil
}

func (mockDriverConn) IsValid() bool {
return true
}

func (mockDriverConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
return mockTx{}, nil
}

func (mockDriverConn) Exec(query string, args []driver.Value) (driver.Result, error) {
return nil, nil
}

func (mockDriverConn) Query(query string, args []driver.Value) (driver.Rows, error) {
return nil, nil
}

func (mockDriverConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
return nil, nil
}

func (mockDriverConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
return nil, nil
}

/**** End Mocks for Prometheus Metrics ****/

var _ = Describe("PrometheusMetrics", func() {
const help = `
# HELP transaction_sql_stmts The number of sql stmts called in a transaction by statement type per grpc service and method
# TYPE transaction_sql_stmts summary
`

var service1Metrics = `
transaction_sql_stmts_sum{grpc_method="method_1",grpc_service="service_1",stmt="exec"} 3
transaction_sql_stmts_count{grpc_method="method_1",grpc_service="service_1",stmt="exec"} 1
transaction_sql_stmts_sum{grpc_method="method_1",grpc_service="service_1",stmt="query"} 3
transaction_sql_stmts_count{grpc_method="method_1",grpc_service="service_1",stmt="query"} 1
`

var service2Metrics = `
transaction_sql_stmts_sum{grpc_method="method_2",grpc_service="service_2",stmt="exec"} 4
transaction_sql_stmts_count{grpc_method="method_2",grpc_service="service_2",stmt="exec"} 1
transaction_sql_stmts_sum{grpc_method="method_2",grpc_service="service_2",stmt="query"} 4
transaction_sql_stmts_count{grpc_method="method_2",grpc_service="service_2",stmt="query"} 1
`

var service1RerunMetrics = `
transaction_sql_stmts_sum{grpc_method="method_1",grpc_service="service_1",stmt="exec"} 4
transaction_sql_stmts_count{grpc_method="method_1",grpc_service="service_1",stmt="exec"} 2
transaction_sql_stmts_sum{grpc_method="method_1",grpc_service="service_1",stmt="query"} 4
transaction_sql_stmts_count{grpc_method="method_1",grpc_service="service_1",stmt="query"} 2
`

var noMethodMetrics = `
transaction_sql_stmts_sum{grpc_method="",grpc_service="",stmt="exec"} 1
transaction_sql_stmts_count{grpc_method="",grpc_service="",stmt="exec"} 1
transaction_sql_stmts_sum{grpc_method="",grpc_service="",stmt="query"} 1
transaction_sql_stmts_count{grpc_method="",grpc_service="",stmt="query"} 1
`

It("Should emit the correct metrics", func() {
mc := newManagedConn(context.Background(), mockDriverConn{})

ctx := ContextWithExecLabels(context.Background(), map[string]string{"grpc_method": "method_1", "grpc_service": "service_1"})

// begin a transaction
tx, err := mc.BeginTx(ctx, driver.TxOptions{})
Expect(err).ShouldNot(HaveOccurred())

// exec a statement
mc.Exec("INSERT INTO table (column) VALUES (?)", []driver.Value{"value"})

// query a statement
mc.Query("SELECT * FROM table WHERE column = ?", []driver.Value{"value"})
mc.Query("SELECT * FROM table WHERE column = ?", []driver.Value{"value"})

// exec a statement with context
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})

// query a statement with context
mc.QueryContext(ctx, "SELECT * FROM table WHERE column = ?", []driver.NamedValue{{Value: "value"}})

// commit the transaction
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())

// collect and compare metrics
err = testutil.CollectAndCompare(sqlStmtsSummary, strings.NewReader(help+service1Metrics))
Expect(err).ShouldNot(HaveOccurred())

// reset the metrics
// new context
ctx = ContextWithExecLabels(context.Background(), map[string]string{"grpc_method": "method_2", "grpc_service": "service_2"})
// begin a transaction
tx, err = mc.BeginTx(ctx, driver.TxOptions{})
Expect(err).ShouldNot(HaveOccurred())

// exec a statement
mc.Exec("INSERT INTO table (column) VALUES (?)", []driver.Value{"value"})
mc.Exec("INSERT INTO table (column) VALUES (?)", []driver.Value{"value"})

// query a statement
mc.Query("SELECT * FROM table WHERE column = ?", []driver.Value{"value"})
mc.Query("SELECT * FROM table WHERE column = ?", []driver.Value{"value"})

// exec a statement with context
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})

// query a statement with context
mc.QueryContext(ctx, "SELECT * FROM table WHERE column = ?", []driver.NamedValue{{Value: "value"}})
mc.QueryContext(ctx, "SELECT * FROM table WHERE column = ?", []driver.NamedValue{{Value: "value"}})

// commit the transaction
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())

// collect and compare metrics
err = testutil.CollectAndCompare(sqlStmtsSummary, strings.NewReader(help+service1Metrics+service2Metrics))
Expect(err).ShouldNot(HaveOccurred())

// rerun with initial metrics
ctx = ContextWithExecLabels(context.Background(), map[string]string{"grpc_method": "method_1", "grpc_service": "service_1"})
// begin a transaction
tx, err = mc.BeginTx(ctx, driver.TxOptions{})
Expect(err).ShouldNot(HaveOccurred())

// exec a statement with context
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})

// query a statement with context
mc.QueryContext(ctx, "SELECT * FROM table WHERE column = ?", []driver.NamedValue{{Value: "value"}})

// rollback the transaction
err = tx.Rollback()
Expect(err).ShouldNot(HaveOccurred())

// collect and compare metrics
err = testutil.CollectAndCompare(sqlStmtsSummary, strings.NewReader(help+service1RerunMetrics+service2Metrics))
Expect(err).ShouldNot(HaveOccurred())

// non labeled context
ctx = context.Background()
// begin a transaction
tx, err = mc.BeginTx(ctx, driver.TxOptions{})
Expect(err).ShouldNot(HaveOccurred())

// exec query context
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})

// query a statement with context
mc.QueryContext(ctx, "SELECT * FROM table WHERE column = ?", []driver.NamedValue{{Value: "value"}})

// commit the transaction
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())

// collect and compare metrics
err = testutil.CollectAndCompare(sqlStmtsSummary, strings.NewReader(help+noMethodMetrics+service1RerunMetrics+service2Metrics))
Expect(err).ShouldNot(HaveOccurred())
})
})

func CollectAndCompareMetrics(r io.Reader) error {
return testutil.CollectAndCompare(sqlStmtsSummary, r)
}
21 changes: 15 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,26 @@ require (
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.27.6
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.18.0
github.com/sirupsen/logrus v1.9.0
)

require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.7.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading