
Commit 57c06eb

[!] add Measurements group and use URIs for --sink options (#368)
* split Metrics into Metrics & Measurements
* move options to appropriate groups
* deprecate `UseConnPooling` option
* improve Postgres writer log entries
1 parent: 7e2aa9a
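With this change every measurements sink is given as a single URI whose scheme selects the writer (see src/sinks/multiwriter.go below). Illustrative examples only — the paths and connection strings are made up, and the flag spelling is taken from the commit title:

    --sink=jsonfile:///var/log/pgwatch3/measurements.json
    --sink=postgresql://pgwatch3@localhost:5432/measurements
    --sink=prometheus://0.0.0.0:9187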

File tree

11 files changed: +141 -151 lines changed

src/config/cmdoptions.go

Lines changed: 51 additions & 51 deletions
Large diffs are not rendered by default.

src/database.go

Lines changed: 10 additions & 13 deletions
@@ -33,15 +33,14 @@ func InitSQLConnPoolForMonitoredDBIfNil(md sources.MonitoredDatabase) error {
 	}
 
 	conn, err := db.New(mainContext, md.ConnStr, func(conf *pgxpool.Config) error {
-		conf.MaxConns = int32(opts.MaxParallelConnectionsPerDb)
+		conf.MaxConns = int32(opts.Sources.MaxParallelConnectionsPerDb)
 		return nil
 	})
 	if err != nil {
 		return err
 	}
 
 	monitoredDbConnCache[md.DBUniqueName] = conn
-	logger.Debugf("[%s] Connection pool initialized with max %d parallel connections. Conn pooling: %v", md.DBUniqueName, opts.MaxParallelConnectionsPerDb, opts.UseConnPooling)
 
 	return nil
 }
@@ -57,13 +56,11 @@ func CloseOrLimitSQLConnPoolForMonitoredDBIfAny(dbUnique string) {
 
 	if IsDBUndersized(dbUnique) || IsDBIgnoredBasedOnRecoveryState(dbUnique) {
 
-		if opts.UseConnPooling {
-			s := conn.Stat()
-			if s.TotalConns() > 1 {
-				logger.Debugf("[%s] Limiting SQL connection pool to max 1 connection due to dormant state ...", dbUnique)
-				// conn.SetMaxIdleConns(1)
-				// conn.SetMaxOpenConns(1)
-			}
+		s := conn.Stat()
+		if s.TotalConns() > 1 {
+			logger.Debugf("[%s] Limiting SQL connection pool to max 1 connection due to dormant state ...", dbUnique)
+			// conn.SetMaxIdleConns(1)
+			// conn.SetMaxOpenConns(1)
 		}
 
 	} else { // removed from config
@@ -281,7 +278,7 @@ func DBGetPGVersion(ctx context.Context, dbUnique string, srcType sources.Kind,
 		verNew.IsInRecovery = data[0]["pg_is_in_recovery"].(bool)
 		verNew.RealDbname = data[0]["current_database"].(string)
 
-		if verNew.Version > VersionToInt("10.0") && opts.Metric.SystemIdentifierField > "" {
+		if verNew.Version > VersionToInt("10.0") && opts.Measurements.SystemIdentifierField > "" {
 			logger.Debugf("[%s] determining system identifier version (pg ver: %v)", dbUnique, verNew.VersionStr)
 			data, err := DBExecReadByDbUniqueName(ctx, dbUnique, sqlSysid)
 			if err == nil && len(data) > 0 {
@@ -940,12 +937,12 @@ func TryCreateMetricsFetchingHelpers(dbUnique string) error {
 	}
 
 	if fileBasedMetrics {
-		helpers, _, err := metrics.ReadMetricsFromFolder(mainContext, path.Join(opts.Metric.MetricsFolder, metrics.FileBasedMetricHelpersDir))
+		helpers, _, err := metrics.ReadMetricsFromFolder(mainContext, path.Join(opts.Metrics.MetricsFolder, metrics.FileBasedMetricHelpersDir))
 		if err != nil {
-			logger.Errorf("Failed to fetch helpers from \"%s\": %s", path.Join(opts.Metric.MetricsFolder, metrics.FileBasedMetricHelpersDir), err)
+			logger.Errorf("Failed to fetch helpers from \"%s\": %s", path.Join(opts.Metrics.MetricsFolder, metrics.FileBasedMetricHelpersDir), err)
 			return err
 		}
-		logger.Debug("%d helper definitions found from \"%s\"...", len(helpers), path.Join(opts.Metric.MetricsFolder, metrics.FileBasedMetricHelpersDir))
+		logger.Debug("%d helper definitions found from \"%s\"...", len(helpers), path.Join(opts.Metrics.MetricsFolder, metrics.FileBasedMetricHelpersDir))
 
 		for helperName := range helpers {
 			if strings.Contains(helperName, "windows") {
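With `UseConnPooling` deprecated, the dormant-state branch above now consults the pool statistics unconditionally. A minimal standalone sketch of the pgxpool calls it relies on, assuming pgx v5 and a made-up connection string (pgwatch3 itself builds pools via its db.New wrapper):

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

func main() {
	// Made-up DSN for illustration only.
	pool, err := pgxpool.New(context.Background(), "postgres://localhost/postgres")
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	// Same check as in CloseOrLimitSQLConnPoolForMonitoredDBIfAny:
	// only bother limiting when more than one connection is open.
	if s := pool.Stat(); s.TotalConns() > 1 {
		fmt.Println("would limit pool to 1 connection (dormant DB)")
	}
}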

src/db/conn.go

Lines changed: 2 additions & 1 deletion
@@ -29,7 +29,8 @@ type PgxPoolIface interface {
 	PgxIface
 	Acquire(ctx context.Context) (*pgxpool.Conn, error)
 	BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
-	Stat() *pgxpool.Stat
 	Close()
+	Config() *pgxpool.Config
 	Ping(ctx context.Context) error
+	Stat() *pgxpool.Stat
 }

src/log/formatter.go

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ import (
 func newFormatter(noColors bool) *Formatter {
 	return &Formatter{
 		HideKeys:        false,
-		FieldsOrder:     []string{"source", "metric", "sql", "params"},
+		FieldsOrder:     []string{"source", "sink", "metric", "rows", "sql", "params"},
 		TimestampFormat: "2006-01-02 15:04:05.000",
 		ShowFullLevel:   true,
 		NoColors:        noColors,

src/main.go

Lines changed: 26 additions & 29 deletions
@@ -188,9 +188,6 @@ func VersionToInt(v string) uint {
 }
 
 func RestoreSQLConnPoolLimitsForPreviouslyDormantDB(dbUnique string) {
-	if !opts.UseConnPooling {
-		return
-	}
 	monitoredDbConnCacheLock.Lock()
 	defer monitoredDbConnCacheLock.Unlock()
 
@@ -460,8 +457,8 @@ func FetchMetrics(ctx context.Context, msg MetricFetchMessage, hostState map[str
 	}
 
 	isCacheable = IsCacheableMetric(msg, mvp)
-	if isCacheable && opts.InstanceLevelCacheMaxSeconds > 0 && msg.Interval.Seconds() > float64(opts.InstanceLevelCacheMaxSeconds) {
-		cachedData = GetFromInstanceCacheIfNotOlderThanSeconds(msg, opts.InstanceLevelCacheMaxSeconds)
+	if isCacheable && opts.Metrics.InstanceLevelCacheMaxSeconds > 0 && msg.Interval.Seconds() > float64(opts.Metrics.InstanceLevelCacheMaxSeconds) {
+		cachedData = GetFromInstanceCacheIfNotOlderThanSeconds(msg, opts.Metrics.InstanceLevelCacheMaxSeconds)
 		if len(cachedData) > 0 {
 			fromCache = true
 			goto send_to_storageChannel
@@ -472,7 +469,7 @@ retry_with_superuser_sql: // if 1st fetch with normal SQL fails, try with SU SQL
 
 	sql = mvp.SQL
 
-	if opts.Metric.NoHelperFunctions && mvp.CallsHelperFunctions && mvp.SQLSU != "" {
+	if opts.Metrics.NoHelperFunctions && mvp.CallsHelperFunctions && mvp.SQLSU != "" {
 		logger.Debugf("[%s:%s] Using SU SQL instead of normal one due to --no-helper-functions input", msg.DBUniqueName, msg.MetricName)
 		sql = mvp.SQLSU
 		retryWithSuperuserSQL = false
@@ -557,13 +554,13 @@ retry_with_superuser_sql: // if 1st fetch with normal SQL fails, try with SU SQL
 
 	}
 
-	if isCacheable && opts.InstanceLevelCacheMaxSeconds > 0 && msg.Interval.Seconds() > float64(opts.InstanceLevelCacheMaxSeconds) {
+	if isCacheable && opts.Metrics.InstanceLevelCacheMaxSeconds > 0 && msg.Interval.Seconds() > float64(opts.Metrics.InstanceLevelCacheMaxSeconds) {
 		PutToInstanceCache(msg, data)
 	}
 
 send_to_storageChannel:
 
-	if (opts.Metric.RealDbnameField > "" || opts.Metric.SystemIdentifierField > "") && msg.Source == sources.SourcePostgres {
+	if (opts.Measurements.RealDbnameField > "" || opts.Measurements.SystemIdentifierField > "") && msg.Source == sources.SourcePostgres {
 		dbPgVersionMapLock.RLock()
 		ver := dbPgVersionMap[msg.DBUniqueName]
 		dbPgVersionMapLock.RUnlock()
@@ -658,16 +655,16 @@ func AddDbnameSysinfoIfNotExistsToQueryResultData(msg MetricFetchMessage, data m
 
 	logger.Debugf("Enriching all rows of [%s:%s] with sysinfo (%s) / real dbname (%s) if set. ", msg.DBUniqueName, msg.MetricName, ver.SystemIdentifier, ver.RealDbname)
 	for _, dr := range data {
-		if opts.Metric.RealDbnameField > "" && ver.RealDbname > "" {
-			old, ok := dr[opts.Metric.RealDbnameField]
+		if opts.Measurements.RealDbnameField > "" && ver.RealDbname > "" {
+			old, ok := dr[opts.Measurements.RealDbnameField]
 			if !ok || old == "" {
-				dr[opts.Metric.RealDbnameField] = ver.RealDbname
+				dr[opts.Measurements.RealDbnameField] = ver.RealDbname
 			}
 		}
-		if opts.Metric.SystemIdentifierField > "" && ver.SystemIdentifier > "" {
-			old, ok := dr[opts.Metric.SystemIdentifierField]
+		if opts.Measurements.SystemIdentifierField > "" && ver.SystemIdentifier > "" {
+			old, ok := dr[opts.Measurements.SystemIdentifierField]
 			if !ok || old == "" {
-				dr[opts.Metric.SystemIdentifierField] = ver.SystemIdentifier
+				dr[opts.Measurements.SystemIdentifierField] = ver.SystemIdentifier
 			}
 		}
 		enrichedData = append(enrichedData, dr)
@@ -779,7 +776,7 @@ func MetricGathererLoop(ctx context.Context,
 		}
 
 		// 1st try local overrides for some metrics if operating in push mode
-		if opts.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
+		if opts.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
 			metricStoreMessages, err = FetchStatsDirectlyFromOS(mfm, vme, mvp)
 			if err != nil {
 				l.WithError(err).Errorf("Could not reader metric directly from OS")
@@ -1316,10 +1313,10 @@ func DoesEmergencyTriggerfileExist() bool {
 	// In highly automated K8s / IaC environments such a temporary change might involve pull requests, peer reviews, CI/CD etc
 	// which can all take too long vs "exec -it pgwatch3-pod -- touch /tmp/pgwatch3-emergency-pause".
 	// After creating the file it can still take up to --servers-refresh-loop-seconds (2min def.) for change to take effect!
-	if opts.EmergencyPauseTriggerfile == "" {
+	if opts.Metrics.EmergencyPauseTriggerfile == "" {
 		return false
 	}
-	_, err := os.Stat(opts.EmergencyPauseTriggerfile)
+	_, err := os.Stat(opts.Metrics.EmergencyPauseTriggerfile)
 	return err == nil
 }
 
@@ -1374,7 +1371,7 @@ func LoadMetricDefs(ctx context.Context) (err error) {
 	var metricDefs metrics.MetricVersionDefs
 	var renamingDefs map[string]string
 	if fileBasedMetrics {
-		metricDefs, renamingDefs, err = metrics.ReadMetricsFromFolder(ctx, opts.Metric.MetricsFolder)
+		metricDefs, renamingDefs, err = metrics.ReadMetricsFromFolder(ctx, opts.Metrics.MetricsFolder)
 	} else {
 		metricDefs, renamingDefs, err = metrics.ReadMetricsFromPostgres(ctx, configDb)
 	}
@@ -1457,7 +1454,7 @@ func main() {
 	if err != nil {
 		logger.Fatal(err)
 	}
-	if opts.Source.Init {
+	if opts.Sources.Init {
 		return
 	}
 
@@ -1500,13 +1497,13 @@ func main() {
 				logger.Fatal("could not fetch active hosts - check config!", err)
 			} else {
 				logger.Error("could not fetch active hosts, using last valid config data. err:", err)
-				time.Sleep(time.Second * time.Duration(opts.Source.Refresh))
+				time.Sleep(time.Second * time.Duration(opts.Sources.Refresh))
 				continue
 			}
 		}
 
 		if DoesEmergencyTriggerfileExist() {
-			logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", opts.EmergencyPauseTriggerfile)
+			logger.Warningf("Emergency pause triggerfile detected at %s, ignoring currently configured DBs", opts.Metrics.EmergencyPauseTriggerfile)
 			monitoredDbs = make([]sources.MonitoredDatabase, 0)
 		}
 
@@ -1543,7 +1540,7 @@ func main() {
 			metricConfig = host.Metrics
 			wasInstancePreviouslyDormant := IsDBDormant(dbUnique)
 
-			if host.Encryption == "aes-gcm-256" && len(opts.Source.AesGcmKeyphrase) == 0 && len(opts.Source.AesGcmKeyphraseFile) == 0 {
+			if host.Encryption == "aes-gcm-256" && len(opts.Sources.AesGcmKeyphrase) == 0 && len(opts.Sources.AesGcmKeyphraseFile) == 0 {
 				// Warn if any encrypted hosts found but no keyphrase given
 				logger.Warningf("Encrypted password type found for host \"%s\", but no decryption keyphrase specified. Use --aes-gcm-keyphrase or --aes-gcm-keyphrase-file params", dbUnique)
 			}
@@ -1589,7 +1586,7 @@
 			}
 
 			if !opts.Ping && host.IsSuperuser && host.IsPostgresSource() && !ver.IsInRecovery {
-				if opts.Metric.NoHelperFunctions {
+				if opts.Metrics.NoHelperFunctions {
 					logger.Infof("[%s] Skipping rollout out helper functions due to the --no-helper-functions flag ...", dbUnique)
 				} else {
 					logger.Infof("Trying to create helper functions if missing for \"%s\"...", dbUnique)
@@ -1602,10 +1599,10 @@
 			if host.IsPostgresSource() {
 				var DBSizeMB int64
 
-				if opts.Source.MinDbSizeMB >= 8 { // an empty DB is a bit less than 8MB
+				if opts.Sources.MinDbSizeMB >= 8 { // an empty DB is a bit less than 8MB
 					DBSizeMB, _ = DBGetSizeMB(dbUnique) // ignore errors, i.e. only remove from monitoring when we're certain it's under the threshold
 					if DBSizeMB != 0 {
-						if DBSizeMB < opts.Source.MinDbSizeMB {
+						if DBSizeMB < opts.Sources.MinDbSizeMB {
 							logger.Infof("[%s] DB will be ignored due to the --min-db-size-mb filter. Current (up to %v cached) DB size = %d MB", dbUnique, dbSizeCachingInterval, DBSizeMB)
 							hostsToShutDownDueToRoleChange[dbUnique] = true // for the case when DB size was previosly above the threshold
 							SetUndersizedDBState(dbUnique, true)
@@ -1640,8 +1637,8 @@
 				RestoreSQLConnPoolLimitsForPreviouslyDormantDB(dbUnique)
 			}
 
-			if mainLoopCount == 0 && opts.TryCreateListedExtsIfMissing != "" && !ver.IsInRecovery {
-				extsToCreate := strings.Split(opts.TryCreateListedExtsIfMissing, ",")
+			if mainLoopCount == 0 && opts.Sources.TryCreateListedExtsIfMissing != "" && !ver.IsInRecovery {
+				extsToCreate := strings.Split(opts.Sources.TryCreateListedExtsIfMissing, ",")
 				extsCreated := TryCreateMissingExtensions(dbUnique, extsToCreate, ver.Extensions)
 				logger.Infof("[%s] %d/%d extensions created based on --try-create-listed-exts-if-missing input %v", dbUnique, len(extsCreated), len(extsToCreate), extsCreated)
 			}
@@ -1804,9 +1801,9 @@
 		mainLoopCount++
 		prevLoopMonitoredDBs = monitoredDbs
 
-		logger.Debugf("main sleeping %ds...", opts.Source.Refresh)
+		logger.Debugf("main sleeping %ds...", opts.Sources.Refresh)
 		select {
-		case <-time.After(time.Second * time.Duration(opts.Source.Refresh)):
+		case <-time.After(time.Second * time.Duration(opts.Sources.Refresh)):
 			// pass
 		case <-mainContext.Done():
 			return
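For orientation, the regrouped option structure that main.go now reads from looks roughly like this. The group and field names are the ones used in the diff; the exact types and the full layout live in src/config/cmdoptions.go, whose diff is not rendered above, so treat this as a sketch:

package config

// Sketch only - field types are assumptions inferred from usage in main.go.
type Options struct {
	Ping bool

	Sources struct {
		Init                         bool
		Refresh                      int
		MinDbSizeMB                  int64
		MaxParallelConnectionsPerDb  int
		TryCreateListedExtsIfMissing string
		AesGcmKeyphrase              string
		AesGcmKeyphraseFile          string
	}

	Metrics struct {
		MetricsFolder                string
		NoHelperFunctions            bool
		DirectOSStats                bool
		InstanceLevelCacheMaxSeconds int64
		EmergencyPauseTriggerfile    string
	}

	Measurements struct {
		Sinks                 []string
		RealDbnameField       string
		SystemIdentifierField string
	}
}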

src/sinks/json_file.go

Lines changed: 5 additions & 8 deletions
@@ -3,7 +3,6 @@ package sinks
 import (
 	"context"
 	"encoding/json"
-	"io"
 	"sync/atomic"
 
 	"github.com/cybertec-postgresql/pgwatch3/metrics"
@@ -16,24 +15,22 @@ var (
 )
 
 type JSONWriter struct {
-	ctx      context.Context
-	filename string
-	w        io.Writer
+	ctx context.Context
+	lw  *lumberjack.Logger
 }
 
 func NewJSONWriter(ctx context.Context, fname string) (*JSONWriter, error) {
 	return &JSONWriter{
-		ctx:      ctx,
-		filename: fname,
-		w:        &lumberjack.Logger{Filename: fname, Compress: true},
+		ctx: ctx,
+		lw:  &lumberjack.Logger{Filename: fname, Compress: true},
 	}, nil
 }
 
 func (jw *JSONWriter) Write(msgs []metrics.MeasurementMessage) error {
 	if len(msgs) == 0 {
 		return nil
 	}
-	enc := json.NewEncoder(jw.w)
+	enc := json.NewEncoder(jw.lw)
 	for _, msg := range msgs {
 		dataRow := map[string]any{
 			"metric": msg.MetricName,

src/sinks/multiwriter.go

Lines changed: 22 additions & 23 deletions
@@ -3,6 +3,8 @@ package sinks
 import (
 	"context"
 	"errors"
+	"fmt"
+	"strings"
 	"sync"
 
 	"github.com/cybertec-postgresql/pgwatch3/config"
@@ -23,37 +25,34 @@ type MultiWriter struct {
 }
 
 // NewMultiWriter creates and returns new instance of MultiWriter struct.
-func NewMultiWriter(ctx context.Context, opts *config.Options, metricDefs metrics.MetricVersionDefs) (*MultiWriter, error) {
+func NewMultiWriter(ctx context.Context, opts *config.Options, metricDefs metrics.MetricVersionDefs) (mw *MultiWriter, err error) {
+	var w Writer
 	logger := log.GetLogger(ctx)
-	mw := &MultiWriter{}
-	for _, f := range opts.Metric.JSONStorageFile {
-		jw, err := NewJSONWriter(ctx, f)
-		if err != nil {
-			return nil, err
+	mw = &MultiWriter{}
+	for _, s := range opts.Measurements.Sinks {
+		scheme, path, found := strings.Cut(s, "://")
+		if !found || scheme == "" || path == "" {
+			return nil, fmt.Errorf("malformed sink URI %s", s)
 		}
-		mw.AddWriter(jw)
-		logger.WithField("file", f).Info(`JSON output enabled`)
-	}
-
-	for _, connstr := range opts.Metric.PGMetricStoreConnStr {
-		pgw, err := NewPostgresWriter(ctx, connstr, opts, metricDefs)
-		if err != nil {
-			return nil, err
+		switch scheme {
+		case "jsonfile":
+			w, err = NewJSONWriter(ctx, path)
+		case "postgres", "postgresql":
+			w, err = NewPostgresWriter(ctx, s, opts, metricDefs)
+		case "prometheus":
+			w, err = NewPrometheusWriter(ctx, path)
+		default:
+			return nil, fmt.Errorf("unknown schema %s in sink URI %s", scheme, s)
 		}
-		mw.AddWriter(pgw)
-		logger.WithField("connstr", connstr).Info(`PostgreSQL output enabled`)
-	}
-
-	if opts.Metric.PrometheusListenAddr > "" {
-		promw, err := NewPrometheusWriter(ctx, opts)
 		if err != nil {
 			return nil, err
 		}
-		mw.AddWriter(promw)
-		logger.WithField("listen", opts.Metric.PrometheusListenAddr).Info(`Prometheus output enabled`)
+		mw.AddWriter(w)
+		logger.WithField("sink", s).Info(`measurements sink added`)
 	}
+
 	if len(mw.writers) == 0 {
-		return nil, errors.New("no storages specified for metrics")
+		return nil, errors.New("no sinks specified for measurements")
 	}
 	return mw, nil
 }
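The new dispatch hinges on strings.Cut (Go 1.18+), which splits the URI at the first "://" and reports whether the separator was present; the scheme then picks the writer, while the Postgres case keeps the full URI as the connection string. A standalone sketch of the same parsing and scheme switch, with made-up sink values:

package main

import (
	"fmt"
	"strings"
)

func main() {
	sinks := []string{
		"jsonfile:///tmp/measurements.json",
		"postgresql://localhost/measurements",
		"kafka://broker:9092", // unknown scheme, rejected
	}
	for _, s := range sinks {
		scheme, path, found := strings.Cut(s, "://")
		if !found || scheme == "" || path == "" {
			fmt.Println("malformed sink URI:", s)
			continue
		}
		switch scheme {
		case "jsonfile":
			fmt.Println("JSON writer on", path)
		case "postgres", "postgresql":
			fmt.Println("Postgres writer with connstr", s) // full URI kept as connstr
		case "prometheus":
			fmt.Println("Prometheus writer listening on", path)
		default:
			fmt.Println("unknown scheme:", scheme)
		}
	}
}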
