Skip to content

Commit ba8d15c

Browse files
authored
Merge pull request #61 from dcwangmit01/webhook
Added Webhook Input Type Support
2 parents 7ae298d + c94501e commit ba8d15c

File tree

8 files changed

+612
-18
lines changed

8 files changed

+612
-18
lines changed

CONFIG.md

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ The `retention_check_interval` is the interval at which `grok_exporter` checks f
5656
Input Section
5757
-------------
5858

59-
We currently support two input types: `file` and `stdin`. The following two sections describe the `file` input type and the `stdin` input type:
59+
We currently support three input types: `file`, `stdin`, and `webhook`. The following three sections describe the input types respectively:
6060

6161
### File Input Type
6262

@@ -114,6 +114,50 @@ the exporter will terminate as soon as `sample.log` is processed,
114114
and we will not be able to access the result via HTTP(S) after that.
115115
Always use a command that keeps the output open (like `tail -f`) when testing the `grok_exporter` with the `stdin` input.
116116

117+
### Webhook Input Type
118+
119+
The grok_exporter is capable of receive log entries from webhook sources. It supports webhook reception in various formats... plain-text or JSON, single entries or bulk entries.
120+
121+
The following input configuration example which demonstrates how to configure grok_exporter to receive HTTP webhooks from the [Logstash HTTP Output Plugin](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-http.html) configured in `json_batch` mode, which allows the transmission of multiple json log entries in a single webhook.
122+
123+
```yaml
124+
input:
125+
126+
type: webhook
127+
128+
# HTTP Path to POST the webhook
129+
# Default is `/webhook`
130+
webhook_path: /webhook
131+
132+
# HTTP Body POST Format
133+
# text_single: Webhook POST body is a single plain text log entry
134+
# text_bulk: Webhook POST body contains multiple plain text log entries
135+
# separated by webhook_text_bulk_separator (default: \n\n)
136+
# json_single: Webhook POST body is a single json log entry. Log entry
137+
# text is selected from the value of a json key determined by
138+
# webhook_json_selector.
139+
# json_bulk: Webhook POST body contains multiple json log entries. The
140+
# POST body envelope must be a json array "[ <entry>, <entry> ]". Log
141+
# entry text is selected from the value of a json key determined by
142+
# webhook_json_selector.
143+
# Default is `text_single`
144+
webhook_format: json_bulk
145+
146+
# JSON Path Selector
147+
# Within an json log entry, text is selected from the value of this json selector
148+
# Example ".path.to.element"
149+
# Default is `.message`
150+
webhook_json_selector: .message
151+
152+
# Bulk Text Separator
153+
# Separator for text_bulk log entries
154+
# Default is `\n\n`
155+
webhook_text_bulk_separator: "\n\n"
156+
```
157+
158+
This configuration example may be found in the examples directory
159+
[here](example/config_logstash_http_input_ipv6.yml).
160+
117161
Grok Section
118162
------------
119163
@@ -411,7 +455,7 @@ server:
411455
```
412456
413457
* `protocol` can be `http` or `https`. Default is `http`.
414-
* `host` can be a hostname or an IP address. If host is specified, `grok_exporter` will listen on the network interface with the given address. If host is omitted, `grok_exporter` will listen on all available network interfaces.
458+
* `host` can be a hostname or an IP address. If host is specified, `grok_exporter` will listen on the network interface with the given address. If host is omitted, `grok_exporter` will listen on all available network interfaces. If `host` is set to `[::]`, `grok_exporter` will listen on all IPV6 addresses.
415459
* `port` is the TCP port to be used. Default is `9144`.
416460
* `path` is the path where the metrics are exposed. Default is `/metrics`, i.e. by default metrics will be exported on [http://localhost:9144/metrics].
417461
* `cert` is the path to the SSL certificate file for protocol `https`. It is optional. If omitted, a hard-coded default certificate will be used.

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ server:
5454
port: 9144
5555
```
5656
57-
[CONFIG.md] describes the `grok_exporter` configuration file and shows how to define Grok patterns, Prometheus metrics, and labels.
57+
[CONFIG.md] describes the `grok_exporter` configuration file and shows how to define Grok patterns, Prometheus metrics, and labels. It also details how to configure file, stdin, and webhook inputs.
5858

5959
Status
6060
------

config/v2/configV2.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const (
2727
defaultRetentionCheckInterval = 53 * time.Second
2828
inputTypeStdin = "stdin"
2929
inputTypeFile = "file"
30+
inputTypeWebhook = "webhook"
3031
)
3132

3233
func Unmarshal(config []byte) (*Config, error) {
@@ -64,6 +65,10 @@ type InputConfig struct {
6465
PollIntervalSeconds string `yaml:"poll_interval_seconds,omitempty"` // TODO: Use time.Duration directly
6566
PollInterval time.Duration `yaml:"-"` // parsed version of PollIntervalSeconds
6667
MaxLinesInBuffer int `yaml:"max_lines_in_buffer,omitempty"`
68+
WebhookPath string `yaml:"webhook_path,omitempty"`
69+
WebhookFormat string `yaml:"webhook_format,omitempty"`
70+
WebhookJsonSelector string `yaml:"webhook_json_selector,omitempty"`
71+
WebhookTextBulkSeparator string `yaml:"webhook_text_bulk_separator,omitempty"`
6772
}
6873

6974
type GrokConfig struct {
@@ -127,6 +132,20 @@ func (c *InputConfig) addDefaults() {
127132
if c.Type == inputTypeFile && len(c.FailOnMissingLogfileString) == 0 {
128133
c.FailOnMissingLogfileString = "true"
129134
}
135+
if c.Type == inputTypeWebhook {
136+
if len(c.WebhookPath) == 0 {
137+
c.WebhookPath = "/webhook"
138+
}
139+
if len(c.WebhookFormat) == 0 {
140+
c.WebhookFormat = "text_single"
141+
}
142+
if len(c.WebhookJsonSelector) == 0 {
143+
c.WebhookJsonSelector = ".message"
144+
}
145+
if len(c.WebhookTextBulkSeparator) == 0 {
146+
c.WebhookTextBulkSeparator = "\n\n"
147+
}
148+
}
130149
}
131150

132151
func (c *GrokConfig) addDefaults() {}
@@ -195,6 +214,23 @@ func (c *InputConfig) validate() error {
195214
return fmt.Errorf("invalid input configuration: '%v' is not a valid boolean value in 'input.fail_on_missing_logfile'", c.FailOnMissingLogfileString)
196215
}
197216
}
217+
case c.Type == inputTypeWebhook:
218+
if c.WebhookPath == "" {
219+
return fmt.Errorf("invalid input configuration: 'input.webhook_path' is required for input type \"webhook\"")
220+
} else if c.WebhookPath[0] != '/' {
221+
return fmt.Errorf("invalid input configuration: 'input.webhook_path' must start with \"/\"")
222+
}
223+
if c.WebhookFormat != "text_single" && c.WebhookFormat != "text_bulk" && c.WebhookFormat != "json_single" && c.WebhookFormat != "json_bulk" {
224+
return fmt.Errorf("invalid input configuration: 'input.webhook_format' must be \"text_single|text_bulk|json_single|json_bulk\"")
225+
}
226+
if c.WebhookJsonSelector == "" {
227+
return fmt.Errorf("invalid input configuration: 'input.webhook_json_selector' is required for input type \"webhook\"")
228+
} else if c.WebhookJsonSelector[0] != '.' {
229+
return fmt.Errorf("invalid input configuration: 'input.webhook_json_selector' must start with \".\"")
230+
}
231+
if c.WebhookFormat == "text_bulk" && c.WebhookTextBulkSeparator == "" {
232+
return fmt.Errorf("invalid input configuration: 'input.webhook_text_bulk_separator' is required for input type \"webhook\" and webhook_format \"text_bulk\"")
233+
}
198234
default:
199235
return fmt.Errorf("unsupported 'input.type': %v", c.Type)
200236
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# The grok_exporter is capable of receive log entries from webhook sources.
2+
#
3+
# This input configuration example demonstrates how to configure grok_exporter
4+
# to receive HTTP webhooks from the Logstash HTTP Output Plugin configured in
5+
# "json_batch" mode, which allows the transmission of multiple json log
6+
# entries in a single webhook.
7+
# https://www.elastic.co/guide/en/logstash/current/plugins-outputs-http.html
8+
#
9+
# The grok_exporter supports webhook reception in various formats. Plain-text
10+
# or JSON, single entries or bulk entries. To learn more, read the comments
11+
# in this example, and review the webhookTeailer_test.go unit tests.
12+
13+
global:
14+
config_version: 2
15+
input:
16+
17+
type: webhook
18+
19+
# HTTP Path to POST the webhook
20+
# Default is `/webhook`
21+
webhook_path: /webhook
22+
23+
# HTTP Body POST Format
24+
# text_single: Webhook POST body is a single plain text log entry
25+
# text_bulk: Webhook POST body contains multiple plain text log entries
26+
# separated by webhook_text_bulk_separator (default: \n\n)
27+
# json_single: Webhook POST body is a single json log entry. Log entry
28+
# text is selected from the value of a json key determined by
29+
# webhook_json_selector.
30+
# json_bulk: Webhook POST body contains multiple json log entries. The
31+
# POST body envelope must be a json array "[ <entry>, <entry> ]". Log
32+
# entry text is selected from the value of a json key determined by
33+
# webhook_json_selector.
34+
# Default is `text_single`
35+
webhook_format: json_bulk
36+
37+
# JSON Path Selector
38+
# Within an json log entry, text is selected from the value of this json selector
39+
# Example ".path.to.element"
40+
# Default is `.message`
41+
webhook_json_selector: .message
42+
43+
# Bulk Text Separator
44+
# Separator for text_bulk log entries
45+
# Default is `\n\n`
46+
webhook_text_bulk_separator: "\n\n"
47+
48+
grok:
49+
patterns_dir: ./logstash-patterns-core/patterns
50+
additional_patterns:
51+
- 'EXIM_MESSAGE [a-zA-Z ]*'
52+
metrics:
53+
- type: counter
54+
name: exim_rejected_rcpt_total
55+
help: Total number of rejected recipients, partitioned by error message.
56+
match: '%{EXIM_DATE} %{EXIM_REMOTE_HOST} F=<%{EMAILADDRESS}> rejected RCPT <%{EMAILADDRESS}>: %{EXIM_MESSAGE:message}'
57+
labels:
58+
error_message: '{{.message}}'
59+
server:
60+
# For IPV4 localhost use "0.0.0.0"
61+
# For IPV6 localhost use "[::]"
62+
host: "[::]"
63+
port: 9144

exporter/server.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ import (
2222
"os"
2323
)
2424

25+
type HttpServerPathHandler struct {
26+
Path string
27+
Handler http.Handler
28+
}
29+
2530
// cert and key created with openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -nodes
2631

2732
const defaultCert = `-----BEGIN CERTIFICATE-----
@@ -77,7 +82,7 @@ XLgD9hrDBrTbnKBHHQ6MHpT6ILi4w/e4+5XEUUOBf44ZJE71uRr4ZUA=
7782
-----END RSA PRIVATE KEY-----
7883
`
7984

80-
func RunHttpsServerWithDefaultKeys(host string, port int, path string, handler http.Handler) error {
85+
func RunHttpsServerWithDefaultKeys(host string, port int, httpHandlers []HttpServerPathHandler) error {
8186
cert, err := createTempFile("cert", []byte(defaultCert))
8287
if err != nil {
8388
return err
@@ -88,24 +93,29 @@ func RunHttpsServerWithDefaultKeys(host string, port int, path string, handler h
8893
return err
8994
}
9095
defer os.Remove(key)
91-
return RunHttpsServer(host, port, cert, key, path, handler)
96+
return RunHttpsServer(host, port, cert, key, httpHandlers)
9297
}
9398

94-
func RunHttpsServer(host string, port int, cert, key, path string, handler http.Handler) error {
99+
func RunHttpsServer(host string, port int, cert string, key string, httpHandlers []HttpServerPathHandler) error {
95100
err := tryOpenPort(host, port)
96101
if err != nil {
97102
return listenFailedError(host, port, err)
98103
}
99-
http.Handle(path, handler)
104+
for _, httpHandler := range httpHandlers {
105+
http.Handle(httpHandler.Path, httpHandler.Handler)
106+
}
100107
return http.ListenAndServeTLS(fmt.Sprintf(":%v", port), cert, key, nil)
101108
}
102109

103-
func RunHttpServer(host string, port int, path string, handler http.Handler) error {
110+
func RunHttpServer(host string, port int, httpHandlers []HttpServerPathHandler) error {
104111
err := tryOpenPort(host, port)
105112
if err != nil {
106113
return listenFailedError(host, port, err)
107114
}
108-
http.Handle(path, handler)
115+
for _, httpHandler := range httpHandlers {
116+
http.Handle(httpHandler.Path, httpHandler.Handler)
117+
}
118+
109119
return http.ListenAndServe(fmt.Sprintf("%v:%v", host, port), nil)
110120
}
111121

grok_exporter.go

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import (
2626
"github.com/fstab/grok_exporter/tailer/glob"
2727
"github.com/prometheus/client_golang/prometheus"
2828
"github.com/sirupsen/logrus"
29-
"net/http"
3029
"os"
30+
"strings"
3131
"time"
3232
)
3333

@@ -70,8 +70,20 @@ func main() {
7070

7171
tail, err := startTailer(cfg)
7272
exitOnError(err)
73-
fmt.Print(startMsg(cfg))
74-
serverErrors := startServer(cfg.Server, prometheus.Handler())
73+
74+
// gather up the handlers with which to start the webserver
75+
httpHandlers := []exporter.HttpServerPathHandler{}
76+
httpHandlers = append(httpHandlers, exporter.HttpServerPathHandler{
77+
Path: cfg.Server.Path,
78+
Handler: prometheus.Handler()})
79+
if cfg.Input.Type == "webhook" {
80+
httpHandlers = append(httpHandlers, exporter.HttpServerPathHandler{
81+
Path: cfg.Input.WebhookPath,
82+
Handler: tailer.WebhookHandler()})
83+
}
84+
85+
fmt.Print(startMsg(cfg, httpHandlers))
86+
serverErrors := startServer(cfg.Server, httpHandlers)
7587

7688
retentionTicker := time.NewTicker(cfg.Global.RetentionCheckInterval)
7789

@@ -126,7 +138,7 @@ func main() {
126138
}
127139
}
128140

129-
func startMsg(cfg *v2.Config) string {
141+
func startMsg(cfg *v2.Config, httpHandlers []exporter.HttpServerPathHandler) string {
130142
host := "localhost"
131143
if len(cfg.Server.Host) > 0 {
132144
host = cfg.Server.Host
@@ -136,7 +148,15 @@ func startMsg(cfg *v2.Config) string {
136148
host = hostname
137149
}
138150
}
139-
return fmt.Sprintf("Starting server on %v://%v:%v%v\n", cfg.Server.Protocol, host, cfg.Server.Port, cfg.Server.Path)
151+
152+
var sb strings.Builder
153+
baseUrl := fmt.Sprintf("%v://%v:%v", cfg.Server.Protocol, host, cfg.Server.Port)
154+
sb.WriteString(fmt.Sprintf("Starting server on %v", baseUrl))
155+
for _, httpHandler := range httpHandlers {
156+
sb.WriteString(fmt.Sprintf("\n %v%v", baseUrl, httpHandler.Path))
157+
}
158+
sb.WriteString("\n")
159+
return sb.String()
140160
}
141161

142162
func exitOnError(err error) {
@@ -251,17 +271,17 @@ func initSelfMonitoring(metrics []exporter.Metric) (*prometheus.CounterVec, *pro
251271
return nLinesTotal, nMatchesByMetric, procTimeMicrosecondsByMetric, nErrorsByMetric
252272
}
253273

254-
func startServer(cfg v2.ServerConfig, handler http.Handler) chan error {
274+
func startServer(cfg v2.ServerConfig, httpHandlers []exporter.HttpServerPathHandler) chan error {
255275
serverErrors := make(chan error)
256276
go func() {
257277
switch {
258278
case cfg.Protocol == "http":
259-
serverErrors <- exporter.RunHttpServer(cfg.Host, cfg.Port, cfg.Path, handler)
279+
serverErrors <- exporter.RunHttpServer(cfg.Host, cfg.Port, httpHandlers)
260280
case cfg.Protocol == "https":
261281
if cfg.Cert != "" && cfg.Key != "" {
262-
serverErrors <- exporter.RunHttpsServer(cfg.Host, cfg.Port, cfg.Cert, cfg.Key, cfg.Path, handler)
282+
serverErrors <- exporter.RunHttpsServer(cfg.Host, cfg.Port, cfg.Cert, cfg.Key, httpHandlers)
263283
} else {
264-
serverErrors <- exporter.RunHttpsServerWithDefaultKeys(cfg.Host, cfg.Port, cfg.Path, handler)
284+
serverErrors <- exporter.RunHttpsServerWithDefaultKeys(cfg.Host, cfg.Port, httpHandlers)
265285
}
266286
default:
267287
// This cannot happen, because cfg.validate() makes sure that protocol is either http or https.
@@ -288,6 +308,8 @@ func startTailer(cfg *v2.Config) (fswatcher.FileTailer, error) {
288308
}
289309
case cfg.Input.Type == "stdin":
290310
tail = tailer.RunStdinTailer()
311+
case cfg.Input.Type == "webhook":
312+
tail = tailer.InitWebhookTailer(&cfg.Input)
291313
default:
292314
return nil, fmt.Errorf("Config error: Input type '%v' unknown.", cfg.Input.Type)
293315
}

0 commit comments

Comments
 (0)