Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/outputs/all/arc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || outputs || outputs.arc

package all

import _ "github.com/influxdata/telegraf/plugins/outputs/arc" // register plugin
61 changes: 61 additions & 0 deletions plugins/outputs/arc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Arc Output Plugin

This plugin writes metrics to [Arc][arc], a high-performance time-series
database, via MessagePack binary protocol messages providing a **3-5x better
performance** than the line-protocol format.

⭐ Telegraf v1.37.0
🏷️ datastore
💻 all

[arc]: https://github.com/basekick-labs/arc

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Configuration

```toml @sample.conf
# Arc Time-Series Database Output Plugin
[[outputs.arc]]
## Arc MessagePack API URL
url = "http://localhost:8000/api/v1/write/msgpack"

## API Key for authentication
# api_key = ""

## Database name for multi-database architecture
## Defaults to the server configured DB if not specified or empty
# database = ""

## Content encoding for request body
## Options: "gzip" (default), "identity"
# content_encoding = "gzip"

## Timeout for HTTP writes
# timeout = "5s"

## Additional HTTP headers
# [outputs.arc.headers]
# X-Custom-Header = "custom-value"
```

## Metrics

The Arc output plugin does not produce any metrics.

## Troubleshooting

For authentication issues, ensure you have generated a valid API key with write
permissions. See the [Arc documentation](https://docs.basekick.net/arc) for
details on authentication and configuration.

For connection or performance issues, check that Arc is running and accessible,
and review the Telegraf debug logs with
`telegraf --config telegraf.conf --debug`.
206 changes: 206 additions & 0 deletions plugins/outputs/arc/arc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
//go:generate ../../../tools/readme_config_includer/generator
package arc

import (
"bytes"
"compress/gzip"
"context"
_ "embed"
"fmt"
"io"
"net/http"
"time"

"github.com/tinylib/msgp/msgp"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/outputs"
)

//go:embed sample.conf
var sampleConfig string

type Arc struct {
URL string `toml:"url"`
APIKey config.Secret `toml:"api_key"`
Database string `toml:"database"`
Headers map[string]string `toml:"headers"`
ContentEncoding string `toml:"content_encoding"`
httpconfig.HTTPClientConfig
Log telegraf.Logger `toml:"-"`

client *http.Client
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not using the existing http configuration instead and get all the beauty of supporting proxies etc?

}

type arcColumnarData struct {
Measurement string `msgpack:"m"`
Columns map[string]interface{} `msgpack:"columns"`
}

func (*Arc) SampleConfig() string {
return sampleConfig
}

func (a *Arc) Init() error {
if a.URL == "" {
a.URL = "http://localhost:8000/api/v1/write/msgpack"
}

if a.ContentEncoding == "" {
a.ContentEncoding = "gzip"
}

if a.Timeout == 0 {
a.Timeout = config.Duration(5 * time.Second)
}

return nil
}

func (a *Arc) Connect() error {
ctx := context.Background()
client, err := a.HTTPClientConfig.CreateClient(ctx, a.Log)
if err != nil {
return fmt.Errorf("failed to create HTTP client: %w", err)
}
a.client = client

return nil
}

func (a *Arc) Close() error {
if a.client != nil {
a.client.CloseIdleConnections()
}
return nil
}

func (a *Arc) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}

groups := make(map[string]*group)
for _, m := range metrics {
name := m.Name()
if _, found := groups[name]; !found {
numCols := len(m.FieldList()) + len(m.TagList()) + 1
groups[name] = &group{name: name, columns: make(map[string][]interface{}, numCols)}
}
groups[name].add(m)
}

messages := make([]*arcColumnarData, 0, len(groups))
for _, g := range groups {
msg, err := g.produceMessage()
if err != nil {
a.Log.Error(err)
continue
}
messages = append(messages, msg)
}

var data interface{}
switch len(messages) {
case 0:
return nil
case 1:
data = map[string]interface{}{
"m": messages[0].Measurement,
"columns": messages[0].Columns,
}
default:
dataArray := make([]interface{}, len(messages))
for i, msg := range messages {
dataArray[i] = map[string]interface{}{
"m": msg.Measurement,
"columns": msg.Columns,
}
}
data = dataArray
}

var buf bytes.Buffer
writer := msgp.NewWriter(&buf)
err := writer.WriteIntf(data)
if err != nil {
return fmt.Errorf("marshalling message failed: %w", err)
}
err = writer.Flush()
if err != nil {
return fmt.Errorf("failed to flush MessagePack writer: %w", err)
}

payload := buf.Bytes()

if a.ContentEncoding == "gzip" {
var buf bytes.Buffer
gzipWriter := gzip.NewWriter(&buf)
if _, err := gzipWriter.Write(payload); err != nil {
return fmt.Errorf("failed to gzip payload: %w", err)
}
if err := gzipWriter.Close(); err != nil {
return fmt.Errorf("failed to close gzip writer: %w", err)
}
payload = buf.Bytes()
}

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.Timeout))
defer cancel()

req, err := http.NewRequestWithContext(ctx, "POST", a.URL, bytes.NewReader(payload))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

req.Header.Set("Content-Type", "application/msgpack")
req.Header.Set("User-Agent", internal.ProductToken())

if !a.APIKey.Empty() {
apiKey, err := a.APIKey.Get()
if err != nil {
return fmt.Errorf("failed to get API key: %w", err)
}
req.Header.Set("x-api-key", apiKey.String())
apiKey.Destroy()
}

if a.Database != "" {
req.Header.Set("x-arc-database", a.Database)
}

if a.ContentEncoding == "gzip" {
req.Header.Set("Content-Encoding", "gzip")
}

for k, v := range a.Headers {
req.Header.Set(k, v)
}

resp, err := a.client.Do(req)
if err != nil {
return fmt.Errorf("failed to write to Arc: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("arc returned status %d (failed to read response body: %w)", resp.StatusCode, err)
}
return fmt.Errorf("arc returned status %d: %s", resp.StatusCode, string(body))
}

return nil
}

func init() {
outputs.Add("arc", func() telegraf.Output {
return &Arc{
ContentEncoding: "gzip",
}
})
}
Loading
Loading