-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This is the first step in adding the Elasticsearch exporter. Initially we will only support the Logs exporter interface, and potentially will add metrics in the future as well. This change only provides some boilerplate initializing the exporter. But the exporter is not yet usable (or part of) any opentelemetry collector distribution. The elasticsearch exporter is based on the official [go-elasticsearch](https://github.com/elastic/go-elasticsearch) client. We will use the BulkIndexer provided by the client for event publishing. The client and BulkIndexer provide some support for retrying already. The Elasticsearch Bulk API can report errors at the HTTP level, but uses selective ACKs for individual events. This allows us to retry only failed events and/or reject events that can not be indexed (e.g. due to an mapping error). The 429 error code might even inidcate that we should backoff a little before retrying. **Link to tracking Issue:** #1800 **Testing:** Only configuration loading and validation tests have been added so far. The exporter currently panics when trying to publish events. More unit and integration tests will be added in the future. **Documentation:** All settings that will be available initially are documented in the README.md file.
- Loading branch information
urso
committed
Feb 10, 2021
1 parent
ffce884
commit 66f7074
Showing
11 changed files
with
2,251 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
# Elasticsearch Exporter | ||
|
||
This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www.elastic.co/elasticsearch). | ||
|
||
## Configuration options | ||
|
||
- `urls`: List of Elasticsearch URLS. If urls and cloudid is missing the | ||
ELASTICSEARCH_URL environment variable will be used. | ||
- `cloudid` (optional): | ||
[ID](https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html) of the | ||
Elastic Cloud Cluster to publish events to. The `cloudid` can be used instead | ||
of `urls`. | ||
- `workers` (optional): Number of workers publishing bulk requests concurrently. | ||
- `index`: The | ||
[index](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html) | ||
or [datastream](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html) | ||
name to publish events to. The default value is `logs-generic-default`. | ||
- `pipeline` (optional): Optional [Ingest Node](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html) | ||
pipeline ID used for processing documents published by the exporter. | ||
- `flush`: Event bulk buffer flush settings | ||
- `bytes` (default=5242880): Write buffer flush limit. | ||
- `interval` (default=30s): Write buffer time limit. | ||
- `retry`: Event retry settings | ||
- `enabled` (default=true): Enable/Disable event retry on error. Retry | ||
support is enabled by default. | ||
- `max` (default=3): Number of HTTP retry attempts. | ||
- `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed. | ||
- `max_interval` (default=1m): Max waiting time if a HTTP request failed. | ||
- `mapping`: Events are encoded to JSON. The `mapping` allows users to | ||
configure additional mapping rules. | ||
- `mode` (default=ecs): The fields naming mode. valid modes are: | ||
- `none`: Use original fields and event structure from the OTLP event. | ||
- `ecs`: Try to map fields defined in the | ||
[OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/opentelemetry-specification/tree/main/semantic_conventions) | ||
to [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/index.html). | ||
- `fields` (optional): Configure additional fields mappings. | ||
- `file` (optional): Read additional field mappings from the provided YAML file. | ||
- `dedup` (default=true): Try to find and remove duplicate fields/attributes | ||
from events before publishing to Elasticsearch. Some structured logging | ||
libraries can produce duplicate fields (for example zap). Elasticsearch | ||
will reject documents that have duplicate fields. | ||
- `dedot` (default=true): When enabled attributes with `.` will be split into | ||
proper json objects. | ||
|
||
### HTTP settings | ||
|
||
- `read_buffer_size` (default=0): Read buffer size. | ||
- `write_buffer_size` (default=0): Write buffer size used when. | ||
- `timeout` (default=90s): HTTP request time limit. | ||
- `headers` (optional): Headers to be send with each HTTP request. | ||
|
||
### Security and Authentication settings | ||
|
||
- `user` (optional): Username used for HTTP Basic Authentication. | ||
- `password` (optional): Password used for HTTP Basic Authentication. | ||
- `api_key` (optional): Authorization [API Key](https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html). | ||
- `ca_file` (optional): Root Certificate Authority (CA) certificate, for | ||
verifying the server's identity, if TLS is enabled. | ||
- `cert_file` (optional): Client TLS certificate. | ||
- `key_file` (optional): Client TLS key. | ||
- `insecure` (optional): Disable verification of the server's identity, if TLS | ||
is enabled. | ||
|
||
### Node Discovery | ||
|
||
The Elasticsearch Exporter will check Elasticsearch regularily for available | ||
nodes and updates the list of hosts if discovery is enabled. Newly discovered | ||
nodes will automatically be used for load balancing. | ||
|
||
- `discover`: | ||
- `on_start` (optional): If enabled the exporter queries Elasticsearch | ||
for all known nodes in the cluster on startup. | ||
- `interval` (optional): Interval to update the list of Elasticsearch nodes. | ||
|
||
## Example | ||
|
||
```yaml | ||
exporters: | ||
elasticsearch: | ||
urls: | ||
- "https://localhost:9200" | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
// Copyright 2020, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package elasticsearchexporter | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"strings" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/config/configmodels" | ||
"go.opentelemetry.io/collector/config/configtls" | ||
) | ||
|
||
// Config defines configuration for Elastic exporter. | ||
type Config struct { | ||
configmodels.ExporterSettings `mapstructure:",squash"` | ||
|
||
// URLs holds the Elasticsearch URLs the exporter should send events to. | ||
// | ||
// This setting is required if CloudID is not set and if the | ||
// ELASTICSEARCH_URL environment variable is not set. | ||
URLs []string `mapstructure:"urls"` | ||
|
||
// CloudID holds the cloud ID to identify the Elastic Cloud cluster to send events to. | ||
// https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html | ||
// | ||
// This setting is required if no URL is configured. | ||
CloudID string `mapstructure:"cloudid"` | ||
|
||
// Workers configures the number of workers publishing bulk requests. | ||
Workers int `mapstructure:"workers"` | ||
|
||
// Index configures the index, index alias, or data stream name events should be indexed in. | ||
// | ||
// https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html | ||
// https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html | ||
// | ||
// This setting is required. | ||
Index string `mapstructure:"index"` | ||
|
||
// Pipeline configures the ingest node pipeline name that should be used to process the | ||
// events. | ||
// | ||
// https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html | ||
Pipeline string `mapstructure:"pipeline"` | ||
|
||
HTTPClientSettings `mapstructure:",squash"` | ||
Discovery DiscoverySettings `mapstructure:"discover"` | ||
Retry RetrySettings `mapstructure:"retry"` | ||
Flush FlushSettings `mapstructure:"flush"` | ||
Mapping MappingsSettings `mapstructure:"mapping"` | ||
} | ||
|
||
type HTTPClientSettings struct { | ||
Authentication AuthenticationSettings `mapstructure:",squash"` | ||
|
||
// ReadBufferSize for HTTP client. See http.Transport.ReadBufferSize. | ||
ReadBufferSize int `mapstructure:"read_buffer_size"` | ||
|
||
// WriteBufferSize for HTTP client. See http.Transport.WriteBufferSize. | ||
WriteBufferSize int `mapstructure:"write_buffer_size"` | ||
|
||
// Timeout configures the HTTP request timeout. | ||
Timeout time.Duration `mapstructure:"timeout"` | ||
|
||
// Headers allows users to configure optional HTTP headers that | ||
// will be send with each HTTP request. | ||
Headers map[string]string `mapstructure:"headers,omitempty"` | ||
|
||
configtls.TLSClientSetting `mapstructure:",squash"` | ||
} | ||
|
||
// AuthenticationSettings defines user authentication related settings. | ||
type AuthenticationSettings struct { | ||
// User is used to configure HTTP Basic Authentication. | ||
User string `mapstructure:"user"` | ||
|
||
// Password is used to configure HTTP Basic Authentication. | ||
Password string `mapstructure:"password"` | ||
|
||
// APIKey is used to configure ApiKey based Authentication. | ||
// | ||
// https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html | ||
APIKey string `mapstructure:"api_key"` | ||
} | ||
|
||
// DiscoverySettings defines Elasticsearch node discovery related settings. | ||
// The exporter will check Elasticsearch regularily for available nodes | ||
// and updates the list of hosts if discovery is enabled. Newly discovered | ||
// nodes will automatically be used for load balancing. | ||
// | ||
// DiscoverySettings should not be enabled when operating Elasticsearch behind a proxy | ||
// or load balancer. | ||
// | ||
// https://www.elastic.co/blog/elasticsearch-sniffing-best-practices-what-when-why-how | ||
type DiscoverySettings struct { | ||
// OnStart, if set, instructs the exporter to look for available Elasticsearch | ||
// nodes the first time the exporter connects to the cluster. | ||
OnStart bool `mapstructure:"on_start"` | ||
|
||
// Interval instructs the exporter to renew the list of Elasticsearch URLs | ||
// with the given interval. URLs will not be updated if Interval is <=0. | ||
Interval time.Duration `mapstructure:"interval"` | ||
} | ||
|
||
// FlushSettings defines settings for configuring the write buffer flushing | ||
// policy in the Elasticsearch exporter. The exporter sends a bulk request with | ||
// all events already serialized into the send-buffer. | ||
type FlushSettings struct { | ||
// Bytes sets the send buffer flushing limit. | ||
Bytes int `mapstructure:"bytes"` | ||
|
||
// Interval configures the max age of a document in the send buffer. | ||
Interval time.Duration `mapstructure:"interval"` | ||
} | ||
|
||
// RetrySettings defines settings for the HTTP request retries in the Elasticsearch exporter. | ||
// Failed sends are retried with exponential backoff. | ||
type RetrySettings struct { | ||
// Enabled allows users to disable retry without having to comment out all settings. | ||
Enabled bool `mapstructure:"enabled"` | ||
|
||
// Max configures how often an HTTP request is retried before it is assumed to be failed. | ||
Max int `mapstructure:"max"` | ||
|
||
// InitialInterval configures the initial waiting time if a request failed. | ||
InitialInterval time.Duration `mapstructure:"initial_interval"` | ||
|
||
// MaxInterval configures the max waiting time if consecutive requests failed. | ||
MaxInterval time.Duration `mapstructure:"max_interval"` | ||
} | ||
|
||
type MappingsSettings struct { | ||
// Mode configures the field mappings. | ||
Mode string `mapstructure:"mode"` | ||
|
||
// Additional field mappings. | ||
Fields map[string]string `mapstructure:"fields"` | ||
|
||
// File to read additional fields mappings from. | ||
File string `mapstructure:"file"` | ||
|
||
// Try to find and remove duplicate fields | ||
Dedup bool `mapstructure:"dedup"` | ||
|
||
Dedot bool `mapstructure:"dedot"` | ||
} | ||
|
||
type MappingMode int | ||
|
||
const ( | ||
MappingNone MappingMode = iota | ||
MappingECS | ||
) | ||
|
||
func (m MappingMode) String() string { | ||
switch m { | ||
case MappingNone: | ||
return "" | ||
case MappingECS: | ||
return "ecs" | ||
default: | ||
return "" | ||
} | ||
} | ||
|
||
var mappingModes = func() map[string]MappingMode { | ||
table := map[string]MappingMode{} | ||
for _, m := range []MappingMode{ | ||
MappingNone, | ||
MappingECS, | ||
} { | ||
table[strings.ToLower(m.String())] = m | ||
} | ||
|
||
// config aliases | ||
table["no"] = MappingNone | ||
table["none"] = MappingNone | ||
|
||
return table | ||
}() | ||
|
||
// Validate validates the elasticsearch server configuration. | ||
func (cfg *Config) Validate() error { | ||
if len(cfg.URLs) == 0 && cfg.CloudID == "" { | ||
return errors.New("Elasticsearch URL or CloudID must be specified") | ||
} | ||
|
||
for _, url := range cfg.URLs { | ||
if url == "" { | ||
return errors.New("Elasticsearch URL must not be empty") | ||
} | ||
} | ||
|
||
if cfg.Index == "" { | ||
return errors.New("Elasticsearch Index must be specified") | ||
} | ||
|
||
if _, ok := mappingModes[cfg.Mapping.Mode]; !ok { | ||
return fmt.Errorf("unknown mapping mode %v", cfg.Mapping.Mode) | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
// Copyright 2020, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package elasticsearchexporter | ||
|
||
import ( | ||
"path" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/config/configmodels" | ||
"go.opentelemetry.io/collector/config/configtest" | ||
) | ||
|
||
func TestLoadConfig(t *testing.T) { | ||
factories, err := componenttest.ExampleComponents() | ||
require.NoError(t, err) | ||
|
||
factory := NewFactory() | ||
factories.Exporters[configmodels.Type(typeStr)] = factory | ||
cfg, err := configtest.LoadConfigFile( | ||
t, path.Join(".", "testdata", "config.yaml"), factories, | ||
) | ||
require.NoError(t, err) | ||
require.NotNil(t, cfg) | ||
|
||
assert.Equal(t, len(cfg.Exporters), 2) | ||
|
||
r0 := cfg.Exporters["elasticsearch"] | ||
assert.Equal(t, r0, factory.CreateDefaultConfig()) | ||
|
||
r1 := cfg.Exporters["elasticsearch/customname"].(*Config) | ||
assert.Equal(t, r1, &Config{ | ||
ExporterSettings: configmodels.ExporterSettings{TypeVal: configmodels.Type(typeStr), NameVal: "elasticsearch/customname"}, | ||
URLs: []string{"https://elastic.example.com:9200"}, | ||
CloudID: "TRNMxjXlNJEt", | ||
Index: "myindex", | ||
Pipeline: "mypipeline", | ||
HTTPClientSettings: HTTPClientSettings{ | ||
Authentication: AuthenticationSettings{ | ||
User: "elastic", | ||
Password: "search", | ||
APIKey: "AvFsEiPs==", | ||
}, | ||
Timeout: 2 * time.Minute, | ||
Headers: map[string]string{ | ||
"myheader": "test", | ||
}, | ||
}, | ||
Discovery: DiscoverySettings{ | ||
OnStart: true, | ||
}, | ||
Flush: FlushSettings{ | ||
Bytes: 10485760, | ||
}, | ||
Retry: RetrySettings{ | ||
Enabled: true, | ||
Max: 5, | ||
InitialInterval: 100 * time.Millisecond, | ||
MaxInterval: 1 * time.Minute, | ||
}, | ||
Mapping: MappingsSettings{ | ||
Mode: "ecs", | ||
Dedup: true, | ||
Dedot: true, | ||
}, | ||
}) | ||
} |
Oops, something went wrong.