Skip to content

Commit

Permalink
Add kafkametricsreceiver initial structure (#2550)
Browse files Browse the repository at this point in the history
Adds the factory, config and receiver for new (kafkametricsreceiver) component.

The new kafkametricsreceiver will primarily report metrics on consumer_group lag from kafka (using the internal __consumer_offsets topic). It will also include metrics on number of brokers, topics, and offsets of topics.

Testing: The current code is a scaffolding that cannot be e2e tested, appropriate unit tests were added.

Documentation: README explaining usage of new receiver component is included.
  • Loading branch information
dshomoye committed Mar 8, 2021
1 parent 0d64efb commit 107e5f3
Show file tree
Hide file tree
Showing 11 changed files with 1,897 additions and 0 deletions.
1 change: 1 addition & 0 deletions receiver/kafkametricsreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
60 changes: 60 additions & 0 deletions receiver/kafkametricsreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Kafka Metrics Receiver

Kafka metrics receiver collects kafka metrics (brokers, topics, partitions, consumer groups) from kafka server,
converting into otlp.

## Getting Started

Required settings (no defaults):

- `protocol_version`: Kafka protocol version
- `scrapers`: any combination of the following scrapers can be enabled.
- `topics`
- `consumers`
- `brokers`

Optional Settings (with defaults):

- `brokers` (default = localhost:9092): the list of brokers to read from.
- `topic_match` (default = *): regex pattern of topics to filter for metrics collection.
- `group_match` (default = *): regex pattern of consumer groups to filter on for metrics.
- `client_id` (default = otel-metrics-receiver): consumer client id
- `collection_interval` (default = 1m): frequency of metric collection/scraping.
- `auth` (default none)
- `plain_text`
- `username`: The username to use.
- `password`: The password to use
- `tls`
- `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should only be used
if `insecure` is set to true.
- `cert_file`: path to the TLS cert to use for TLS required connections. Should only be used if `insecure` is
set to true.
- `key_file`: path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set
to true.
- `insecure` (default = false): Disable verifying the server's certificate chain and host
name (`InsecureSkipVerify` in the tls config)
- `server_name_override`: ServerName indicates the name of the server requested by the client in order to
support virtual hosting.
- `kerberos`
- `service_name`: Kerberos service name
- `realm`: Kerberos realm
- `use_keytab`: Use of keytab instead of password, if this is true, keytab file will be used instead of
password
- `username`: The Kerberos username used for authenticate with KDC
- `password`: The Kerberos password used for authenticate with KDC
- `config_file`: Path to Kerberos configuration. i.e /etc/krb5.conf
- `keytab_file`: Path to keytab file. i.e /etc/security/kafka.keytab

## Examples:

Basic configuration with all scrapers:

```yaml
receivers:
kafkametrics:
protocol_version: 2.0.0
scrapers:
- brokers
- topics
- consumers
```
46 changes: 46 additions & 0 deletions receiver/kafkametricsreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkametricsreceiver

import (
"go.opentelemetry.io/collector/exporter/kafkaexporter"
"go.opentelemetry.io/collector/receiver/scraperhelper"
)

// Config represents user settings for kafkametrics receiver
type Config struct {
scraperhelper.ScraperControllerSettings `mapstructure:",squash"`

// The list of kafka brokers (default localhost:9092)
Brokers []string `mapstructure:"brokers"`

// ProtocolVersion Kafka protocol version
ProtocolVersion string `mapstructure:"protocol_version"`

// TopicMatch topics to collect metrics on
TopicMatch string `mapstructure:"topic_match"`

// GroupMatch consumer groups to collect on
GroupMatch string `mapstructure:"group_match"`

// Authentication data
Authentication kafkaexporter.Authentication `mapstructure:"auth"`

// Scrapers defines which metric data points to be captured from kafka
Scrapers []string `mapstructure:"scrapers"`

// ClientID is the id associated with the consumer that reads from topics in kafka.
ClientID string `mapstructure:"client_id"`
}
59 changes: 59 additions & 0 deletions receiver/kafkametricsreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkametricsreceiver

import (
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/kafkaexporter"
"go.opentelemetry.io/collector/receiver/scraperhelper"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.Equal(t, 1, len(cfg.Receivers))

r := cfg.Receivers[typeStr].(*Config)
assert.Equal(t, &Config{
ScraperControllerSettings: scraperhelper.DefaultScraperControllerSettings(typeStr),
Brokers: []string{"10.10.10.10:9092"},
ProtocolVersion: "2.0.0",
TopicMatch: "test_*",
GroupMatch: "test_*",
Authentication: kafkaexporter.Authentication{
TLS: &configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "ca.pem",
CertFile: "cert.pem",
KeyFile: "key.pem",
},
},
},
ClientID: defaultClientID,
Scrapers: []string{"brokers", "topics", "consumers"},
}, r)
}
64 changes: 64 additions & 0 deletions receiver/kafkametricsreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkametricsreceiver

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/collector/receiver/scraperhelper"
)

const (
typeStr = "kafkametrics"
defaultBroker = "localhost:9092"
defaultGroupMatch = ".*"
defaultTopicMatch = ".*"
defaultClientID = "otel-metrics-receiver"
)

// NewFactory creates kafkametrics receiver factory.
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithMetrics(createMetricsReceiver))
}

func createDefaultConfig() configmodels.Receiver {
return &Config{
ScraperControllerSettings: scraperhelper.DefaultScraperControllerSettings(typeStr),
Brokers: []string{defaultBroker},
GroupMatch: defaultGroupMatch,
TopicMatch: defaultTopicMatch,
ClientID: defaultClientID,
}
}

func createMetricsReceiver(
ctx context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.MetricsConsumer) (component.MetricsReceiver, error) {
c := cfg.(*Config)
r, err := newMetricsReceiver(ctx, *c, params, nextConsumer)
if err != nil {
return nil, err
}
return r, nil
}
58 changes: 58 additions & 0 deletions receiver/kafkametricsreceiver/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkametricsreceiver

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/consumer"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "default config not created")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateMetricsReceiver_errors(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
cfg.Scrapers = []string{"topics"}
r, err := createMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
assert.Error(t, err)
assert.Nil(t, r)
}

func TestCreateMetricsReceiver(t *testing.T) {
prev := newMetricsReceiver
newMetricsReceiver = func(ctx context.Context, config Config, params component.ReceiverCreateParams, consumer consumer.MetricsConsumer) (component.MetricsReceiver, error) {
return nil, nil
}
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
cfg.Scrapers = []string{"topics"}
_, err := createMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
newMetricsReceiver = prev
assert.Nil(t, err)
}
8 changes: 8 additions & 0 deletions receiver/kafkametricsreceiver/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver

go 1.14

require (
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.21.0
)
Loading

0 comments on commit 107e5f3

Please sign in to comment.