88 changes: 88 additions & 0 deletions internal/manager/metrics.go
@@ -19,7 +19,95 @@ var aggregateLatencyHistogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Buckets: prometheus.ExponentialBuckets(0.001, 2, 12),
}, []string{"rpaas_instance", "service_name", "zone"})

// Error/Reliability Metrics
var readOperationsCounterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "rate_limit_control_plane",
Name: "rpaas_nginx_pod_read_operations_total",
Help: "Total number of read operations on RPaaS nginx pods",
}, []string{"pod_name", "service_name", "rpaas_instance", "zone", "status"})

var aggregationFailuresCounterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "rate_limit_control_plane",
Name: "rpaas_instance_aggregation_failures_total",
Help: "Total number of aggregation failures for RPaaS instances",
}, []string{"rpaas_instance", "service_name", "zone", "error_type"})

var podHealthStatusGaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "rate_limit_control_plane",
Name: "rpaas_nginx_pod_health_status",
Help: "Health status of RPaaS nginx pods (1=healthy, 0=unhealthy)",
}, []string{"pod_name", "service_name", "rpaas_instance", "zone"})

// Performance/Throughput Metrics
var requestsPerSecondGaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "rate_limit_control_plane",
Name: "rpaas_nginx_pod_requests_per_second",
Help: "Current requests per second processed by RPaaS nginx pods",
}, []string{"pod_name", "service_name", "rpaas_instance", "zone"})

var zoneDataSizeHistogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "rate_limit_control_plane",
Name: "rpaas_zone_data_size_bytes",
Help: "Size distribution of zone data processed in bytes",
Buckets: prometheus.ExponentialBuckets(1024, 2, 12),
}, []string{"rpaas_instance", "service_name", "zone"})

var activeWorkersGaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "rate_limit_control_plane",
Name: "rpaas_active_workers_count",
Help: "Number of active workers by type",
}, []string{"service_name", "worker_type"})

// Rate Limiting Metrics
var rateLimitEntriesCounterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "rate_limit_control_plane",
Name: "rpaas_rate_limit_entries_total",
Help: "Total number of rate limit entries by action",
}, []string{"rpaas_instance", "service_name", "zone", "action"})

var rateLimitRulesActiveGaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "rate_limit_control_plane",
Name: "rpaas_rate_limit_rules_active",
Help: "Number of active rate limit rules per zone",
}, []string{"rpaas_instance", "service_name", "zone"})

// System/Resource Metrics
var workerUptimeGaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "rate_limit_control_plane",
Name: "rpaas_worker_uptime_seconds",
Help: "Worker uptime in seconds",
}, []string{"worker_id", "worker_type", "rpaas_instance", "service_name"})

var zoneDataRepositoryMemoryGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "rate_limit_control_plane",
Name: "rpaas_zone_data_repository_memory_bytes",
Help: "Memory usage of the zone data repository in bytes",
})

// GetZoneDataRepositoryMemoryGauge returns the zone data repository memory gauge so it can be updated from other packages
func GetZoneDataRepositoryMemoryGauge() prometheus.Gauge {
return zoneDataRepositoryMemoryGauge
}

func init() {
metrics.Registry.MustRegister(readLatencyHistogramVec)
metrics.Registry.MustRegister(aggregateLatencyHistogramVec)

// Register error/reliability metrics
metrics.Registry.MustRegister(readOperationsCounterVec)
metrics.Registry.MustRegister(aggregationFailuresCounterVec)
metrics.Registry.MustRegister(podHealthStatusGaugeVec)

// Register performance/throughput metrics
metrics.Registry.MustRegister(requestsPerSecondGaugeVec)
metrics.Registry.MustRegister(zoneDataSizeHistogramVec)
metrics.Registry.MustRegister(activeWorkersGaugeVec)

// Register rate limiting metrics
metrics.Registry.MustRegister(rateLimitEntriesCounterVec)
metrics.Registry.MustRegister(rateLimitRulesActiveGaugeVec)

// Register system/resource metrics
metrics.Registry.MustRegister(workerUptimeGaugeVec)
metrics.Registry.MustRegister(zoneDataRepositoryMemoryGauge)
}
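A quick way to sanity-check the new collectors is a small test in the same package. The sketch below is illustrative only (the test name and label values are made up) and assumes prometheus/client_golang's testutil package is available as a test dependency:

package manager

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestPodReadMetricsAreRecorded(t *testing.T) {
	// Simulate one successful read against a hypothetical pod and zone.
	readOperationsCounterVec.WithLabelValues("pod-0", "rpaasv2", "my-instance", "zone_one", "success").Inc()
	podHealthStatusGaugeVec.WithLabelValues("pod-0", "rpaasv2", "my-instance", "zone_one").Set(1)

	// CollectAndCount reports how many series the collector currently exposes.
	if got := testutil.CollectAndCount(readOperationsCounterVec); got != 1 {
		t.Fatalf("expected 1 read-operations series, got %d", got)
	}
	// ToFloat64 reads back the value of a single child metric.
	if v := testutil.ToFloat64(podHealthStatusGaugeVec.WithLabelValues("pod-0", "rpaasv2", "my-instance", "zone_one")); v != 1 {
		t.Fatalf("expected pod health 1, got %v", v)
	}
}

Because the vectors are package-level, series recorded by other tests would also be counted, so the numbers above assume a clean state.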
23 changes: 22 additions & 1 deletion internal/manager/rpaasInstanceSyncWorker.go
@@ -26,6 +26,7 @@ type RpaasInstanceSyncWorker struct {
zoneDataChan chan Optional[ratelimit.Zone]
notify chan ratelimit.RpaasZoneData
fullZones map[string]map[ratelimit.FullZoneKey]*ratelimit.RateLimitEntry
startTime time.Time
}

type RpaasInstanceSignals struct {
@@ -43,7 +44,7 @@ func NewRpaasInstanceSyncWorker(rpaasInstanceName, rpaasServiceName string, zone
ticker := time.NewTicker(config.Spec.ControllerMinutesInternal)
instanceLogger := logger.With("instanceName", rpaasInstanceName)

return &RpaasInstanceSyncWorker{
worker := &RpaasInstanceSyncWorker{
RpaasInstanceName: rpaasInstanceName,
RpaasServiceName: rpaasServiceName,
Zones: zones,
@@ -54,15 +55,31 @@ func NewRpaasInstanceSyncWorker(rpaasInstanceName, rpaasServiceName string, zone
zoneDataChan: make(chan Optional[ratelimit.Zone]),
notify: notify,
fullZones: make(map[string]map[ratelimit.FullZoneKey]*ratelimit.RateLimitEntry),
startTime: time.Now(),
}

// Initialize instance worker metrics
activeWorkersGaugeVec.WithLabelValues(rpaasServiceName, "instance").Inc()
workerUptimeGaugeVec.WithLabelValues(rpaasInstanceName, "instance", rpaasInstanceName, rpaasServiceName).Set(0)

return worker
}

func (w *RpaasInstanceSyncWorker) Work() {
// Update worker uptime periodically
uptimeTicker := time.NewTicker(30 * time.Second)
defer uptimeTicker.Stop()

for {
select {
case <-w.Ticker.C:
w.processTick()
case <-uptimeTicker.C:
uptime := time.Since(w.startTime).Seconds()
workerUptimeGaugeVec.WithLabelValues(w.RpaasInstanceName, "instance", w.RpaasInstanceName, w.RpaasServiceName).Set(uptime)
case <-w.RpaasInstanceSignals.StopChan:
// Decrement active worker count
activeWorkersGaugeVec.WithLabelValues(w.RpaasServiceName, "instance").Dec()
w.cleanup()
return
}
@@ -98,6 +115,7 @@ func (w *RpaasInstanceSyncWorker) processTick() {
result := <-w.zoneDataChan
if result.Error != nil {
w.logger.Error("Error getting zone data", "error", result.Error)
aggregationFailuresCounterVec.WithLabelValues(w.RpaasInstanceName, w.RpaasServiceName, zone, "collection_error").Inc()
continue
}
zoneData = append(zoneData, result.Value)
@@ -127,6 +145,9 @@ func (w *RpaasInstanceSyncWorker) processTick() {

rpaasZoneData.Data = append(rpaasZoneData.Data, aggregatedZone)

// Record the number of aggregated rate limit entries
rateLimitEntriesCounterVec.WithLabelValues(w.RpaasInstanceName, w.RpaasServiceName, zone, "aggregated").Add(float64(len(aggregatedZone.RateLimitEntries)))

if config.Spec.FeatureFlagPersistAggregatedData {
// Write aggregated data back to pod workers
w.PodWorkerManager.ForEachWorker(func(worker Worker) {
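The lifecycle accounting added to this worker (Inc on construction, a periodic uptime Set from a secondary ticker, Dec on stop) is a reusable pattern. The sketch below isolates it with illustrative names and metrics, not the project's actual types:

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	activeWorkers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "example_active_workers_count",
		Help: "Active workers by type.",
	}, []string{"worker_type"})
	workerUptime = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "example_worker_uptime_seconds",
		Help: "Worker uptime in seconds.",
	}, []string{"worker_id"})
)

type exampleWorker struct {
	id    string
	start time.Time
	stop  chan struct{}
}

func newExampleWorker(id string) *exampleWorker {
	// Count the worker as active and start its uptime at zero.
	activeWorkers.WithLabelValues("instance").Inc()
	workerUptime.WithLabelValues(id).Set(0)
	return &exampleWorker{id: id, start: time.Now(), stop: make(chan struct{})}
}

func (w *exampleWorker) work(uptimeInterval time.Duration) {
	uptimeTicker := time.NewTicker(uptimeInterval)
	defer uptimeTicker.Stop()
	for {
		select {
		case <-uptimeTicker.C:
			workerUptime.WithLabelValues(w.id).Set(time.Since(w.start).Seconds())
		case <-w.stop:
			activeWorkers.WithLabelValues("instance").Dec()
			return
		}
	}
}

func main() {
	prometheus.MustRegister(activeWorkers, workerUptime)
	w := newExampleWorker("my-instance")
	go w.work(500 * time.Millisecond) // the worker above refreshes every 30s
	time.Sleep(2 * time.Second)
	close(w.stop)
}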
39 changes: 37 additions & 2 deletions internal/manager/rpaasPodWorker.go
@@ -24,11 +24,12 @@ type RpaasPodWorker struct {
WriteZoneChan chan ratelimit.Zone
StopChan chan struct{}
RoundSmallestLast int64
startTime time.Time
}

func NewRpaasPodWorker(podURL, podName, rpaasInstanceName, rpaasServiceName string, logger *slog.Logger, zoneDataChan chan Optional[ratelimit.Zone]) *RpaasPodWorker {
podLogger := logger.With("podName", podName, "podURL", podURL)
return &RpaasPodWorker{
worker := &RpaasPodWorker{
PodURL: podURL,
PodName: podName,
RpaasInstanceName: rpaasInstanceName,
@@ -38,7 +39,14 @@ func NewRpaasPodWorker(podURL, podName, rpaasInstanceName, rpaasServiceName stri
ReadZoneChan: make(chan string),
WriteZoneChan: make(chan ratelimit.Zone),
StopChan: make(chan struct{}),
startTime: time.Now(),
}

// Initialize pod worker metrics
activeWorkersGaugeVec.WithLabelValues(rpaasServiceName, "pod").Inc()
workerUptimeGaugeVec.WithLabelValues(podName, "pod", rpaasInstanceName, rpaasServiceName).Set(0)

return worker
}

func (w *RpaasPodWorker) Start() {
@@ -48,6 +56,8 @@ func (w *RpaasPodWorker) Start() {
func (w *RpaasPodWorker) Stop() {
if w.StopChan != nil {
w.StopChan <- struct{}{}
// Decrement active worker count
activeWorkersGaugeVec.WithLabelValues(w.RpaasServiceName, "pod").Dec()
}
}

@@ -56,6 +66,10 @@ func (w *RpaasPodWorker) GetID() string {
}

func (w *RpaasPodWorker) Work() {
// Update worker uptime periodically
uptimeTicker := time.NewTicker(30 * time.Second)
defer uptimeTicker.Stop()

for {
select {
case zoneName := <-w.ReadZoneChan:
@@ -69,6 +83,9 @@
}()
case <-w.WriteZoneChan:
// TODO: Implement the logic to write zone data to the pod
case <-uptimeTicker.C:
uptime := time.Since(w.startTime).Seconds()
workerUptimeGaugeVec.WithLabelValues(w.PodName, "pod", w.RpaasInstanceName, w.RpaasServiceName).Set(uptime)
case <-w.StopChan:
w.cleanup()
return
@@ -95,11 +112,19 @@ func (w *RpaasPodWorker) getZoneData(zone string) (ratelimit.Zone, error) {
}
start := time.Now()
response, err := http.DefaultClient.Do(req)
reqDuration := time.Since(start)

if err != nil {
// Record failed operation
readOperationsCounterVec.WithLabelValues(w.PodName, w.RpaasServiceName, w.RpaasInstanceName, zone, "error").Inc()
podHealthStatusGaugeVec.WithLabelValues(w.PodName, w.RpaasServiceName, w.RpaasInstanceName, zone).Set(0)
return ratelimit.Zone{}, fmt.Errorf("error making request to pod %s (%s): %w", w.PodURL, w.PodName, err)
}
reqDuration := time.Since(start)

// Record successful operation and latency
readOperationsCounterVec.WithLabelValues(w.PodName, w.RpaasServiceName, w.RpaasInstanceName, zone, "success").Inc()
readLatencyHistogramVec.WithLabelValues(w.PodName, w.RpaasServiceName, w.RpaasInstanceName, zone).Observe(reqDuration.Seconds())
podHealthStatusGaugeVec.WithLabelValues(w.PodName, w.RpaasServiceName, w.RpaasInstanceName, zone).Set(1)
if reqDuration > config.Spec.WarnZoneReadTime {
w.logger.Warn("Request took too long", "duration", reqDuration, "zone", zone, "contentLength", response.ContentLength)
}
@@ -116,6 +141,7 @@ func (w *RpaasPodWorker) getZoneData(zone string) (ratelimit.Zone, error) {
}, nil
}
w.logger.Error("Error decoding header", "error", err)
readOperationsCounterVec.WithLabelValues(w.PodName, w.RpaasServiceName, w.RpaasInstanceName, zone, "error").Inc()
return ratelimit.Zone{}, err
}
for {
@@ -125,6 +151,7 @@ func (w *RpaasPodWorker) getZoneData(zone string) (ratelimit.Zone, error) {
break
}
w.logger.Error("Error decoding entry", "error", err)
readOperationsCounterVec.WithLabelValues(w.PodName, w.RpaasServiceName, w.RpaasInstanceName, zone, "error").Inc()
return ratelimit.Zone{}, err
}
if w.RoundSmallestLast == 0 {
@@ -136,6 +163,14 @@ func (w *RpaasPodWorker) getZoneData(zone string) (ratelimit.Zone, error) {
rateLimitEntries = append(rateLimitEntries, message)
}
w.logger.Debug("Received rate limit entries", "zone", zone, "entries", len(rateLimitEntries))

// Record zone data size and rate limit rules count
zoneDataSize := response.ContentLength
if zoneDataSize > 0 {
zoneDataSizeHistogramVec.WithLabelValues(w.RpaasInstanceName, w.RpaasServiceName, zone).Observe(float64(zoneDataSize))
}
rateLimitRulesActiveGaugeVec.WithLabelValues(w.RpaasInstanceName, w.RpaasServiceName, zone).Set(float64(len(rateLimitEntries)))

return ratelimit.Zone{
Name: zone,
RateLimitHeader: rateLimitHeader,
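The read path above follows a count-by-status, observe-latency-on-success, flip-health-gauge pattern around a single HTTP call. A self-contained sketch of that pattern follows; the client, URL, and metric names are illustrative stand-ins, not the worker's real ones:

package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	readOps = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_read_operations_total",
		Help: "Read operations by status.",
	}, []string{"status"})
	readLatency = prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "example_read_latency_seconds",
		Help:    "Latency of successful reads in seconds.",
		Buckets: prometheus.ExponentialBuckets(0.001, 2, 12),
	})
	targetHealthy = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "example_target_healthy",
		Help: "1 if the last read succeeded, 0 otherwise.",
	})
)

// instrumentedGet mirrors the worker's bookkeeping: time the request,
// count it by status, observe latency on success, and flip a health gauge.
func instrumentedGet(url string) (*http.Response, error) {
	start := time.Now()
	resp, err := http.Get(url)
	reqDuration := time.Since(start)
	if err != nil {
		readOps.WithLabelValues("error").Inc()
		targetHealthy.Set(0)
		return nil, err
	}
	readOps.WithLabelValues("success").Inc()
	readLatency.Observe(reqDuration.Seconds())
	targetHealthy.Set(1)
	return resp, nil
}

func main() {
	prometheus.MustRegister(readOps, readLatency, targetHealthy)
	resp, err := instrumentedGet("http://localhost:8800/rate-limit/zone_one")
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("read ok, content length:", resp.ContentLength)
}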
11 changes: 9 additions & 2 deletions internal/repository/zone_data_repository.go
@@ -4,10 +4,12 @@ import (
"encoding/json"
"log/slog"
"os"
"runtime"
"sync"

"github.com/tsuru/rate-limit-control-plane/internal/config"
"github.com/tsuru/rate-limit-control-plane/internal/logger"
"github.com/tsuru/rate-limit-control-plane/internal/manager"
"github.com/tsuru/rate-limit-control-plane/internal/ratelimit"
)

@@ -39,6 +41,8 @@ func (z *ZoneDataRepository) startReader() {

func (z *ZoneDataRepository) insert(rpaasZoneData ratelimit.RpaasZoneData) {
z.Lock()
defer z.Unlock()

serverData := []Data{}
for _, zone := range rpaasZoneData.Data {
for _, entry := range zone.RateLimitEntries {
@@ -54,11 +58,14 @@
dataBytes, err := json.MarshalIndent(serverData, " ", " ")
if err != nil {
z.logger.Error("Error marshaling JSON", "error", err)
z.Unlock()
return
}
z.Data[rpaasZoneData.RpaasName] = dataBytes
z.Unlock()

// Update the repository memory metric; HeapInuse is process-wide, so this is an approximation
var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
manager.GetZoneDataRepositoryMemoryGauge().Set(float64(memStats.HeapInuse))
}

func (z *ZoneDataRepository) GetRpaasZoneData(rpaasName string) ([]byte, bool) {
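Note that MemStats.HeapInuse reflects the whole process heap, so the gauge tracks overall heap pressure rather than this repository alone. If a per-repository figure were ever wanted, one possible alternative, sketched under the assumption that Data is the map[string][]byte populated in insert above, is to sum the stored payloads while the lock is still held:

// storedBytes sums the serialized payloads held in a repository's Data map.
// Sketch only; assumes the map[string][]byte shape written to in insert above.
func storedBytes(data map[string][]byte) int {
	total := 0
	for _, payload := range data {
		total += len(payload)
	}
	return total
}

insert could then call manager.GetZoneDataRepositoryMemoryGauge().Set(float64(storedBytes(z.Data))) while still holding the lock, instead of reading runtime.MemStats; whether that precision is worth the extra pass is a judgment call.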