Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,17 @@ Following is the supported API format for writing to loki:
timestampScale: timestamp units scale (e.g. for UNIX = 1s)
format: the format of each line: printf (writes using golang's default map printing), fields (writes one key and value field per line) or json (default)
reorder: reorder json map keys
clientProtocol: type of client protocol to use: 'http' or 'grpc' (default: 'http')
grpcConfig: gRPC client configuration (used only for gRPC client type)
keepAlive: keep alive interval
keepAliveTimeout: keep alive timeout
tls: TLS configuration
enabled: enable TLS
certFile: path to client certificate file
keyFile: path to client key file
caFile: path to CA certificate file
serverName: server name for certificate verification
insecureSkipVerify: skip certificate verification (insecure)
</pre>
## Write Standard Output
Following is the supported API format for writing to standard output:
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/minio/minio-go/v7 v7.0.95
github.com/mitchellh/mapstructure v1.5.0
github.com/netobserv/gopipes v0.3.0
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847
github.com/netobserv/loki-client-go v0.0.0-20250929121122-f26971f6d948
github.com/netobserv/netobserv-ebpf-agent v1.9.2-community
github.com/netsampler/goflow2 v1.3.7
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -174,3 +174,5 @@ require (
)

replace github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101

replace github.com/netobserv/loki-client-go => github.com/leandroberetta/loki-client-go v0.0.0-20251009134823-acd942c4f7a5
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/leandroberetta/loki-client-go v0.0.0-20251009134823-acd942c4f7a5 h1:SxnvZWFhBAsSDWp40Pyh9EhUJ2kd+Uq8ZRRUun/LdOE=
github.com/leandroberetta/loki-client-go v0.0.0-20251009134823-acd942c4f7a5/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8=
github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s=
github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU=
github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4=
Expand Down Expand Up @@ -255,8 +257,6 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+60=
github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU=
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847 h1:hjzhVZSSKIOmAzHbGUV4JhVIPkgKs/UtrWDx6JSVKMw=
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8=
github.com/netobserv/netobserv-ebpf-agent v1.9.2-community h1:ghW16OO4QRWj0Uh1gMYX+NjAlgx2sZmCsO3Tkwoj4Do=
github.com/netobserv/netobserv-ebpf-agent v1.9.2-community/go.mod h1:17OaUNAwx0LxoeV/SaHlJIJP6bpN7zSvUP3GtZelESQ=
github.com/netsampler/goflow2 v1.3.7 h1:XZaTy8kkMnGXpJ9hS3KbO1McyrFTpVNhVFEx9rNhMmc=
Expand Down
73 changes: 70 additions & 3 deletions pkg/api/write_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package api
import (
"errors"
"fmt"
"time"

promConfig "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
Expand All @@ -46,6 +47,15 @@ type WriteLoki struct {
TimestampScale string `yaml:"timestampScale,omitempty" json:"timestampScale,omitempty" doc:"timestamp units scale (e.g. for UNIX = 1s)"`
Format string `yaml:"format,omitempty" json:"format,omitempty" doc:"the format of each line: printf (writes using golang's default map printing), fields (writes one key and value field per line) or json (default)"`
Reorder bool `yaml:"reorder,omitempty" json:"reorder,omitempty" doc:"reorder json map keys"`

// Client protocol selection
ClientProtocol string `yaml:"clientProtocol,omitempty" json:"clientProtocol,omitempty" doc:"type of client protocol to use: 'http' or 'grpc' (default: 'http')"`
GRPCConfig *GRPCLokiConfig `yaml:"grpcConfig,omitempty" json:"grpcConfig,omitempty" doc:"gRPC client configuration (used only for gRPC client type)"`
}

type GRPCLokiConfig struct {
KeepAlive string `yaml:"keepAlive,omitempty" json:"keepAlive,omitempty" doc:"keep alive interval"`
KeepAliveTimeout string `yaml:"keepAliveTimeout,omitempty" json:"keepAliveTimeout,omitempty" doc:"keep alive timeout"`
}

func (w *WriteLoki) SetDefaults() {
Expand Down Expand Up @@ -76,6 +86,23 @@ func (w *WriteLoki) SetDefaults() {
if w.Format == "" {
w.Format = "json"
}
if w.ClientProtocol == "" {
w.ClientProtocol = "http"
}

// Set defaults for gRPC config if gRPC client protocol is selected
if w.ClientProtocol == "grpc" && w.GRPCConfig != nil {
w.GRPCConfig.SetDefaults()
}
}

func (g *GRPCLokiConfig) SetDefaults() {
if g.KeepAlive == "" {
g.KeepAlive = "30s"
}
if g.KeepAliveTimeout == "" {
g.KeepAliveTimeout = "5s"
}
}

func (w *WriteLoki) Validate() error {
Expand All @@ -85,11 +112,51 @@ func (w *WriteLoki) Validate() error {
if w.TimestampScale == "" {
return errors.New("timestampUnit must be a valid Duration > 0 (e.g. 1m, 1s or 1ms)")
}
if w.URL == "" {
return errors.New("url can't be empty")
}
if w.BatchSize <= 0 {
return fmt.Errorf("invalid batchSize: %v. Required > 0", w.BatchSize)
}

// Validate client protocol
if w.ClientProtocol != "" && w.ClientProtocol != "http" && w.ClientProtocol != "grpc" {
return fmt.Errorf("invalid clientProtocol: %s. Must be 'http' or 'grpc'", w.ClientProtocol)
}

// Validate based on client protocol
switch w.ClientProtocol {
case "http", "":
if w.URL == "" {
return errors.New("url can't be empty for HTTP client")
}
case "grpc":
if w.URL == "" {
return errors.New("url can't be empty for gRPC client")
}
if w.GRPCConfig == nil {
return errors.New("grpcConfig is required when using gRPC client protocol")
}
if err := w.GRPCConfig.Validate(); err != nil {
return fmt.Errorf("gRPC config validation failed: %w", err)
}
}

return nil
}

func (g *GRPCLokiConfig) Validate() error {
if g == nil {
return errors.New("gRPC config cannot be nil")
}
// Validate duration fields
if g.KeepAlive != "" {
if _, err := time.ParseDuration(g.KeepAlive); err != nil {
return fmt.Errorf("invalid keepAlive duration: %w", err)
}
}
if g.KeepAliveTimeout != "" {
if _, err := time.ParseDuration(g.KeepAliveTimeout); err != nil {
return fmt.Errorf("invalid keepAliveTimeout duration: %w", err)
}
}

return nil
}
118 changes: 108 additions & 10 deletions pkg/pipeline/write/write_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/utils"

logAdapter "github.com/go-kit/kit/log/logrus"
"github.com/netobserv/loki-client-go/grpc"
"github.com/netobserv/loki-client-go/loki"
"github.com/netobserv/loki-client-go/pkg/backoff"
"github.com/netobserv/loki-client-go/pkg/urlutil"
Expand All @@ -49,7 +50,6 @@ type emitter interface {

// Loki record writer
type Loki struct {
lokiConfig loki.Config
apiConfig api.WriteLoki
timestampScale float64
saneLabels map[string]model.LabelName
Expand All @@ -61,7 +61,46 @@ type Loki struct {
formatter func(config.GenericMap) string
}

func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
func createLokiClient(c *api.WriteLoki) (emitter, error) {
switch c.ClientProtocol {
case "grpc":
return createGRPCClient(c)
case "http", "":
return createHTTPClient(c)
default:
return nil, fmt.Errorf("unsupported client protocol: %s", c.ClientProtocol)
}
}

func createHTTPClient(c *api.WriteLoki) (emitter, error) {
cfg, err := buildHTTPLokiConfig(c)
if err != nil {
return nil, err
}

client, err := loki.NewWithLogger(cfg, logAdapter.NewLogger(log.WithField("module", "export/loki")))
if err != nil {
return nil, fmt.Errorf("failed to create HTTP Loki client: %w", err)
}

return client, nil
}

func createGRPCClient(c *api.WriteLoki) (emitter, error) {
cfg, err := buildGRPCLokiConfig(c)
if err != nil {
return nil, err
}

client, err := grpc.NewWithLogger(cfg, logAdapter.NewLogger(log.WithField("module", "export/loki-grpc")))
if err != nil {
return nil, fmt.Errorf("failed to create gRPC Loki client: %w", err)
}

return client, nil
}

func buildHTTPLokiConfig(c *api.WriteLoki) (loki.Config, error) {
batchWait, err := time.ParseDuration(c.BatchWait)
if err != nil {
return loki.Config{}, fmt.Errorf("failed in parsing BatchWait : %w", err)
Expand Down Expand Up @@ -105,6 +144,69 @@ func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
return cfg, nil
}

func buildGRPCLokiConfig(c *api.WriteLoki) (grpc.Config, error) {
if c.GRPCConfig == nil {
return grpc.Config{}, fmt.Errorf("gRPC config is required for gRPC client protocol")
}

batchWait, err := time.ParseDuration(c.BatchWait)
if err != nil {
return grpc.Config{}, fmt.Errorf("failed in parsing BatchWait: %w", err)
}

timeout, err := time.ParseDuration(c.Timeout)
if err != nil {
return grpc.Config{}, fmt.Errorf("failed in parsing Timeout: %w", err)
}

minBackoff, err := time.ParseDuration(c.MinBackoff)
if err != nil {
return grpc.Config{}, fmt.Errorf("failed in parsing MinBackoff: %w", err)
}

maxBackoff, err := time.ParseDuration(c.MaxBackoff)
if err != nil {
return grpc.Config{}, fmt.Errorf("failed in parsing MaxBackoff: %w", err)
}

keepAlive, err := time.ParseDuration(c.GRPCConfig.KeepAlive)
if err != nil {
return grpc.Config{}, fmt.Errorf("failed in parsing KeepAlive: %w", err)
}

keepAliveTimeout, err := time.ParseDuration(c.GRPCConfig.KeepAliveTimeout)
if err != nil {
return grpc.Config{}, fmt.Errorf("failed in parsing KeepAliveTimeout: %w", err)
}

cfg := grpc.Config{
ServerAddress: c.URL,
TenantID: c.TenantID,
BatchWait: batchWait,
BatchSize: c.BatchSize,
Timeout: timeout,
KeepAlive: keepAlive,
KeepAliveTimeout: keepAliveTimeout,
BackoffConfig: backoff.BackoffConfig{
MinBackoff: minBackoff,
MaxBackoff: maxBackoff,
MaxRetries: c.MaxRetries,
},
}

// Set external labels from static labels
if len(c.StaticLabels) > 0 {
cfg.ExternalLabels.LabelSet = c.StaticLabels
}

// Configure TLS from shared ClientConfig (same as HTTP client)
if c.ClientConfig != nil {
cfg.TLS = c.ClientConfig.TLSConfig
}

return cfg, nil
}

func (l *Loki) ProcessRecord(in config.GenericMap) error {
labels, lines := l.splitLabelsLines(in)

Expand Down Expand Up @@ -219,13 +321,10 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
return nil, fmt.Errorf("the provided config is not valid: %w", err)
}

lokiConfig, buildconfigErr := buildLokiConfig(&lokiConfigIn)
if buildconfigErr != nil {
return nil, buildconfigErr
}
client, newWithLoggerErr := loki.NewWithLogger(lokiConfig, logAdapter.NewLogger(log.WithField("module", "export/loki")))
if newWithLoggerErr != nil {
return nil, newWithLoggerErr
// Create the appropriate client based on clientProtocol
client, err := createLokiClient(&lokiConfigIn)
if err != nil {
return nil, fmt.Errorf("failed to create Loki client: %w", err)
}

timestampScale, err := time.ParseDuration(lokiConfigIn.TimestampScale)
Expand Down Expand Up @@ -253,7 +352,6 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo

f := formatter(lokiConfigIn.Format, lokiConfigIn.Reorder)
l := &Loki{
lokiConfig: lokiConfig,
apiConfig: lokiConfigIn,
timestampScale: float64(timestampScale),
saneLabels: saneLabels,
Expand Down
Loading