Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.18 AS builder
FROM golang:1.22 AS builder

ENV USER=app
ENV UID=10001
Expand Down
20 changes: 20 additions & 0 deletions config/metric_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,33 @@ type MetricConfig struct {
StaticValue *float64 `yaml:"static_value,omitempty"`
TimestampValue string `yaml:"timestamp_value,omitempty"` // optional column name containing a valid timestamp value

// SHOW STATS filtering and transformation features
RowFilters []RowFilter `yaml:"row_filters,omitempty"` // filter rows post-query
ColumnFilters []string `yaml:"column_filters,omitempty"` // include only these columns
LagCalculations []LagCalculation `yaml:"lag_calculations,omitempty"` // calculate time lag for timestamp fields

valueType prometheus.ValueType // TypeString converted to prometheus.ValueType
query *QueryConfig // QueryConfig resolved from QueryRef or generated from Query

// Catches all undefined fields and must be empty after parsing.
XXX map[string]any `yaml:",inline" json:"-"`
}

// RowFilter defines conditions to filter rows after query execution.
// A row is kept only if it satisfies every configured RowFilter.
type RowFilter struct {
	Column   string   `yaml:"column"`           // column name to filter on
	Operator string   `yaml:"operator"`         // one of "equals", "not_equals", "in", "not_in", "contains"
	Value    string   `yaml:"value,omitempty"`  // single value for equals/not_equals/contains
	Values   []string `yaml:"values,omitempty"` // multiple values for in/not_in
}

// LagCalculation defines how to calculate time lag from timestamp fields.
// The lag (now minus the parsed timestamp, in seconds) is written to a new
// column so it can be exported as a metric value.
type LagCalculation struct {
	SourceColumn    string `yaml:"source_column"`              // column containing the timestamp (e.g., "high_value")
	OutputColumn    string `yaml:"output_column"`              // new column name for the lag value (e.g., "lag_seconds")
	TimestampFormat string `yaml:"timestamp_format,omitempty"` // Go reference-time layout; defaults to the Trino format
}

// ValueType returns the metric type, converted to a prometheus.ValueType.
func (m *MetricConfig) ValueType() prometheus.ValueType {
return m.valueType
Expand Down
215 changes: 211 additions & 4 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"log/slog"
"strings"
"time"

"github.com/burningalchemist/sql_exporter/config"
Expand Down Expand Up @@ -41,14 +42,35 @@ func NewQuery(logContext string, qc *config.QueryConfig, metricFamilies ...*Metr
columnTypes := make(columnTypeMap)

for _, mf := range metricFamilies {
// Create a map of output columns created by transformations
transformedColumns := make(map[string]bool)
for _, lagCalc := range mf.config.LagCalculations {
transformedColumns[lagCalc.OutputColumn] = true
// Add source columns to columnTypes since they're needed from SQL
// Use columnTypeKey since timestamp values are strings, not numbers
if err := setColumnType(logContext, lagCalc.SourceColumn, columnTypeKey, columnTypes); err != nil {
return nil, err
}
}

// Add columns used in row filters
for _, filter := range mf.config.RowFilters {
if err := setColumnType(logContext, filter.Column, columnTypeKey, columnTypes); err != nil {
return nil, err
}
}

for _, kcol := range mf.config.KeyLabels {
if err := setColumnType(logContext, kcol, columnTypeKey, columnTypes); err != nil {
return nil, err
}
}
for _, vcol := range mf.config.Values {
if err := setColumnType(logContext, vcol, columnTypeValue, columnTypes); err != nil {
return nil, err
// Skip columns that are created by transformations
if !transformedColumns[vcol] {
if err := setColumnType(logContext, vcol, columnTypeValue, columnTypes); err != nil {
return nil, err
}
}
}
if mf.config.TimestampValue != "" {
Expand All @@ -64,6 +86,14 @@ func NewQuery(logContext string, qc *config.QueryConfig, metricFamilies ...*Metr
columnTypes: columnTypes,
logContext: logContext,
}

// Debug logging to see what columns we're expecting
expectedColumns := make([]string, 0, len(columnTypes))
for col := range columnTypes {
expectedColumns = append(expectedColumns, col)
}
slog.Debug("Expected columns from SQL", "logContext", logContext, "columns", expectedColumns)

return &q, nil
}

Expand All @@ -82,11 +112,13 @@ func setColumnType(logContext, columnName string, ctype columnType, columnTypes

// Collect is the equivalent of prometheus.Collector.Collect() but takes a context to run in and a database to run on.
func (q *Query) Collect(ctx context.Context, conn *sql.DB, ch chan<- Metric) {
collectStart := time.Now()

if ctx.Err() != nil {
ch <- NewInvalidMetric(errors.Wrap(q.logContext, ctx.Err()))

return
}

rows, err := q.run(ctx, conn)
if err != nil {
ch <- NewInvalidMetric(err)
Expand All @@ -103,19 +135,48 @@ func (q *Query) Collect(ctx context.Context, conn *sql.DB, ch chan<- Metric) {
ch <- NewInvalidMetric(err)
return
}

totalRowsProcessed := 0
totalRowsFiltered := 0
metricsGenerated := 0

for rows.Next() {
totalRowsProcessed++

row, err := q.scanRow(rows, dest)
if err != nil {
ch <- NewInvalidMetric(err)
continue
}

// Apply row filtering and transformations for each metric family
for _, mf := range q.metricFamilies {
mf.Collect(row, ch)
// Apply row filters - skip row if it doesn't match
if !q.shouldIncludeRow(row, mf.config) {
totalRowsFiltered++
continue
}

// Apply lag calculations and other transformations
transformedRow := q.applyTransformations(row, mf.config)

mf.Collect(transformedRow, ch)
metricsGenerated++
}
}

if err1 := rows.Err(); err1 != nil {
ch <- NewInvalidMetric(errors.Wrap(q.logContext, err1))
}

// Log performance summary
slog.Debug("Query collection completed",
"logContext", q.logContext,
"duration_ms", time.Since(collectStart).Milliseconds(),
"rows_processed", totalRowsProcessed,
"rows_filtered", totalRowsFiltered,
"metrics_generated", metricsGenerated,
)
}

// run executes the query on the provided database, in the provided context.
Expand Down Expand Up @@ -230,3 +291,149 @@ func (q *Query) scanRow(rows *sql.Rows, dest []any) (map[string]any, errors.With
}
return result, nil
}

// shouldIncludeRow reports whether row passes every row filter configured on
// the given metric. A row is kept only when all filters match; with no
// filters configured, every row is kept.
func (q *Query) shouldIncludeRow(row map[string]any, metric *config.MetricConfig) bool {
	for _, rf := range metric.RowFilters {
		if q.applyRowFilter(row, rf) {
			continue
		}
		return false
	}
	return true
}

// applyRowFilter evaluates a single filter against a row and reports whether
// the row satisfies it. Missing columns and NULL database values never match.
// An unrecognized operator is logged and treated as a pass-through (row kept).
func (q *Query) applyRowFilter(row map[string]any, filter config.RowFilter) bool {
	raw, ok := row[filter.Column]
	if !ok {
		return false
	}

	// Normalize the cell to its string representation, rejecting SQL NULLs.
	var cell string
	switch v := raw.(type) {
	case sql.NullString:
		if !v.Valid {
			return false
		}
		cell = v.String
	case sql.NullFloat64:
		if !v.Valid {
			return false
		}
		cell = fmt.Sprintf("%v", v.Float64)
	case sql.NullTime:
		if !v.Valid {
			return false
		}
		cell = v.Time.Format("2006-01-02 15:04:05.000 UTC")
	default:
		cell = fmt.Sprintf("%v", raw)
	}

	switch filter.Operator {
	case "equals":
		return cell == filter.Value
	case "not_equals":
		return cell != filter.Value
	case "in":
		for _, candidate := range filter.Values {
			if cell == candidate {
				return true
			}
		}
		return false
	case "not_in":
		for _, candidate := range filter.Values {
			if cell == candidate {
				return false
			}
		}
		return true
	case "contains":
		return strings.Contains(cell, filter.Value)
	}

	slog.Warn("Unknown filter operator", "operator", filter.Operator)
	return true
}

// applyTransformations returns a copy of row with lag-calculation output
// columns added and, when ColumnFilters is configured, reduced to only the
// listed columns. The input row is never mutated.
func (q *Query) applyTransformations(row map[string]any, metric *config.MetricConfig) map[string]any {
	out := make(map[string]any, len(row))
	for key, val := range row {
		out[key] = val
	}

	for _, lc := range metric.LagCalculations {
		src, ok := row[lc.SourceColumn]
		if !ok {
			continue
		}
		lag := q.calculateLag(src, lc.TimestampFormat)
		// NOTE(review): a lag of exactly 0 is stored as NULL (Valid=false),
		// which conflates "zero lag" with "could not compute" — confirm this
		// is intentional before relying on it.
		out[lc.OutputColumn] = sql.NullFloat64{Float64: lag, Valid: lag != 0}
	}

	if len(metric.ColumnFilters) == 0 {
		return out
	}

	kept := make(map[string]any, len(metric.ColumnFilters))
	for _, name := range metric.ColumnFilters {
		if v, ok := out[name]; ok {
			kept[name] = v
		}
	}
	return kept
}

// calculateLag returns how many seconds ago timestampValue occurred, parsing
// string-like values with the given layout (the Trino timestamp layout when
// format is empty). Nil, NULL, empty, or unparseable inputs yield 0.
func (q *Query) calculateLag(timestampValue any, format string) float64 {
	if timestampValue == nil {
		return 0
	}

	var raw string
	switch v := timestampValue.(type) {
	case sql.NullTime:
		if !v.Valid {
			return 0
		}
		// Already a time.Time — no string parsing required.
		return time.Since(v.Time).Seconds()
	case sql.NullString:
		if !v.Valid {
			return 0
		}
		raw = v.String
	case string:
		raw = v
	default:
		raw = fmt.Sprintf("%v", timestampValue)
	}

	if raw == "" {
		return 0
	}

	layout := format
	if layout == "" {
		// Trino renders timestamps like "2024-01-02 15:04:05.000 UTC"; the
		// literal "UTC" suffix in the layout matches that text, and a layout
		// with no zone element parses as UTC.
		layout = "2006-01-02 15:04:05.000 UTC"
	}

	ts, err := time.Parse(layout, raw)
	if err != nil {
		slog.Warn("Failed to parse timestamp for lag calculation", "timestamp", raw, "format", layout, "error", err)
		return 0
	}

	return time.Since(ts).Seconds()
}
Loading