Skip to content

Commit

Permalink
Merge pull request #132 from marevers/add-query-iterator
Browse files Browse the repository at this point in the history
Support for iterating over query results
  • Loading branch information
dewey authored Oct 22, 2024
2 parents e69d565 + 55b6ef7 commit a72f7b8
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 11 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ jobs:
startup_sql:
- 'SET lock_timeout = 1000'
- 'SET idle_in_transaction_session_timeout = 100'
# iterator is an optional mechanism to iterate over a series of values, e.g. multiple databases
iterator:
# sql is the SQL to execute to retrieve the list of values to iterate over -
# query result must be a single column
sql: 'SELECT database_name FROM databases'
# placeholder should be present in the original query and not also used as an environment variable
# e.g. {{PLACEHOLDER}} - it will be replaced by the values retrieved by the query
placeholder: PLACEHOLDER
# label is the label name to which the iterator value gets assigned
label: database
# queries is a map of Metric/Query mappings
queries:
# name is prefixed with sql_ and used as the metric name
Expand Down
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ type Job struct {
Connections []string `yaml:"connections"`
Queries []*Query `yaml:"queries"`
StartupSQL []string `yaml:"startup_sql"` // SQL executed on startup
Iterator Iterator `yaml:"iterator"` // Iterator configuration
}

type connection struct {
Expand All @@ -162,6 +163,7 @@ type connection struct {
database string
user string
tokenExpirationTime time.Time
iteratorValues []string
}

// Query is an SQL query that is executed on a connection
Expand All @@ -180,3 +182,10 @@ type Query struct {
Query string `yaml:"query"` // a literal query
QueryRef string `yaml:"query_ref"` // references a query in the query map
}

// Iterator is a mechanism to repeat queries from a job based on the results of another query
type Iterator struct {
SQL string `yaml:"sql"` // SQL to execute to retrieve iterator values
Placeholder string `yaml:"placeholder"` // Placeholder in query to be replaced
Label string `yaml:"label"` // Label to assign iterator values to
}
56 changes: 51 additions & 5 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ func (j *Job) Init(logger log.Logger, queries map[string]string) error {
// try to satisfy prometheus naming restrictions
name := MetricNameRE.ReplaceAllString("sql_"+q.Name, "")
help := q.Help

// append the iterator label if it is set
if j.Iterator.Label != "" {
q.Labels = append(q.Labels, j.Iterator.Label)
}

// prepare a new metrics descriptor
//
// the tricky part here is that the *order* of labels has to match the
Expand Down Expand Up @@ -428,6 +434,36 @@ func (j *Job) runOnceConnection(conn *connection, done chan int) {
return
}

// execute iterator SQL
if j.Iterator.SQL != "" {
level.Debug(j.log).Log("msg", "IteratorSQL", "Query:", j.Iterator.SQL)
rows, err := conn.conn.Queryx(j.Iterator.SQL)
if err != nil {
level.Warn(j.log).Log("msg", "Failed to run iterator query", "err", err, "host", conn.host)
j.markFailed(conn)
// we don't have the query name yet.
failedQueryCounter.WithLabelValues(j.Name, "").Inc()
return
}

defer rows.Close()

var ivs []string
for rows.Next() {
var value string
err := rows.Scan(&value)
if err != nil {
level.Warn(j.log).Log("msg", "Failed to read iterator values", "err", err, "host", conn.host)
j.markFailed(conn)
// we don't have the query name yet.
failedQueryCounter.WithLabelValues(j.Name, "").Inc()
return
}
ivs = append(ivs, value)
}
conn.iteratorValues = ivs
}

for _, q := range j.Queries {
if q == nil {
continue
Expand All @@ -437,11 +473,21 @@ func (j *Job) runOnceConnection(conn *connection, done chan int) {
level.Warn(q.log).Log("msg", "Skipping query. Collector is nil")
continue
}
level.Debug(q.log).Log("msg", "Running Query")
// execute the query on the connection
if err := q.Run(conn); err != nil {
level.Warn(q.log).Log("msg", "Failed to run query", "err", err)
continue
// repeat query with iterator values if set and the query has the iterator placeholder
if conn.iteratorValues != nil && q.HasIterator(j.Iterator.Placeholder) {
level.Debug(q.log).Log("msg", "Running Iterator Query")
// execute the query with iterator on the connection
if err := q.RunIterator(conn, j.Iterator.Placeholder, conn.iteratorValues, j.Iterator.Label); err != nil {
level.Warn(q.log).Log("msg", "Failed to run query", "err", err)
continue
}
} else {
level.Debug(q.log).Log("msg", "Running Query")
// execute the query on the connection
if err := q.Run(conn); err != nil {
level.Warn(q.log).Log("msg", "Failed to run query", "err", err)
continue
}
}
level.Debug(q.log).Log("msg", "Query finished")
updated++
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
prom_collectors_version "github.com/prometheus/client_golang/prometheus/collectors/version"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
_ "go.uber.org/automaxprocs"
)

Expand Down
97 changes: 93 additions & 4 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -50,7 +51,7 @@ func (q *Query) Run(conn *connection) error {
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
continue
}
m, err := q.updateMetrics(conn, res)
m, err := q.updateMetrics(conn, res, "", "")
if err != nil {
level.Error(q.log).Log("msg", "Failed to update metrics", "err", err, "host", conn.host, "db", conn.database)
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
Expand All @@ -77,8 +78,90 @@ func (q *Query) Run(conn *connection) error {
return nil
}

// RunIterator runs the query for each iterator value on a single connection
func (q *Query) RunIterator(conn *connection, ph string, ivs []string, il string) error {
if q.log == nil {
q.log = log.NewNopLogger()
}
queryCounter.WithLabelValues(q.jobName, q.Name).Inc()
if q.desc == nil {
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
return fmt.Errorf("metrics descriptor is nil")
}
if q.Query == "" {
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
return fmt.Errorf("query is empty")
}
if conn == nil || conn.conn == nil {
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
return fmt.Errorf("db connection not initialized (should not happen)")
}

// execute query for each iterator value
now := time.Now()
metrics := make([]prometheus.Metric, 0, len(q.metrics))
updated := 0
for _, iv := range ivs {
rows, err := conn.conn.Queryx(q.ReplaceIterator(ph, iv))
if err != nil {
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
failedQueryCounter.WithLabelValues(q.jobName, q.Name).Inc()
return err
}
defer rows.Close()

for rows.Next() {
res := make(map[string]interface{})
err := rows.MapScan(res)
if err != nil {
level.Error(q.log).Log("msg", "Failed to scan", "err", err, "host", conn.host, "db", conn.database)
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
continue
}
m, err := q.updateMetrics(conn, res, iv, il)
if err != nil {
level.Error(q.log).Log("msg", "Failed to update metrics", "err", err, "host", conn.host, "db", conn.database)
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
continue
}
metrics = append(metrics, m...)
updated++
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(0.0)
}
}

duration := time.Since(now)
queryDurationHistogram.WithLabelValues(q.jobName, q.Name).Observe(duration.Seconds())

if updated < 1 {
if q.AllowZeroRows {
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(0.0)
} else {
return fmt.Errorf("zero rows returned")
}
}

// update the metrics cache
q.Lock()
q.metrics[conn] = metrics
q.Unlock()

return nil
}

// HasIterator returns true if the query contains the given placeholder
func (q *Query) HasIterator(ph string) bool {
return strings.Contains(q.Query, ph)
}

// ReplaceIterator replaces a given placeholder with an iterator value and returns a new query
func (q *Query) ReplaceIterator(ph string, iv string) string {
iteratorReplacer := strings.NewReplacer(fmt.Sprint("{{", ph, "}}"), iv)
return iteratorReplacer.Replace(q.Query)
}

// updateMetrics parses the result set and returns a slice of const metrics
func (q *Query) updateMetrics(conn *connection, res map[string]interface{}) ([]prometheus.Metric, error) {
func (q *Query) updateMetrics(conn *connection, res map[string]interface{}, iv string, il string) ([]prometheus.Metric, error) {
// if no value were defined to be parsed, return immediately
if len(q.Values) == 0 {
level.Debug(q.log).Log("msg", "No values defined in configuration, skipping metric update")
Expand All @@ -87,7 +170,7 @@ func (q *Query) updateMetrics(conn *connection, res map[string]interface{}) ([]p
updated := 0
metrics := make([]prometheus.Metric, 0, len(q.Values))
for _, valueName := range q.Values {
m, err := q.updateMetric(conn, res, valueName)
m, err := q.updateMetric(conn, res, valueName, iv, il)
if err != nil {
level.Error(q.log).Log(
"msg", "Failed to update metric",
Expand All @@ -108,7 +191,7 @@ func (q *Query) updateMetrics(conn *connection, res map[string]interface{}) ([]p
}

// updateMetrics parses a single row and returns a const metric
func (q *Query) updateMetric(conn *connection, res map[string]interface{}, valueName string) (prometheus.Metric, error) {
func (q *Query) updateMetric(conn *connection, res map[string]interface{}, valueName string, iv string, il string) (prometheus.Metric, error) {
var value float64
if i, ok := res[valueName]; ok {
switch f := i.(type) {
Expand Down Expand Up @@ -154,6 +237,12 @@ func (q *Query) updateMetric(conn *connection, res map[string]interface{}, value
// added below
labels := make([]string, 0, len(q.Labels)+5)
for _, label := range q.Labels {
// append iterator value to the labels
if label == il && iv != "" {
labels = append(labels, iv)
continue
}

// we need to fill every spot in the slice or the key->value mapping
// won't match up in the end.
//
Expand Down

0 comments on commit a72f7b8

Please sign in to comment.