Skip to content

Commit d01e386

Browse files
grpc support to write logs to Loki
1 parent 7e3fe86 commit d01e386

File tree

12 files changed

+1197
-106
lines changed

12 files changed

+1197
-106
lines changed

docs/api.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,17 @@ Following is the supported API format for writing to loki:
324324
timestampScale: timestamp units scale (e.g. for UNIX = 1s)
325325
format: the format of each line: printf (writes using golang's default map printing), fields (writes one key and value field per line) or json (default)
326326
reorder: reorder json map keys
327+
clientProtocol: type of client protocol to use: 'http' or 'grpc' (default: 'http')
328+
grpcConfig: gRPC client configuration (used only for gRPC client type)
329+
keepAlive: keep alive interval
330+
keepAliveTimeout: keep alive timeout
331+
tls: TLS configuration
332+
enabled: enable TLS
333+
certFile: path to client certificate file
334+
keyFile: path to client key file
335+
caFile: path to CA certificate file
336+
serverName: server name for certificate verification
337+
insecureSkipVerify: skip certificate verification (insecure)
327338
</pre>
328339
## Write Standard Output
329340
Following is the supported API format for writing to standard output:

go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ require (
1717
github.com/minio/minio-go/v7 v7.0.95
1818
github.com/mitchellh/mapstructure v1.5.0
1919
github.com/netobserv/gopipes v0.3.0
20-
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847
20+
github.com/netobserv/loki-client-go v0.0.0-20250929121122-f26971f6d948
2121
github.com/netobserv/netobserv-ebpf-agent v1.9.2-community
2222
github.com/netsampler/goflow2 v1.3.7
2323
github.com/pkg/errors v0.9.1
@@ -174,3 +174,5 @@ require (
174174
)
175175

176176
replace github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101
177+
178+
replace github.com/netobserv/loki-client-go => github.com/leandroberetta/loki-client-go v0.0.0-20251009134823-acd942c4f7a5

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
206206
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
207207
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
208208
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
209+
github.com/leandroberetta/loki-client-go v0.0.0-20251009134823-acd942c4f7a5 h1:SxnvZWFhBAsSDWp40Pyh9EhUJ2kd+Uq8ZRRUun/LdOE=
210+
github.com/leandroberetta/loki-client-go v0.0.0-20251009134823-acd942c4f7a5/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8=
209211
github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s=
210212
github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU=
211213
github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4=
@@ -255,8 +257,6 @@ github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J
255257
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
256258
github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+60=
257259
github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU=
258-
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847 h1:hjzhVZSSKIOmAzHbGUV4JhVIPkgKs/UtrWDx6JSVKMw=
259-
github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847/go.mod h1:Zb/jtD3Lnu88Poo+jnhTASzxYnvncmHOoZaT93xQjJ8=
260260
github.com/netobserv/netobserv-ebpf-agent v1.9.2-community h1:ghW16OO4QRWj0Uh1gMYX+NjAlgx2sZmCsO3Tkwoj4Do=
261261
github.com/netobserv/netobserv-ebpf-agent v1.9.2-community/go.mod h1:17OaUNAwx0LxoeV/SaHlJIJP6bpN7zSvUP3GtZelESQ=
262262
github.com/netsampler/goflow2 v1.3.7 h1:XZaTy8kkMnGXpJ9hS3KbO1McyrFTpVNhVFEx9rNhMmc=

pkg/api/write_loki.go

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package api
2020
import (
2121
"errors"
2222
"fmt"
23+
"time"
2324

2425
promConfig "github.com/prometheus/common/config"
2526
"github.com/prometheus/common/model"
@@ -46,6 +47,15 @@ type WriteLoki struct {
4647
TimestampScale string `yaml:"timestampScale,omitempty" json:"timestampScale,omitempty" doc:"timestamp units scale (e.g. for UNIX = 1s)"`
4748
Format string `yaml:"format,omitempty" json:"format,omitempty" doc:"the format of each line: printf (writes using golang's default map printing), fields (writes one key and value field per line) or json (default)"`
4849
Reorder bool `yaml:"reorder,omitempty" json:"reorder,omitempty" doc:"reorder json map keys"`
50+
51+
// Client protocol selection
52+
ClientProtocol string `yaml:"clientProtocol,omitempty" json:"clientProtocol,omitempty" doc:"type of client protocol to use: 'http' or 'grpc' (default: 'http')"`
53+
GRPCConfig *GRPCLokiConfig `yaml:"grpcConfig,omitempty" json:"grpcConfig,omitempty" doc:"gRPC client configuration (used only for gRPC client type)"`
54+
}
55+
56+
type GRPCLokiConfig struct {
57+
KeepAlive string `yaml:"keepAlive,omitempty" json:"keepAlive,omitempty" doc:"keep alive interval"`
58+
KeepAliveTimeout string `yaml:"keepAliveTimeout,omitempty" json:"keepAliveTimeout,omitempty" doc:"keep alive timeout"`
4959
}
5060

5161
func (w *WriteLoki) SetDefaults() {
@@ -76,6 +86,23 @@ func (w *WriteLoki) SetDefaults() {
7686
if w.Format == "" {
7787
w.Format = "json"
7888
}
89+
if w.ClientProtocol == "" {
90+
w.ClientProtocol = "http"
91+
}
92+
93+
// Set defaults for gRPC config if gRPC client protocol is selected
94+
if w.ClientProtocol == "grpc" && w.GRPCConfig != nil {
95+
w.GRPCConfig.SetDefaults()
96+
}
97+
}
98+
99+
func (g *GRPCLokiConfig) SetDefaults() {
100+
if g.KeepAlive == "" {
101+
g.KeepAlive = "30s"
102+
}
103+
if g.KeepAliveTimeout == "" {
104+
g.KeepAliveTimeout = "5s"
105+
}
79106
}
80107

81108
func (w *WriteLoki) Validate() error {
@@ -85,11 +112,51 @@ func (w *WriteLoki) Validate() error {
85112
if w.TimestampScale == "" {
86113
return errors.New("timestampUnit must be a valid Duration > 0 (e.g. 1m, 1s or 1ms)")
87114
}
88-
if w.URL == "" {
89-
return errors.New("url can't be empty")
90-
}
91115
if w.BatchSize <= 0 {
92116
return fmt.Errorf("invalid batchSize: %v. Required > 0", w.BatchSize)
93117
}
118+
119+
// Validate client protocol
120+
if w.ClientProtocol != "" && w.ClientProtocol != "http" && w.ClientProtocol != "grpc" {
121+
return fmt.Errorf("invalid clientProtocol: %s. Must be 'http' or 'grpc'", w.ClientProtocol)
122+
}
123+
124+
// Validate based on client protocol
125+
switch w.ClientProtocol {
126+
case "http", "":
127+
if w.URL == "" {
128+
return errors.New("url can't be empty for HTTP client")
129+
}
130+
case "grpc":
131+
if w.URL == "" {
132+
return errors.New("url can't be empty for gRPC client")
133+
}
134+
if w.GRPCConfig == nil {
135+
return errors.New("grpcConfig is required when using gRPC client protocol")
136+
}
137+
if err := w.GRPCConfig.Validate(); err != nil {
138+
return fmt.Errorf("gRPC config validation failed: %w", err)
139+
}
140+
}
141+
142+
return nil
143+
}
144+
145+
func (g *GRPCLokiConfig) Validate() error {
146+
if g == nil {
147+
return errors.New("gRPC config cannot be nil")
148+
}
149+
// Validate duration fields
150+
if g.KeepAlive != "" {
151+
if _, err := time.ParseDuration(g.KeepAlive); err != nil {
152+
return fmt.Errorf("invalid keepAlive duration: %w", err)
153+
}
154+
}
155+
if g.KeepAliveTimeout != "" {
156+
if _, err := time.ParseDuration(g.KeepAliveTimeout); err != nil {
157+
return fmt.Errorf("invalid keepAliveTimeout duration: %w", err)
158+
}
159+
}
160+
94161
return nil
95162
}

pkg/pipeline/write/write_loki.go

Lines changed: 108 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
3131

3232
logAdapter "github.com/go-kit/kit/log/logrus"
33+
"github.com/netobserv/loki-client-go/grpc"
3334
"github.com/netobserv/loki-client-go/loki"
3435
"github.com/netobserv/loki-client-go/pkg/backoff"
3536
"github.com/netobserv/loki-client-go/pkg/urlutil"
@@ -49,7 +50,6 @@ type emitter interface {
4950

5051
// Loki record writer
5152
type Loki struct {
52-
lokiConfig loki.Config
5353
apiConfig api.WriteLoki
5454
timestampScale float64
5555
saneLabels map[string]model.LabelName
@@ -61,7 +61,46 @@ type Loki struct {
6161
formatter func(config.GenericMap) string
6262
}
6363

64-
func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
64+
func createLokiClient(c *api.WriteLoki) (emitter, error) {
65+
switch c.ClientProtocol {
66+
case "grpc":
67+
return createGRPCClient(c)
68+
case "http", "":
69+
return createHTTPClient(c)
70+
default:
71+
return nil, fmt.Errorf("unsupported client protocol: %s", c.ClientProtocol)
72+
}
73+
}
74+
75+
func createHTTPClient(c *api.WriteLoki) (emitter, error) {
76+
cfg, err := buildHTTPLokiConfig(c)
77+
if err != nil {
78+
return nil, err
79+
}
80+
81+
client, err := loki.NewWithLogger(cfg, logAdapter.NewLogger(log.WithField("module", "export/loki")))
82+
if err != nil {
83+
return nil, fmt.Errorf("failed to create HTTP Loki client: %w", err)
84+
}
85+
86+
return client, nil
87+
}
88+
89+
func createGRPCClient(c *api.WriteLoki) (emitter, error) {
90+
cfg, err := buildGRPCLokiConfig(c)
91+
if err != nil {
92+
return nil, err
93+
}
94+
95+
client, err := grpc.NewWithLogger(cfg, logAdapter.NewLogger(log.WithField("module", "export/loki-grpc")))
96+
if err != nil {
97+
return nil, fmt.Errorf("failed to create gRPC Loki client: %w", err)
98+
}
99+
100+
return client, nil
101+
}
102+
103+
func buildHTTPLokiConfig(c *api.WriteLoki) (loki.Config, error) {
65104
batchWait, err := time.ParseDuration(c.BatchWait)
66105
if err != nil {
67106
return loki.Config{}, fmt.Errorf("failed in parsing BatchWait : %w", err)
@@ -105,6 +144,69 @@ func buildLokiConfig(c *api.WriteLoki) (loki.Config, error) {
105144
return cfg, nil
106145
}
107146

147+
func buildGRPCLokiConfig(c *api.WriteLoki) (grpc.Config, error) {
148+
if c.GRPCConfig == nil {
149+
return grpc.Config{}, fmt.Errorf("gRPC config is required for gRPC client protocol")
150+
}
151+
152+
batchWait, err := time.ParseDuration(c.BatchWait)
153+
if err != nil {
154+
return grpc.Config{}, fmt.Errorf("failed in parsing BatchWait: %w", err)
155+
}
156+
157+
timeout, err := time.ParseDuration(c.Timeout)
158+
if err != nil {
159+
return grpc.Config{}, fmt.Errorf("failed in parsing Timeout: %w", err)
160+
}
161+
162+
minBackoff, err := time.ParseDuration(c.MinBackoff)
163+
if err != nil {
164+
return grpc.Config{}, fmt.Errorf("failed in parsing MinBackoff: %w", err)
165+
}
166+
167+
maxBackoff, err := time.ParseDuration(c.MaxBackoff)
168+
if err != nil {
169+
return grpc.Config{}, fmt.Errorf("failed in parsing MaxBackoff: %w", err)
170+
}
171+
172+
keepAlive, err := time.ParseDuration(c.GRPCConfig.KeepAlive)
173+
if err != nil {
174+
return grpc.Config{}, fmt.Errorf("failed in parsing KeepAlive: %w", err)
175+
}
176+
177+
keepAliveTimeout, err := time.ParseDuration(c.GRPCConfig.KeepAliveTimeout)
178+
if err != nil {
179+
return grpc.Config{}, fmt.Errorf("failed in parsing KeepAliveTimeout: %w", err)
180+
}
181+
182+
cfg := grpc.Config{
183+
ServerAddress: c.URL,
184+
TenantID: c.TenantID,
185+
BatchWait: batchWait,
186+
BatchSize: c.BatchSize,
187+
Timeout: timeout,
188+
KeepAlive: keepAlive,
189+
KeepAliveTimeout: keepAliveTimeout,
190+
BackoffConfig: backoff.BackoffConfig{
191+
MinBackoff: minBackoff,
192+
MaxBackoff: maxBackoff,
193+
MaxRetries: c.MaxRetries,
194+
},
195+
}
196+
197+
// Set external labels from static labels
198+
if len(c.StaticLabels) > 0 {
199+
cfg.ExternalLabels.LabelSet = c.StaticLabels
200+
}
201+
202+
// Configure TLS from shared ClientConfig (same as HTTP client)
203+
if c.ClientConfig != nil {
204+
cfg.TLS = c.ClientConfig.TLSConfig
205+
}
206+
207+
return cfg, nil
208+
}
209+
108210
func (l *Loki) ProcessRecord(in config.GenericMap) error {
109211
labels, lines := l.splitLabelsLines(in)
110212

@@ -219,13 +321,10 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
219321
return nil, fmt.Errorf("the provided config is not valid: %w", err)
220322
}
221323

222-
lokiConfig, buildconfigErr := buildLokiConfig(&lokiConfigIn)
223-
if buildconfigErr != nil {
224-
return nil, buildconfigErr
225-
}
226-
client, newWithLoggerErr := loki.NewWithLogger(lokiConfig, logAdapter.NewLogger(log.WithField("module", "export/loki")))
227-
if newWithLoggerErr != nil {
228-
return nil, newWithLoggerErr
324+
// Create the appropriate client based on clientProtocol
325+
client, err := createLokiClient(&lokiConfigIn)
326+
if err != nil {
327+
return nil, fmt.Errorf("failed to create Loki client: %w", err)
229328
}
230329

231330
timestampScale, err := time.ParseDuration(lokiConfigIn.TimestampScale)
@@ -253,7 +352,6 @@ func NewWriteLoki(opMetrics *operational.Metrics, params config.StageParam) (*Lo
253352

254353
f := formatter(lokiConfigIn.Format, lokiConfigIn.Reorder)
255354
l := &Loki{
256-
lokiConfig: lokiConfig,
257355
apiConfig: lokiConfigIn,
258356
timestampScale: float64(timestampScale),
259357
saneLabels: saneLabels,

0 commit comments

Comments
 (0)