Skip to content

Commit

Permalink
feat(outputs.quix): Add plugin (influxdata#16144)
Browse files Browse the repository at this point in the history
Co-authored-by: stereosky <tun@stereosky.com>
Co-authored-by: Sven Rebhan <srebhan@influxdata.com>
  • Loading branch information
3 people authored Dec 6, 2024
1 parent c0db964 commit a9c91f1
Show file tree
Hide file tree
Showing 6 changed files with 507 additions and 0 deletions.
5 changes: 5 additions & 0 deletions plugins/outputs/all/quix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || outputs || outputs.quix

package all

import _ "github.com/influxdata/telegraf/plugins/outputs/quix" // register plugin
58 changes: 58 additions & 0 deletions plugins/outputs/quix/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Quix Output Plugin

This plugin writes metrics to a [Quix][quix] endpoint.

Please consult Quix's [official documentation][docs] for more details on the
Quix platform architecture and concepts.

⭐ Telegraf v1.33.0
🏷️ cloud, messaging
💻 all

[quix]: https://quix.io
[docs]: https://quix.io/docs/

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Secret-store support

This plugin supports secrets from secret-stores for the `token` option.
See the [secret-store documentation][SECRETSTORE] for more details on how
to use them.

[SECRETSTORE]: ../../../docs/CONFIGURATION.md#secret-store-secrets

## Configuration

```toml @sample.conf
# Send metrics to a Quix data processing pipeline
[[outputs.quix]]
## Endpoint for providing the configuration
# url = "https://portal-api.platform.quix.io"

## Workspace and topics to send the metrics to
workspace = "your_workspace"
topic = "your_topic"

## Authentication token created in Quix
token = "your_auth_token"

## Amount of time allowed to complete the HTTP request for fetching the config
# timeout = "5s"
```

The plugin requires a [SDK token][token] for authentication with Quix. You can
generate the `token` in settings under the `API and tokens` section.

Furthermore, the `workspace` parameter must be set to the `Workspace ID` or the
`Environment ID` of your Quix project. Those values can be found in settings
under the `General settings` section.

[token]: https://quix.io/docs/develop/authentication/personal-access-token.html
81 changes: 81 additions & 0 deletions plugins/outputs/quix/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package quix

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
)

type brokerConfig struct {
BootstrapServers string `json:"bootstrap.servers"`
SaslMechanism string `json:"sasl.mechanism"`
SaslUsername string `json:"sasl.username"`
SaslPassword string `json:"sasl.password"`
SecurityProtocol string `json:"security.protocol"`
SSLCertBase64 string `json:"ssl.ca.cert"`

cert []byte
}

func (q *Quix) fetchBrokerConfig() (*brokerConfig, error) {
// Create request
endpoint := fmt.Sprintf("%s/workspaces/%s/broker/librdkafka", q.APIURL, q.Workspace)
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return nil, fmt.Errorf("creating request failed: %w", err)
}

// Setup authentication
token, err := q.Token.Get()
if err != nil {
return nil, fmt.Errorf("getting token failed: %w", err)
}
req.Header.Set("Authorization", "Bearer "+token.String())
req.Header.Set("Accept", "application/json")
token.Destroy()

// Query the broker configuration from the Quix API
client, err := q.HTTPClientConfig.CreateClient(context.Background(), q.Log)
if err != nil {
return nil, fmt.Errorf("creating client failed: %w", err)
}
defer client.CloseIdleConnections()

resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("executing request failed: %w", err)
}
defer resp.Body.Close()

// Read the body as we need it both in case of an error as well as for
// decoding the config in case of success
body, err := io.ReadAll(resp.Body)
if err != nil {
q.Log.Errorf("Reading message body failed: %v", err)
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected response %q (%d): %s",
http.StatusText(resp.StatusCode),
resp.StatusCode,
string(body),
)
}

// Decode the broker and the returned certificate
var cfg brokerConfig
if err := json.Unmarshal(body, &cfg); err != nil {
return nil, fmt.Errorf("decoding body failed: %w", err)
}

cert, err := base64.StdEncoding.DecodeString(cfg.SSLCertBase64)
if err != nil {
return nil, fmt.Errorf("decoding certificate failed: %w", err)
}
cfg.cert = cert

return &cfg, nil
}
169 changes: 169 additions & 0 deletions plugins/outputs/quix/quix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
//go:generate ../../../tools/readme_config_includer/generator
package quix

import (
"crypto/tls"
"crypto/x509"
_ "embed"
"errors"
"fmt"
"strings"
"time"

"github.com/IBM/sarama"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
common_http "github.com/influxdata/telegraf/plugins/common/http"
common_kafka "github.com/influxdata/telegraf/plugins/common/kafka"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/json"
)

//go:embed sample.conf
var sampleConfig string

type Quix struct {
APIURL string `toml:"url"`
Workspace string `toml:"workspace"`
Topic string `toml:"topic"`
Token config.Secret `toml:"token"`
Log telegraf.Logger `toml:"-"`
common_http.HTTPClientConfig

producer sarama.SyncProducer
serializer serializers.Serializer
kakfaTopic string
}

func (*Quix) SampleConfig() string {
return sampleConfig
}

func (q *Quix) Init() error {
// Set defaults
if q.APIURL == "" {
q.APIURL = "https://portal-api.platform.quix.io"
}
q.APIURL = strings.TrimSuffix(q.APIURL, "/")

// Check input parameters
if q.Topic == "" {
return errors.New("option 'topic' must be set")
}
if q.Workspace == "" {
return errors.New("option 'workspace' must be set")
}
if q.Token.Empty() {
return errors.New("option 'token' must be set")
}
q.kakfaTopic = q.Workspace + "-" + q.Topic

// Create a JSON serializer for the output
q.serializer = &json.Serializer{
TimestampUnits: config.Duration(time.Nanosecond), // Hardcoded nanoseconds precision
}

return nil
}

func (q *Quix) Connect() error {
// Fetch the Kafka broker configuration from the Quix HTTP endpoint
quixConfig, err := q.fetchBrokerConfig()
if err != nil {
return fmt.Errorf("fetching broker config failed: %w", err)
}
brokers := strings.Split(quixConfig.BootstrapServers, ",")
if len(brokers) == 0 {
return errors.New("no brokers received")
}

// Setup the Kakfa producer config
cfg := sarama.NewConfig()
cfg.Producer.Return.Successes = true

switch quixConfig.SecurityProtocol {
case "SASL_SSL":
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = quixConfig.SaslUsername
cfg.Net.SASL.Password = quixConfig.SaslPassword
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA256}
}

switch quixConfig.SaslMechanism {
case "SCRAM-SHA-512":
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA512}
}
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
case "SCRAM-SHA-256":
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &common_kafka.XDGSCRAMClient{HashGeneratorFcn: common_kafka.SHA256}
}
case "PLAIN":
cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
default:
return fmt.Errorf("unsupported SASL mechanism: %s", quixConfig.SaslMechanism)
}

// Certificate
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(quixConfig.cert) {
return errors.New("appending CA cert to pool failed")
}
cfg.Net.TLS.Enable = true
cfg.Net.TLS.Config = &tls.Config{RootCAs: certPool}
case "PLAINTEXT":
// No additional configuration required for plaintext communication
default:
return fmt.Errorf("unsupported security protocol: %s", quixConfig.SecurityProtocol)
}

// Setup the Kakfa producer itself
producer, err := sarama.NewSyncProducer(brokers, cfg)
if err != nil {
return fmt.Errorf("creating producer failed: %w", err)
}
q.producer = producer

return nil
}

func (q *Quix) Write(metrics []telegraf.Metric) error {
for _, m := range metrics {
serialized, err := q.serializer.Serialize(m)
if err != nil {
q.Log.Errorf("Error serializing metric: %v", err)
continue
}

msg := &sarama.ProducerMessage{
Topic: q.kakfaTopic,
Value: sarama.ByteEncoder(serialized),
Timestamp: m.Time(),
Key: sarama.StringEncoder("telegraf"),
}

if _, _, err = q.producer.SendMessage(msg); err != nil {
q.Log.Errorf("Error sending message to Kafka: %v", err)
continue
}
}

return nil
}

func (q *Quix) Close() error {
if q.producer != nil {
return q.producer.Close()
}
return nil
}

func init() {
outputs.Add("quix", func() telegraf.Output { return &Quix{} })
}
Loading

0 comments on commit a9c91f1

Please sign in to comment.