@@ -6,11 +6,31 @@ import (
6
6
"fmt"
7
7
"net/http"
8
8
"strings"
9
+ "time"
9
10
10
11
"github.com/dmitsh/pushgatewayredis/pkg/config"
11
12
"github.com/dmitsh/pushgatewayredis/pkg/redis"
12
13
"github.com/go-kit/kit/log"
13
14
"github.com/go-kit/kit/log/level"
15
+ "github.com/prometheus/client_golang/prometheus"
16
+ "github.com/prometheus/client_golang/prometheus/promauto"
17
+ "github.com/prometheus/client_golang/prometheus/promhttp"
18
+ )
19
+
20
+ var (
21
+ metricsCacheSize = promauto .NewGauge (
22
+ prometheus.GaugeOpts {
23
+ Name : "metrics_cache_size" ,
24
+ Help : "Number of metrics in cache." ,
25
+ })
26
+
27
+ metricsRequestDuration = promauto .NewHistogramVec (
28
+ prometheus.HistogramOpts {
29
+ Name : "metrics_request_duration_seconds" ,
30
+ Help : "Time (in seconds) spent serving metrics requests." ,
31
+ Buckets : prometheus .DefBuckets ,
32
+ },
33
+ []string {"status_code" })
14
34
)
15
35
16
36
type MetricsManager struct {
@@ -26,59 +46,20 @@ func NewMetricsManager(logger log.Logger, cfg *config.Config, db *redis.RedisCli
26
46
db : db ,
27
47
logger : logger ,
28
48
}
49
+
50
+ mux := http .NewServeMux ()
51
+ mux .HandleFunc (cfg .MetricsPath , mm .serveMetricsPath )
52
+ mux .HandleFunc (cfg .IngestPath , mm .serveIngestPath )
53
+ mux .Handle (cfg .TelemetryPath , promhttp .Handler ())
54
+ mux .HandleFunc ("/" , mm .serveDefault )
55
+
29
56
mm .server = & http.Server {
30
57
Addr : fmt .Sprintf (":%d" , cfg .Port ),
31
- Handler : mm ,
58
+ Handler : mux ,
32
59
}
33
60
return mm
34
61
}
35
62
36
- func (mm * MetricsManager ) ServeHTTP (w http.ResponseWriter , r * http.Request ) {
37
- switch r .URL .Path {
38
- case mm .cfg .MetricsPath :
39
- if r .Method != "GET" {
40
- http .Error (w , "Method is not supported." , http .StatusNotFound )
41
- return
42
- }
43
- metrics , err := mm .db .GetAll (r .Context ())
44
- if err != nil {
45
- level .Error (mm .logger ).Log ("msg" , "Redis error" , "err" , err )
46
- http .Error (w , "redis error" , http .StatusInternalServerError )
47
- return
48
- }
49
- fmt .Fprintf (w , strings .Join (metrics , "\n " ))
50
- case mm .cfg .IngestPath :
51
- if r .Method != "POST" {
52
- http .Error (w , "Method is not supported." , http .StatusNotFound )
53
- return
54
- }
55
- scanner := bufio .NewScanner (r .Body )
56
- keys , vals := []string {}, []string {}
57
- for scanner .Scan () {
58
- line := strings .TrimSpace (scanner .Text ())
59
- if strings .HasPrefix (line , "#" ) {
60
- continue
61
- }
62
- if indx := strings .LastIndex (line , " " ); indx != - 1 {
63
- keys = append (keys , line [:indx ])
64
- vals = append (vals , line [indx :])
65
- }
66
- }
67
- if err := scanner .Err (); err != nil {
68
- level .Error (mm .logger ).Log ("msg" , "Read error" , "err" , err )
69
- http .Error (w , "can't read body" , http .StatusBadRequest )
70
- return
71
- }
72
- if err := mm .db .MSet (r .Context (), keys , vals ); err != nil {
73
- level .Error (mm .logger ).Log ("msg" , "Redis error" , "err" , err )
74
- http .Error (w , "redis error" , http .StatusInternalServerError )
75
- return
76
- }
77
- default :
78
- http .Error (w , "404 not found." , http .StatusNotFound )
79
- }
80
- }
81
-
82
63
func (mm * MetricsManager ) Run () error {
83
64
if mm .cfg .TLSEnabled {
84
65
return mm .server .ListenAndServeTLS (mm .cfg .TLSCertPath , mm .cfg .TLSKeyPath )
@@ -89,3 +70,56 @@ func (mm *MetricsManager) Run() error {
89
70
func (mm * MetricsManager ) Close (ctx context.Context ) error {
90
71
return mm .server .Shutdown (ctx )
91
72
}
73
+
74
+ func (mm * MetricsManager ) serveMetricsPath (w http.ResponseWriter , r * http.Request ) {
75
+ start := time .Now ()
76
+ if r .Method != "GET" {
77
+ metricsRequestDuration .WithLabelValues ("405" ).Observe (time .Now ().Sub (start ).Seconds ())
78
+ http .Error (w , "Method is not supported." , http .StatusMethodNotAllowed )
79
+ return
80
+ }
81
+ metrics , err := mm .db .GetAll (r .Context ())
82
+ if err != nil {
83
+ level .Error (mm .logger ).Log ("msg" , "Redis error" , "err" , err )
84
+ metricsRequestDuration .WithLabelValues ("500" ).Observe (time .Now ().Sub (start ).Seconds ())
85
+ http .Error (w , "Redis error" , http .StatusInternalServerError )
86
+ return
87
+ }
88
+ metricsRequestDuration .WithLabelValues ("200" ).Observe (time .Now ().Sub (start ).Seconds ())
89
+ metricsCacheSize .Set (float64 (len (metrics )))
90
+ fmt .Fprintf (w , strings .Join (metrics , "\n " ))
91
+ }
92
+
93
+ func (mm * MetricsManager ) serveIngestPath (w http.ResponseWriter , r * http.Request ) {
94
+ if r .Method != "POST" {
95
+ http .Error (w , "Method is not supported." , http .StatusMethodNotAllowed )
96
+ return
97
+ }
98
+ scanner := bufio .NewScanner (r .Body )
99
+ keys , vals := []string {}, []string {}
100
+ for scanner .Scan () {
101
+ line := strings .TrimSpace (scanner .Text ())
102
+ if strings .HasPrefix (line , "#" ) {
103
+ continue
104
+ }
105
+ if indx := strings .LastIndex (line , " " ); indx != - 1 {
106
+ keys = append (keys , line [:indx ])
107
+ vals = append (vals , line [indx :])
108
+ }
109
+ }
110
+ if err := scanner .Err (); err != nil {
111
+ level .Error (mm .logger ).Log ("msg" , "Read error" , "err" , err )
112
+ http .Error (w , "Cannot read body" , http .StatusBadRequest )
113
+ return
114
+ }
115
+ if err := mm .db .MSet (r .Context (), keys , vals ); err != nil {
116
+ level .Error (mm .logger ).Log ("msg" , "Redis error" , "err" , err )
117
+ http .Error (w , "Redis error" , http .StatusInternalServerError )
118
+ return
119
+ }
120
+ }
121
+
122
+ func (mm * MetricsManager ) serveDefault (w http.ResponseWriter , r * http.Request ) {
123
+ level .Error (mm .logger ).Log ("msg" , "Unsupported path" , "url" , r .URL .Path )
124
+ http .Error (w , "404 page not found." , http .StatusNotFound )
125
+ }
0 commit comments