Skip to content

Commit 302739a

Browse files
authored
[+] get rid of control channel map, closes #434 (#435)
1 parent 81ac3ac commit 302739a

File tree

3 files changed

+30
-68
lines changed

3 files changed

+30
-68
lines changed

src/metrics/logparse.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func eventCountsToMetricStoreMessages(eventCounts, eventCountsTotal map[string]i
111111
}}
112112
}
113113

114-
func ParseLogs(ctx context.Context, conn db.PgxIface, mdb *sources.MonitoredDatabase, realDbname, metricName string, configMap map[string]float64, controlCh <-chan ControlMessage, storeCh chan<- []MeasurementMessage) {
114+
func ParseLogs(ctx context.Context, conn db.PgxIface, mdb *sources.MonitoredDatabase, realDbname, metricName string, configMap map[string]float64, storeCh chan<- []MeasurementMessage) {
115115

116116
var latest, previous, serverMessagesLang string
117117
var latestHandle *os.File
@@ -131,16 +131,8 @@ func ParseLogs(ctx context.Context, conn db.PgxIface, mdb *sources.MonitoredData
131131
logger := log.GetLogger(ctx)
132132
for { // re-try loop. re-start in case of FS errors or just to refresh host config
133133
select {
134-
case msg := <-controlCh:
135-
logger.Debug("got control msg", dbUniqueName, metricName, msg)
136-
if msg.Action == gathererStatusStart {
137-
config = msg.Config
138-
interval = config[metricName]
139-
logger.Debug("started MetricGathererLoop for ", dbUniqueName, metricName, " interval:", interval)
140-
} else if msg.Action == gathererStatusStop {
141-
logger.Debug("exiting MetricGathererLoop for ", dbUniqueName, metricName, " interval:", interval)
142-
return
143-
}
134+
case <-ctx.Done():
135+
return
144136
default:
145137
if interval == 0 {
146138
interval = config[metricName]

src/metrics/types.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,6 @@ type MeasurementMessage struct {
9393
SystemIdentifier string
9494
}
9595

96-
const (
97-
gathererStatusStart = "START"
98-
gathererStatusStop = "STOP"
99-
)
100-
101-
type ControlMessage struct {
102-
Action string // START, STOP, PAUSE
103-
Config map[string]float64
104-
}
105-
10696
type Reader interface {
10797
GetMetrics() (*Metrics, error)
10898
}

src/reaper/reaper.go

Lines changed: 27 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"github.com/cybertec-postgresql/pgwatch3/sources"
1717
"github.com/shopspring/decimal"
1818
"github.com/sirupsen/logrus"
19-
"golang.org/x/exp/maps"
2019
)
2120

2221
var monitoredDbs sources.MonitoredDatabases
@@ -29,6 +28,7 @@ type Reaper struct {
2928
opts *config.Options
3029
sourcesReaderWriter sources.ReaderWriter
3130
metricsReaderWriter metrics.ReaderWriter
31+
measurementCh chan []metrics.MeasurementMessage
3232
}
3333

3434
func NewReaper(opts *config.Options, sourcesReaderWriter sources.ReaderWriter, metricsReaderWriter metrics.ReaderWriter) *Reaper {
@@ -42,7 +42,7 @@ func NewReaper(opts *config.Options, sourcesReaderWriter sources.ReaderWriter, m
4242
func (r *Reaper) Reap(mainContext context.Context) (err error) {
4343
var measurementsWriter *sinks.MultiWriter
4444

45-
controlChannels := make(map[string](chan metrics.ControlMessage)) // [db1+metric1]=chan
45+
cancelFuncs := make(map[string]context.CancelFunc) // [db1+metric1]=cancel func
4646

4747
firstLoop := true
4848
mainLoopCount := 0
@@ -60,14 +60,13 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) {
6060
if measurementsWriter, err = sinks.NewMultiWriter(mainContext, opts, metricDefinitionMap); err != nil {
6161
logger.Fatal(err)
6262
}
63-
measurementCh := make(chan []metrics.MeasurementMessage, 10000)
63+
r.measurementCh = make(chan []metrics.MeasurementMessage, 10000)
6464
if !opts.Ping {
65-
go measurementsWriter.WriteMeasurements(mainContext, measurementCh)
65+
go measurementsWriter.WriteMeasurements(mainContext, r.measurementCh)
6666
}
6767

6868
for { //main loop
6969
hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set
70-
var controlChannelNameList []string
7170
gatherersShutDown := 0
7271

7372
if monitoredDbs, err = sourcesReaderWriter.GetMonitoredDatabases(); err != nil {
@@ -92,7 +91,7 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) {
9291
UpdateMonitoredDBCache(monitoredDbs)
9392

9493
if lastMonitoredDBsUpdate.IsZero() || lastMonitoredDBsUpdate.Before(time.Now().Add(-1*time.Second*monitoredDbsDatastoreSyncIntervalSeconds)) {
95-
go SyncMonitoredDBsToDatastore(mainContext, monitoredDbs, measurementCh)
94+
go SyncMonitoredDBsToDatastore(mainContext, monitoredDbs, r.measurementCh)
9695
lastMonitoredDBsUpdate = time.Now()
9796
}
9897

@@ -237,13 +236,14 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) {
237236
}
238237

239238
dbMetric := dbUnique + dbMetricJoinStr + metric
240-
_, chOk := controlChannels[dbMetric]
239+
_, chOk := cancelFuncs[dbMetric]
241240

242241
if metricDefOk && !chOk { // start a new per db/per metric gatherer with its own cancellable context
243242
if interval > 0 {
244243
hostMetricIntervalMap[dbMetric] = interval
245244
logger.WithField("source", dbUnique).WithField("metric", metric).WithField("interval", interval).Info("starting gatherer")
246-
controlChannels[dbMetric] = make(chan metrics.ControlMessage, 1)
245+
metricCtx, cancelFunc := context.WithCancel(mainContext)
246+
cancelFuncs[dbMetric] = cancelFunc
247247

248248
metricNameForStorage := metricName
249249
if _, isSpecialMetric := specialMetrics[metricName]; !isSpecialMetric {
@@ -264,21 +264,20 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) {
264264
logger.Error(err)
265265
}
266266

267-
go MetricGathererLoop(mainContext,
267+
go r.reapMetricMeasurementsFromSource(metricCtx,
268268
dbUnique,
269269
dbUniqueOrig,
270270
srcType,
271271
metric,
272-
metricConfig,
273-
controlChannels[dbMetric],
274-
measurementCh,
275-
opts)
272+
metricConfig)
276273
}
277274
} else if (!metricDefOk && chOk) || interval <= 0 {
278275
// metric definition files were recently removed or interval set to zero
276+
if cancelFunc, isOk := cancelFuncs[dbMetric]; isOk {
277+
cancelFunc()
278+
}
279279
logger.Warning("shutting down metric", metric, "for", monitoredDB.DBUniqueName)
280-
controlChannels[dbMetric] <- metrics.ControlMessage{Action: gathererStatusStop}
281-
delete(controlChannels, dbMetric)
280+
delete(cancelFuncs, dbMetric)
282281
} else if !metricDefOk {
283282
epoch, ok := lastSQLFetchError.Load(metric)
284283
if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) { // complain only 1x per hour
@@ -288,8 +287,7 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) {
288287
} else {
289288
// check if interval has changed
290289
if hostMetricIntervalMap[dbMetric] != interval {
291-
logger.Warning("sending interval update for", dbUnique, metric)
292-
controlChannels[dbMetric] <- metrics.ControlMessage{Action: gathererStatusStart, Config: metricConfig}
290+
logger.Warning("updating interval update for", dbUnique, metric)
293291
hostMetricIntervalMap[dbMetric] = interval
294292
}
295293
}
@@ -303,9 +301,7 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) {
303301
// loop over existing cancel functions and stop workers if DB or metric removed from config
304302
// or state change makes it uninteresting
305303
logger.Debug("checking if any workers need to be shut down...")
306-
controlChannelNameList = maps.Keys(controlChannels)
307-
308-
for _, dbMetric := range controlChannelNameList {
304+
for dbMetric, cancelFunc := range cancelFuncs {
309305
var currentMetricConfig map[string]float64
310306
var dbInfo *sources.MonitoredDatabase
311307
var ok, dbRemovedFromConfig bool
@@ -351,9 +347,9 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) {
351347

352348
if mainContext.Err() != nil || wholeDbShutDownDueToRoleChange || dbRemovedFromConfig || singleMetricDisabled {
353349
logger.Infof("shutting down gatherer for [%s:%s] ...", db, metric)
354-
controlChannels[dbMetric] <- metrics.ControlMessage{Action: gathererStatusStop}
355-
delete(controlChannels, dbMetric)
356-
logger.Debugf("control channel for [%s:%s] deleted", db, metric)
350+
cancelFunc()
351+
delete(cancelFuncs, dbMetric)
352+
logger.Debugf("cancel function for [%s:%s] deleted", db, metric)
357353
gatherersShutDown++
358354
ClearDBUnreachableStateIfAny(db)
359355
if err := measurementsWriter.SyncMetrics(db, metric, "remove"); err != nil {
@@ -384,25 +380,19 @@ func (r *Reaper) Reap(mainContext context.Context) (err error) {
384380
}
385381

386382
// gathers measurements for one source/metric pair in a loop; shutdown is signalled by cancelling ctx
387-
func MetricGathererLoop(ctx context.Context,
383+
func (r *Reaper) reapMetricMeasurementsFromSource(ctx context.Context,
388384
dbUniqueName, dbUniqueNameOrig string,
389385
srcType sources.Kind,
390386
metricName string,
391-
configMap map[string]float64,
392-
controlCh <-chan metrics.ControlMessage,
393-
storeCh chan<- []metrics.MeasurementMessage,
394-
opts *config.Options) {
387+
configMap map[string]float64) {
395388

396-
config := configMap
397-
interval := config[metricName]
398389
hostState := make(map[string]map[string]string)
399390
var lastUptimeS int64 = -1 // used for "server restarted" event detection
400391
var lastErrorNotificationTime time.Time
401392
var vme DBVersionMapEntry
402393
var mvp metrics.Metric
403394
var err error
404395
failedFetches := 0
405-
// metricNameForStorage := metricName
406396
lastDBVersionFetchTime := time.Unix(0, 0) // check DB ver. ev. 5 min
407397

408398
l := log.GetLogger(ctx).WithField("source", dbUniqueName).WithField("metric", metricName)
@@ -415,13 +405,14 @@ func MetricGathererLoop(ctx context.Context,
415405
realDbname := dbPgVersionMap[dbUniqueName].RealDbname // to manage 2 sets of event counts - monitored DB + global
416406
dbPgVersionMapLock.RUnlock()
417407
conn := mdb.Conn
418-
metrics.ParseLogs(ctx, conn, mdb, realDbname, metricName, configMap, controlCh, storeCh) // no return
408+
metrics.ParseLogs(ctx, conn, mdb, realDbname, metricName, configMap, r.measurementCh) // no return
419409
return
420410
}
421411

422412
for {
413+
interval := configMap[metricName]
423414
if lastDBVersionFetchTime.Add(time.Minute * time.Duration(5)).Before(time.Now()) {
424-
vme, err = DBGetPGVersion(ctx, dbUniqueName, srcType, false, opts.Measurements.SystemIdentifierField) // in case of errors just ignore metric "disabled" time ranges
415+
vme, err = DBGetPGVersion(ctx, dbUniqueName, srcType, false, r.opts.Measurements.SystemIdentifierField) // in case of errors just ignore metric "disabled" time ranges
425416
if err != nil {
426417
lastDBVersionFetchTime = time.Now()
427418
}
@@ -445,15 +436,15 @@ func MetricGathererLoop(ctx context.Context,
445436
}
446437

447438
// 1st try local overrides for some metrics if operating in push mode
448-
if opts.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
439+
if r.opts.Metrics.DirectOSStats && IsDirectlyFetchableMetric(metricName) {
449440
metricStoreMessages, err = FetchStatsDirectlyFromOS(ctx, mfm, vme, mvp)
450441
if err != nil {
451442
l.WithError(err).Errorf("Could not reader metric directly from OS")
452443
}
453444
}
454445
t1 := time.Now()
455446
if metricStoreMessages == nil {
456-
metricStoreMessages, err = FetchMetrics(ctx, mfm, hostState, storeCh, "", opts)
447+
metricStoreMessages, err = FetchMetrics(ctx, mfm, hostState, r.measurementCh, "", r.opts)
457448
}
458449
t2 := time.Now()
459450

@@ -499,27 +490,16 @@ func MetricGathererLoop(ctx context.Context,
499490
}
500491
}
501492

502-
storeCh <- metricStoreMessages
493+
r.measurementCh <- metricStoreMessages
503494
}
504495
}
505496

506497
select {
507498
case <-ctx.Done():
508499
return
509-
case msg := <-controlCh:
510-
l.Debug("got control msg", msg)
511-
if msg.Action == gathererStatusStart {
512-
config = msg.Config
513-
interval = config[metricName]
514-
l.Debug("started MetricGathererLoop with interval:", interval)
515-
} else if msg.Action == gathererStatusStop {
516-
l.Debug("exiting MetricGathererLoop with interval:", interval)
517-
return
518-
}
519500
case <-time.After(time.Second * time.Duration(interval)):
520501
l.Debugf("MetricGathererLoop slept for %s", time.Second*time.Duration(interval))
521502
}
522-
523503
}
524504
}
525505

0 commit comments

Comments
 (0)