Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ The `retention_check_interval` is the interval at which `grok_exporter` checks f
Input Section
-------------

We currently support two input types: `file` and `stdin`. The following two sections describe the `file` input type and the `stdin` input type:
We currently support three input types: `file`, `stdin`, and `webhook`. The following three sections describe the input types respectively:

### File Input Type

Expand Down Expand Up @@ -114,6 +114,50 @@ the exporter will terminate as soon as `sample.log` is processed,
and we will not be able to access the result via HTTP(S) after that.
Always use a command that keeps the output open (like `tail -f`) when testing the `grok_exporter` with the `stdin` input.

### Webhook Input Type

The grok_exporter is capable of receive log entries from webhook sources. It supports webhook reception in various formats... plain-text or JSON, single entries or bulk entries.

The following input configuration example which demonstrates how to configure grok_exporter to receive HTTP webhooks from the [Logstash HTTP Output Plugin](https://www.elastic.co/guide/en/logstash/current/plugins-outputs-http.html) configured in `json_batch` mode, which allows the transmission of multiple json log entries in a single webhook.

```yaml
input:

type: webhook

# HTTP Path to POST the webhook
# Default is `/webhook`
webhook_path: /webhook

# HTTP Body POST Format
# text_single: Webhook POST body is a single plain text log entry
# text_bulk: Webhook POST body contains multiple plain text log entries
# separated by webhook_text_bulk_separator (default: \n\n)
# json_single: Webhook POST body is a single json log entry. Log entry
# text is selected from the value of a json key determined by
# webhook_json_selector.
# json_bulk: Webhook POST body contains multiple json log entries. The
# POST body envelope must be a json array "[ <entry>, <entry> ]". Log
# entry text is selected from the value of a json key determined by
# webhook_json_selector.
# Default is `text_single`
webhook_format: json_bulk

# JSON Path Selector
# Within an json log entry, text is selected from the value of this json selector
# Example ".path.to.element"
# Default is `.message`
webhook_json_selector: .message

# Bulk Text Separator
# Separator for text_bulk log entries
# Default is `\n\n`
webhook_text_bulk_separator: "\n\n"
```

This configuration example may be found in the examples directory
[here](example/config_logstash_http_input_ipv6.yml).

Grok Section
------------

Expand Down Expand Up @@ -411,7 +455,7 @@ server:
```

* `protocol` can be `http` or `https`. Default is `http`.
* `host` can be a hostname or an IP address. If host is specified, `grok_exporter` will listen on the network interface with the given address. If host is omitted, `grok_exporter` will listen on all available network interfaces.
* `host` can be a hostname or an IP address. If host is specified, `grok_exporter` will listen on the network interface with the given address. If host is omitted, `grok_exporter` will listen on all available network interfaces. If `host` is set to `[::]`, `grok_exporter` will listen on all IPV6 addresses.
* `port` is the TCP port to be used. Default is `9144`.
* `path` is the path where the metrics are exposed. Default is `/metrics`, i.e. by default metrics will be exported on [http://localhost:9144/metrics].
* `cert` is the path to the SSL certificate file for protocol `https`. It is optional. If omitted, a hard-coded default certificate will be used.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ server:
port: 9144
```

[CONFIG.md] describes the `grok_exporter` configuration file and shows how to define Grok patterns, Prometheus metrics, and labels.
[CONFIG.md] describes the `grok_exporter` configuration file and shows how to define Grok patterns, Prometheus metrics, and labels. It also details how to configure file, stdin, and webhook inputs.

Status
------
Expand Down
36 changes: 36 additions & 0 deletions config/v2/configV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
defaultRetentionCheckInterval = 53 * time.Second
inputTypeStdin = "stdin"
inputTypeFile = "file"
inputTypeWebhook = "webhook"
)

func Unmarshal(config []byte) (*Config, error) {
Expand Down Expand Up @@ -64,6 +65,10 @@ type InputConfig struct {
PollIntervalSeconds string `yaml:"poll_interval_seconds,omitempty"` // TODO: Use time.Duration directly
PollInterval time.Duration `yaml:"-"` // parsed version of PollIntervalSeconds
MaxLinesInBuffer int `yaml:"max_lines_in_buffer,omitempty"`
WebhookPath string `yaml:"webhook_path,omitempty"`
WebhookFormat string `yaml:"webhook_format,omitempty"`
WebhookJsonSelector string `yaml:"webhook_json_selector,omitempty"`
WebhookTextBulkSeparator string `yaml:"webhook_text_bulk_separator,omitempty"`
}

type GrokConfig struct {
Expand Down Expand Up @@ -127,6 +132,20 @@ func (c *InputConfig) addDefaults() {
if c.Type == inputTypeFile && len(c.FailOnMissingLogfileString) == 0 {
c.FailOnMissingLogfileString = "true"
}
if c.Type == inputTypeWebhook {
if len(c.WebhookPath) == 0 {
c.WebhookPath = "/webhook"
}
if len(c.WebhookFormat) == 0 {
c.WebhookFormat = "text_single"
}
if len(c.WebhookJsonSelector) == 0 {
c.WebhookJsonSelector = ".message"
}
if len(c.WebhookTextBulkSeparator) == 0 {
c.WebhookTextBulkSeparator = "\n\n"
}
}
}

func (c *GrokConfig) addDefaults() {}
Expand Down Expand Up @@ -195,6 +214,23 @@ func (c *InputConfig) validate() error {
return fmt.Errorf("invalid input configuration: '%v' is not a valid boolean value in 'input.fail_on_missing_logfile'", c.FailOnMissingLogfileString)
}
}
case c.Type == inputTypeWebhook:
if c.WebhookPath == "" {
return fmt.Errorf("invalid input configuration: 'input.webhook_path' is required for input type \"webhook\"")
} else if c.WebhookPath[0] != '/' {
return fmt.Errorf("invalid input configuration: 'input.webhook_path' must start with \"/\"")
}
if c.WebhookFormat != "text_single" && c.WebhookFormat != "text_bulk" && c.WebhookFormat != "json_single" && c.WebhookFormat != "json_bulk" {
return fmt.Errorf("invalid input configuration: 'input.webhook_format' must be \"text_single|text_bulk|json_single|json_bulk\"")
}
if c.WebhookJsonSelector == "" {
return fmt.Errorf("invalid input configuration: 'input.webhook_json_selector' is required for input type \"webhook\"")
} else if c.WebhookJsonSelector[0] != '.' {
return fmt.Errorf("invalid input configuration: 'input.webhook_json_selector' must start with \".\"")
}
if c.WebhookFormat == "text_bulk" && c.WebhookTextBulkSeparator == "" {
return fmt.Errorf("invalid input configuration: 'input.webhook_text_bulk_separator' is required for input type \"webhook\" and webhook_format \"text_bulk\"")
}
default:
return fmt.Errorf("unsupported 'input.type': %v", c.Type)
}
Expand Down
63 changes: 63 additions & 0 deletions example/config_logstash_http_input_ipv6.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# The grok_exporter is capable of receive log entries from webhook sources.
#
# This input configuration example demonstrates how to configure grok_exporter
# to receive HTTP webhooks from the Logstash HTTP Output Plugin configured in
# "json_batch" mode, which allows the transmission of multiple json log
# entries in a single webhook.
# https://www.elastic.co/guide/en/logstash/current/plugins-outputs-http.html
#
# The grok_exporter supports webhook reception in various formats. Plain-text
# or JSON, single entries or bulk entries. To learn more, read the comments
# in this example, and review the webhookTeailer_test.go unit tests.

global:
config_version: 2
input:

type: webhook

# HTTP Path to POST the webhook
# Default is `/webhook`
webhook_path: /webhook

# HTTP Body POST Format
# text_single: Webhook POST body is a single plain text log entry
# text_bulk: Webhook POST body contains multiple plain text log entries
# separated by webhook_text_bulk_separator (default: \n\n)
# json_single: Webhook POST body is a single json log entry. Log entry
# text is selected from the value of a json key determined by
# webhook_json_selector.
# json_bulk: Webhook POST body contains multiple json log entries. The
# POST body envelope must be a json array "[ <entry>, <entry> ]". Log
# entry text is selected from the value of a json key determined by
# webhook_json_selector.
# Default is `text_single`
webhook_format: json_bulk

# JSON Path Selector
# Within an json log entry, text is selected from the value of this json selector
# Example ".path.to.element"
# Default is `.message`
webhook_json_selector: .message

# Bulk Text Separator
# Separator for text_bulk log entries
# Default is `\n\n`
webhook_text_bulk_separator: "\n\n"

grok:
patterns_dir: ./logstash-patterns-core/patterns
additional_patterns:
- 'EXIM_MESSAGE [a-zA-Z ]*'
metrics:
- type: counter
name: exim_rejected_rcpt_total
help: Total number of rejected recipients, partitioned by error message.
match: '%{EXIM_DATE} %{EXIM_REMOTE_HOST} F=<%{EMAILADDRESS}> rejected RCPT <%{EMAILADDRESS}>: %{EXIM_MESSAGE:message}'
labels:
error_message: '{{.message}}'
server:
# For IPV4 localhost use "0.0.0.0"
# For IPV6 localhost use "[::]"
host: "[::]"
port: 9144
22 changes: 16 additions & 6 deletions exporter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"os"
)

type HttpServerPathHandler struct {
Path string
Handler http.Handler
}

// cert and key created with openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -nodes

const defaultCert = `-----BEGIN CERTIFICATE-----
Expand Down Expand Up @@ -77,7 +82,7 @@ XLgD9hrDBrTbnKBHHQ6MHpT6ILi4w/e4+5XEUUOBf44ZJE71uRr4ZUA=
-----END RSA PRIVATE KEY-----
`

func RunHttpsServerWithDefaultKeys(host string, port int, path string, handler http.Handler) error {
func RunHttpsServerWithDefaultKeys(host string, port int, httpHandlers []HttpServerPathHandler) error {
cert, err := createTempFile("cert", []byte(defaultCert))
if err != nil {
return err
Expand All @@ -88,24 +93,29 @@ func RunHttpsServerWithDefaultKeys(host string, port int, path string, handler h
return err
}
defer os.Remove(key)
return RunHttpsServer(host, port, cert, key, path, handler)
return RunHttpsServer(host, port, cert, key, httpHandlers)
}

func RunHttpsServer(host string, port int, cert, key, path string, handler http.Handler) error {
func RunHttpsServer(host string, port int, cert string, key string, httpHandlers []HttpServerPathHandler) error {
err := tryOpenPort(host, port)
if err != nil {
return listenFailedError(host, port, err)
}
http.Handle(path, handler)
for _, httpHandler := range httpHandlers {
http.Handle(httpHandler.Path, httpHandler.Handler)
}
return http.ListenAndServeTLS(fmt.Sprintf(":%v", port), cert, key, nil)
}

func RunHttpServer(host string, port int, path string, handler http.Handler) error {
func RunHttpServer(host string, port int, httpHandlers []HttpServerPathHandler) error {
err := tryOpenPort(host, port)
if err != nil {
return listenFailedError(host, port, err)
}
http.Handle(path, handler)
for _, httpHandler := range httpHandlers {
http.Handle(httpHandler.Path, httpHandler.Handler)
}

return http.ListenAndServe(fmt.Sprintf("%v:%v", host, port), nil)
}

Expand Down
40 changes: 31 additions & 9 deletions grok_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/fstab/grok_exporter/tailer/glob"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"net/http"
"os"
"strings"
"time"
)

Expand Down Expand Up @@ -70,8 +70,20 @@ func main() {

tail, err := startTailer(cfg)
exitOnError(err)
fmt.Print(startMsg(cfg))
serverErrors := startServer(cfg.Server, prometheus.Handler())

// gather up the handlers with which to start the webserver
httpHandlers := []exporter.HttpServerPathHandler{}
httpHandlers = append(httpHandlers, exporter.HttpServerPathHandler{
Path: cfg.Server.Path,
Handler: prometheus.Handler()})
if cfg.Input.Type == "webhook" {
httpHandlers = append(httpHandlers, exporter.HttpServerPathHandler{
Path: cfg.Input.WebhookPath,
Handler: tailer.WebhookHandler()})
}

fmt.Print(startMsg(cfg, httpHandlers))
serverErrors := startServer(cfg.Server, httpHandlers)

retentionTicker := time.NewTicker(cfg.Global.RetentionCheckInterval)

Expand Down Expand Up @@ -126,7 +138,7 @@ func main() {
}
}

func startMsg(cfg *v2.Config) string {
func startMsg(cfg *v2.Config, httpHandlers []exporter.HttpServerPathHandler) string {
host := "localhost"
if len(cfg.Server.Host) > 0 {
host = cfg.Server.Host
Expand All @@ -136,7 +148,15 @@ func startMsg(cfg *v2.Config) string {
host = hostname
}
}
return fmt.Sprintf("Starting server on %v://%v:%v%v\n", cfg.Server.Protocol, host, cfg.Server.Port, cfg.Server.Path)

var sb strings.Builder
baseUrl := fmt.Sprintf("%v://%v:%v", cfg.Server.Protocol, host, cfg.Server.Port)
sb.WriteString(fmt.Sprintf("Starting server on %v", baseUrl))
for _, httpHandler := range httpHandlers {
sb.WriteString(fmt.Sprintf("\n %v%v", baseUrl, httpHandler.Path))
}
sb.WriteString("\n")
return sb.String()
}

func exitOnError(err error) {
Expand Down Expand Up @@ -251,17 +271,17 @@ func initSelfMonitoring(metrics []exporter.Metric) (*prometheus.CounterVec, *pro
return nLinesTotal, nMatchesByMetric, procTimeMicrosecondsByMetric, nErrorsByMetric
}

func startServer(cfg v2.ServerConfig, handler http.Handler) chan error {
func startServer(cfg v2.ServerConfig, httpHandlers []exporter.HttpServerPathHandler) chan error {
serverErrors := make(chan error)
go func() {
switch {
case cfg.Protocol == "http":
serverErrors <- exporter.RunHttpServer(cfg.Host, cfg.Port, cfg.Path, handler)
serverErrors <- exporter.RunHttpServer(cfg.Host, cfg.Port, httpHandlers)
case cfg.Protocol == "https":
if cfg.Cert != "" && cfg.Key != "" {
serverErrors <- exporter.RunHttpsServer(cfg.Host, cfg.Port, cfg.Cert, cfg.Key, cfg.Path, handler)
serverErrors <- exporter.RunHttpsServer(cfg.Host, cfg.Port, cfg.Cert, cfg.Key, httpHandlers)
} else {
serverErrors <- exporter.RunHttpsServerWithDefaultKeys(cfg.Host, cfg.Port, cfg.Path, handler)
serverErrors <- exporter.RunHttpsServerWithDefaultKeys(cfg.Host, cfg.Port, httpHandlers)
}
default:
// This cannot happen, because cfg.validate() makes sure that protocol is either http or https.
Expand All @@ -288,6 +308,8 @@ func startTailer(cfg *v2.Config) (fswatcher.FileTailer, error) {
}
case cfg.Input.Type == "stdin":
tail = tailer.RunStdinTailer()
case cfg.Input.Type == "webhook":
tail = tailer.InitWebhookTailer(&cfg.Input)
default:
return nil, fmt.Errorf("Config error: Input type '%v' unknown.", cfg.Input.Type)
}
Expand Down
Loading