Skip to content

Commit 261f89d

Browse files
adri1197Adrian Fernandez De La Torre
authored andcommitted
Setup OTEL initial implementation
Signed-off-by: Adrian Fernandez De La Torre <adri1197@gmail.com>
1 parent c2d0f5e commit 261f89d

File tree

7 files changed

+432
-4
lines changed

7 files changed

+432
-4
lines changed

go.mod

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ require (
4141
github.com/spf13/pflag v1.0.7
4242
github.com/stretchr/testify v1.10.0
4343
gitlab.com/gitlab-org/api/client-go v0.137.0
44+
go.opentelemetry.io/otel v1.37.0
45+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0
46+
go.opentelemetry.io/otel/sdk v1.37.0
47+
go.opentelemetry.io/otel/trace v1.37.0
4448
golang.org/x/oauth2 v0.30.0
4549
golang.org/x/text v0.27.0
4650
google.golang.org/api v0.243.0
@@ -78,6 +82,7 @@ require (
7882
github.com/blang/semver/v4 v4.0.0 // indirect
7983
github.com/bradleyfalzon/ghinstallation/v2 v2.16.0 // indirect
8084
github.com/carapace-sh/carapace-shlex v1.0.1 // indirect
85+
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
8186
github.com/cespare/xxhash/v2 v2.3.0 // indirect
8287
github.com/chai2010/gettext-go v1.0.2 // indirect
8388
github.com/cloudevents/sdk-go/v2 v2.15.2 // indirect
@@ -122,6 +127,7 @@ require (
122127
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
123128
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
124129
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
130+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect
125131
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
126132
github.com/hashicorp/go-version v1.7.0 // indirect
127133
github.com/inconshreveable/mousetrap v1.1.0 // indirect
@@ -167,9 +173,9 @@ require (
167173
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
168174
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
169175
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
170-
go.opentelemetry.io/otel v1.37.0 // indirect
176+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect
171177
go.opentelemetry.io/otel/metric v1.37.0 // indirect
172-
go.opentelemetry.io/otel/trace v1.37.0 // indirect
178+
go.opentelemetry.io/proto/otlp v1.7.0 // indirect
173179
go.uber.org/multierr v1.11.0 // indirect
174180
go.uber.org/zap v1.27.0 // indirect
175181
go.yaml.in/yaml/v2 v2.4.2 // indirect

go.sum

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ github.com/carapace-sh/carapace-shlex v1.0.1 h1:ww0JCgWpOVuqWG7k3724pJ18Lq8gh5pH
7474
github.com/carapace-sh/carapace-shlex v1.0.1/go.mod h1:lJ4ZsdxytE0wHJ8Ta9S7Qq0XpjgjU0mdfCqiI2FHx7M=
7575
github.com/cdevents/sdk-go v0.4.1 h1:Cr/iH/I51Z+slxKRx9AV7stn6hr2pjRHQ5wpPJhRLTU=
7676
github.com/cdevents/sdk-go v0.4.1/go.mod h1:3IhWLoY4vsyUEzv7XJbyr0BRQ0KPgvNx+wiD2hQGFNU=
77+
github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8=
78+
github.com/cenkalti/backoff/v5 v5.0.2/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
7779
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
7880
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
7981
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@@ -260,6 +262,8 @@ github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5T
260262
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
261263
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
262264
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
265+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 h1:X5VWvz21y3gzm9Nw/kaUeku/1+uBhcekkmy4IkffJww=
266+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1/go.mod h1:Zanoh4+gvIgluNqcfMVTJueD4wSS5hT7zTt4Mrutd90=
263267
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
264268
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
265269
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
@@ -419,14 +423,20 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6h
419423
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q=
420424
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
421425
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
426+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 h1:Ahq7pZmv87yiyn3jeFz/LekZmPLLdKejuO3NcK9MssM=
427+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0/go.mod h1:MJTqhM0im3mRLw1i8uGHnCvUEeS7VwRyxlLC78PA18M=
428+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0 h1:bDMKF3RUSxshZ5OjOTi8rsHGaPKsAt76FaqgvIUySLc=
429+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0/go.mod h1:dDT67G/IkA46Mr2l9Uj7HsQVwsjASyV9SjGofsiUZDA=
422430
go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE=
423431
go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E=
424-
go.opentelemetry.io/otel/sdk v1.36.0 h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs=
425-
go.opentelemetry.io/otel/sdk v1.36.0/go.mod h1:+lC+mTgD+MUWfjJubi2vvXWcVxyr9rmlshZni72pXeY=
432+
go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI=
433+
go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg=
426434
go.opentelemetry.io/otel/sdk/metric v1.36.0 h1:r0ntwwGosWGaa0CrSt8cuNuTcccMXERFwHX4dThiPis=
427435
go.opentelemetry.io/otel/sdk/metric v1.36.0/go.mod h1:qTNOhFDfKRwX0yXOqJYegL5WRaW376QbB7P4Pb0qva4=
428436
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
429437
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
438+
go.opentelemetry.io/proto/otlp v1.7.0 h1:jX1VolD6nHuFzOYso2E73H85i92Mv8JQYk0K9vz09os=
439+
go.opentelemetry.io/proto/otlp v1.7.0/go.mod h1:fSKjH6YJ7HDlwzltzyMj036AJ3ejJLCgCSHGj4efDDo=
430440
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
431441
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
432442
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=

internal/notifier/forwarder.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"net/url"
2727

2828
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
29+
"sigs.k8s.io/controller-runtime/pkg/log"
2930

3031
"github.com/hashicorp/go-retryablehttp"
3132
)
@@ -96,6 +97,13 @@ func (f *Forwarder) Post(ctx context.Context, event eventv1.Event) error {
9697
opts = append(opts, withTLSConfig(f.TLSConfig))
9798
}
9899

100+
// Wrap it as a new notifier and post the traces
101+
OTELTraceNotifier := NewOTLPTracer(f.URL, f.ProxyURL, f.Headers, f.TLSConfig)
102+
traceErr := OTELTraceNotifier.Post(ctx, event)
103+
if traceErr != nil {
104+
log.FromContext(ctx).Error(nil, "warning: failed to send OTEL trace", traceErr)
105+
}
106+
99107
if err := postMessage(ctx, f.URL, event, opts...); err != nil {
100108
return fmt.Errorf("postMessage failed: %w", err)
101109
}

internal/notifier/otel.go

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
/*
2+
Copyright 2025 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package notifier
18+
19+
import (
20+
"context"
21+
"crypto/sha256"
22+
"crypto/tls"
23+
"fmt"
24+
"net/http"
25+
"net/url"
26+
"slices"
27+
"strings"
28+
29+
apiv1beta3 "github.com/fluxcd/notification-controller/api/v1beta3"
30+
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
31+
"go.opentelemetry.io/otel"
32+
"go.opentelemetry.io/otel/attribute"
33+
"go.opentelemetry.io/otel/codes"
34+
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
35+
"go.opentelemetry.io/otel/sdk/resource"
36+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
37+
semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
38+
"go.opentelemetry.io/otel/trace"
39+
"sigs.k8s.io/controller-runtime/pkg/log"
40+
)
41+
42+
type OTLPTracer struct {
43+
URL string
44+
ProxyURL string
45+
Headers map[string]string
46+
TLSConfig *tls.Config
47+
}
48+
49+
func NewOTLPTracer(url string, proxyURL string, headers map[string]string, tlsConfig *tls.Config) *OTLPTracer {
50+
return &OTLPTracer{
51+
URL: url,
52+
ProxyURL: proxyURL,
53+
Headers: headers,
54+
TLSConfig: tlsConfig,
55+
}
56+
}
57+
58+
// Post implements the notifier.Interface
59+
func (t *OTLPTracer) Post(ctx context.Context, event eventv1.Event) error {
60+
logger := log.FromContext(ctx).WithValues(
61+
"event", event.Reason,
62+
"object", fmt.Sprintf("%s/%s/%s", event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name),
63+
"severity", event.Severity,
64+
)
65+
66+
// Set up OTLP exporter options
67+
logger.V(1).Info("Configuring OTLP HTTP options", "url", t.URL)
68+
// Parse URL to extract host and port
69+
parsedURL, err := url.Parse(t.URL)
70+
if err != nil {
71+
logger.Error(err, "Failed to parse URL", "url", t.URL)
72+
return fmt.Errorf("failed to parse URL: %w", err)
73+
}
74+
httpOptions := []otlptracehttp.Option{
75+
otlptracehttp.WithEndpoint(parsedURL.Host),
76+
}
77+
78+
// Add headers if available
79+
if len(t.Headers) > 0 {
80+
logger.V(1).Info("Adding headers to OTLP exporter", "headerCount", len(t.Headers))
81+
httpOptions = append(httpOptions, otlptracehttp.WithHeaders(t.Headers))
82+
}
83+
84+
// Add TLS config if available
85+
if t.TLSConfig != nil {
86+
logger.V(1).Info("Configuring TLS for OTLP exporter")
87+
httpOptions = append(httpOptions, otlptracehttp.WithTLSClientConfig(t.TLSConfig))
88+
} else if parsedURL.Scheme == "http" {
89+
logger.V(1).Info("Using insecure connection for OTLP exporter")
90+
httpOptions = append(httpOptions, otlptracehttp.WithInsecure())
91+
}
92+
93+
// Add proxy if available
94+
if t.ProxyURL != "" {
95+
logger.V(1).Info("Setting up Proxy URL for OTLP exporter", "proxyURL", t.ProxyURL)
96+
proxyURL, err := url.Parse(t.ProxyURL)
97+
if err != nil {
98+
logger.Error(err, "Failed to parse proxy URL", "proxyURL", t.ProxyURL)
99+
} else {
100+
httpOptions = append(httpOptions, otlptracehttp.WithProxy(func(*http.Request) (*url.URL, error) {
101+
return proxyURL, nil
102+
}))
103+
}
104+
}
105+
106+
// Create exporter
107+
logger.V(1).Info("Creating OTLP exporter")
108+
exporter, err := otlptracehttp.New(ctx, httpOptions...)
109+
if err != nil {
110+
return fmt.Errorf("failed to create OTLP exporter: %w", err)
111+
}
112+
113+
// Extract revision from event metadata
114+
revision := ""
115+
for k, v := range event.Metadata {
116+
if strings.Contains(k, "revision") {
117+
revision = v
118+
logger.V(1).Info("Found revision in metadata", "revision", revision)
119+
break
120+
}
121+
}
122+
123+
// Get value from context (this would need to be passed in from event_handlers.go)
124+
alertUID, ok := ctx.Value("alertUID").(string)
125+
if !ok {
126+
alertUID = "unknown"
127+
logger.V(1).Info("alertUID not found in context, using default", "alertUID", alertUID)
128+
} else {
129+
logger.V(1).Info("Using alertUID from context", "alertUID", alertUID)
130+
}
131+
alertName, ok := ctx.Value("alertName").(string)
132+
if !ok {
133+
alertName = "unknown"
134+
logger.V(1).Info("alertName not found in context, using default", "alertName", alertName)
135+
} else {
136+
logger.V(1).Info("Using alertName from context", "alertName", alertName)
137+
}
138+
alertNamespace, ok := ctx.Value("alertNamespace").(string)
139+
if !ok {
140+
alertNamespace = "unknown"
141+
logger.V(1).Info("alertNamespace not found in context, using default", "alertNamespace", alertNamespace)
142+
} else {
143+
logger.V(1).Info("Using alertNamespace from context", "alertNamespace", alertNamespace)
144+
}
145+
146+
// Create trace provider with resource attributes
147+
logger.V(1).Info("Creating trace provider")
148+
serviceName := fmt.Sprintf("%s: %s/%s", apiv1beta3.AlertKind, alertNamespace, alertName)
149+
resource := resource.NewWithAttributes(
150+
semconv.SchemaURL,
151+
semconv.ServiceInstanceID(alertUID),
152+
semconv.ServiceName(serviceName),
153+
semconv.ServiceNamespace(alertNamespace),
154+
)
155+
tp := sdktrace.NewTracerProvider(
156+
sdktrace.WithBatcher(exporter),
157+
sdktrace.WithResource(resource),
158+
)
159+
160+
// Setup global tracer provider
161+
otel.SetTracerProvider(tp)
162+
163+
// Tracer instatiation for span creation
164+
tracer := otel.Tracer("flux:notification-controller")
165+
166+
// Generate the following IDs:
167+
// - SpanID: <AlertUID>:<AlertNamespace/AlertName>
168+
// - TraceID: <AlertUID>:<revisionID>
169+
logger.V(1).Info("Generating trace IDs", "alertUID", alertUID, "revision", revision)
170+
spanIDStr := generateID(alertUID, fmt.Sprintf("%s/%s", alertNamespace, alertName))
171+
traceIDStr := generateID(alertUID, revision)
172+
173+
var traceID trace.TraceID
174+
var spanID trace.SpanID
175+
copy(traceID[:], traceIDStr[:16])
176+
copy(spanID[:], spanIDStr[:8])
177+
178+
// Create trace context with the generated ID
179+
var spanCtx context.Context
180+
181+
// Replace trace context to use Alert UID + revision
182+
logger.V(1).Info("Trace context", "kind", event.InvolvedObject.Kind)
183+
// Create new context for root span
184+
currentSpanContext := trace.SpanContextFromContext(ctx)
185+
186+
// For source objects: create root span with custom traceID
187+
if isSource(event.InvolvedObject.Kind) {
188+
logger.V(1).Info("Create a new trace", "traceID", traceID.String())
189+
spanCtx = trace.ContextWithSpanContext(ctx,
190+
trace.NewSpanContext(trace.SpanContextConfig{
191+
TraceID: traceID,
192+
SpanID: spanID,
193+
TraceFlags: trace.FlagsSampled,
194+
}),
195+
)
196+
} else {
197+
// For non-source objects: use existing trace context (becomes child)
198+
if currentSpanContext.IsValid() {
199+
logger.V(1).Info("Creating child span", "parentTraceID", traceID.String())
200+
spanCtx = ctx // Use existing context as parent
201+
} else {
202+
// Fallback: create context with same traceID but no parent
203+
logger.V(1).Info("Creating orphan span with shared traceID", "traceID", traceID.String())
204+
spanCtx = trace.ContextWithSpanContext(ctx,
205+
trace.NewSpanContext(trace.SpanContextConfig{
206+
TraceID: traceID,
207+
TraceFlags: trace.FlagsSampled,
208+
}),
209+
)
210+
}
211+
}
212+
213+
// Create single span with proper attributes
214+
spanName := fmt.Sprintf("%s: %s/%s", event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name)
215+
_, span := tracer.Start(spanCtx, spanName,
216+
trace.WithAttributes(
217+
attribute.String("flux.object.uid", string(event.InvolvedObject.UID)),
218+
attribute.String("flux.object.kind", event.InvolvedObject.Kind),
219+
attribute.String("flux.object.name", event.InvolvedObject.Name),
220+
attribute.String("flux.object.namespace", event.InvolvedObject.Namespace),
221+
attribute.String("flux.event.severity", event.Severity),
222+
attribute.String("flux.event.reason", event.Reason),
223+
attribute.String("flux.event.message", event.Message),
224+
),
225+
trace.WithTimestamp(event.Timestamp.Time),
226+
)
227+
228+
// Add metadata attributes
229+
for k, v := range event.Metadata {
230+
span.SetAttributes(attribute.String(fmt.Sprintf("flux.event.metadata.%s", k), v))
231+
}
232+
233+
// Set status based on event severity
234+
if event.Severity == eventv1.EventSeverityError {
235+
span.SetStatus(codes.Error, event.Message)
236+
} else {
237+
span.SetStatus(codes.Ok, event.Message)
238+
}
239+
240+
logger.Info("Successfully sent trace to OTLP endpoint",
241+
"url", t.URL,
242+
"object", fmt.Sprintf("%s/%s/%s", event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name),
243+
"reason", event.Reason)
244+
245+
defer func() {
246+
span.End()
247+
tp.ForceFlush(ctx)
248+
tp.Shutdown(ctx)
249+
exporter.Shutdown(ctx)
250+
}()
251+
252+
return nil
253+
}
254+
255+
// Add this function to generate trace and span ID
256+
func generateID(alertUID, sourceRevision string) []byte {
257+
input := fmt.Sprintf("%s:%s", alertUID, sourceRevision)
258+
hash := sha256.Sum256([]byte(input))
259+
return hash[:]
260+
}
261+
262+
func isSource(kind string) bool {
263+
sourceKinds := []string{"GitRepository", "HelmRepository", "OCIRepository", "Bucket"}
264+
return slices.Contains(sourceKinds, kind)
265+
}

0 commit comments

Comments
 (0)