[RFC-0010] Add object-level workload identity support to Google Pub/Sub notifier #1154
Thanks for this PR!
internal/notifier/google_helpers.go
ServiceAccountName string
ProviderName string
ProviderNamespace string
ProxyURL string
Since we are introducing proxy support for authentication, we could also introduce proxy support for the whole operation, i.e. for interacting with the Pub/Sub API itself. For this we need to build a custom Google transport, like this:
Good point! I think we could address this in a follow-up PR to keep the current implementation focused on the core workload identity functionality.
@matheuscscp I also updated the documentation, including the parts related to Azure that were implemented earlier.
[{
"level": "error",
"ts": "2025-08-02T16:50:26.004Z",
"logger": "event-server",
"msg": "failed to send notification",
"eventInvolvedObject": {
"kind": "OCIRepository",
"namespace": "flux-system",
"name": "flux-system",
"uid": "f2dde3e6-6411-498a-b26c-1c14c00b36ac",
"apiVersion": "source.toolkit.fluxcd.io/v1",
"resourceVersion": "76746"
},
"alert": {
"name": "pubsub",
"namespace": "flux-system",
"providerName": "pubsub"
},
"error": "error publishing event to topic projects/flux-gitops-playground/topics/test-flux: rpc error: code = Unauthenticated desc = transport: per-RPC creds failed due to error: failed to get service account 'flux-system/pubsub': Timeout: failed waiting for *v1.ServiceAccount Informer to sync"
},
{
"level": "error",
"ts": "2025-08-02T16:54:13.860Z",
"logger": "event-server",
"msg": "failed to send notification",
"eventInvolvedObject": {
"kind": "OCIRepository",
"namespace": "flux-system",
"name": "flux-system",
"uid": "f2dde3e6-6411-498a-b26c-1c14c00b36ac",
"apiVersion": "source.toolkit.fluxcd.io/v1",
"resourceVersion": "88975"
},
"alert": {
"name": "pubsub",
"namespace": "flux-system",
"providerName": "pubsub"
},
"error": "error publishing event to topic projects/flux-gitops-playground/topics/test-flux: rpc error: code = Unauthenticated desc = transport: per-RPC creds failed due to error: failed to create kubernetes token for service account 'flux-system/pubsub': context canceled"
}]
I'm trying to investigate why this is happening, but normally I'd expect that a canceled/done context is being passed 🤔
I’m looking into the code to try to find the cause.
The logs above are for object-level. The same happens at the controller level:
{
"level": "error",
"ts": "2025-08-02T17:35:23.582Z",
"logger": "event-server",
"msg": "failed to send notification",
"eventInvolvedObject": {
"kind": "OCIRepository",
"namespace": "flux-system",
"name": "flux-system",
"uid": "f2dde3e6-6411-498a-b26c-1c14c00b36ac",
"apiVersion": "source.toolkit.fluxcd.io/v1",
"resourceVersion": "109404"
},
"alert": {
"name": "pubsub",
"namespace": "flux-system",
"providerName": "pubsub"
},
"error": "error publishing event to topic projects/flux-gitops-playground/topics/test-flux: rpc error: code = Unauthenticated desc = transport: per-RPC creds failed due to error: failed to create provider access token for the controller: oauth2/google: invalid response from Secure Token Server: Post \"https://sts.googleapis.com/v1/token\": context canceled"
}
This gives strength to the theory of a bad context being passed somewhere.
It's not here. I added a log and it doesn't get printed:
if ctx.Err() != nil {
	log.FromContext(ctx).Error(ctx.Err(), "context error while building GCP client options")
}
tokenSource := gcp.NewTokenSource(ctx, authOpts...)
clientOpts = append(clientOpts, option.WithTokenSource(tokenSource))
Problem
The workload identity implementation was causing "context canceled" errors in matheus's test environment:
Suspected Cause
The issue was likely caused by a context timeout mismatch in the notification pipeline:
Context Flow Analysis (generated by AI)
Solution
Modified
Additional Context
For context, this issue highlights a common Go design pattern consideration. The official Go documentation recommends:
This is discussed further in the Go blog post about context and structs. The Our fix works around this by providing a long-lived background context specifically for the TokenSource, while keeping the request-scoped timeout control separate in the notification goroutine.
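To make the failure mode concrete, here is a minimal sketch of how a stored context can surface as "context canceled" long after the code that created it has returned. It uses only the standard library. The `tokenSource` type, `newTokenSource`, `buildThenUse`, and `isCanceled` are hypothetical stand-ins invented for illustration; they are not the Flux or Google auth APIs, and this is not a reproduction of the exact code path in this PR.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// tokenSource is a hypothetical stand-in for a token source whose Token
// method takes no context, so it can only use the context captured when
// it was constructed.
type tokenSource struct {
	ctx context.Context
}

// newTokenSource stores ctx in the struct for later use.
func newTokenSource(ctx context.Context) *tokenSource {
	return &tokenSource{ctx: ctx}
}

// Token simulates a token exchange that honors the captured context.
func (t *tokenSource) Token() (string, error) {
	select {
	case <-t.ctx.Done():
		return "", fmt.Errorf("failed to create token: %w", t.ctx.Err())
	case <-time.After(10 * time.Millisecond):
		return "fake-token", nil
	}
}

// buildThenUse constructs the token source with a setup-scoped context,
// cancels that context once setup is finished, and only then requests a
// token, as a later publish call would.
func buildThenUse() error {
	setupCtx, cancel := context.WithCancel(context.Background())
	ts := newTokenSource(setupCtx)
	cancel() // setup is done, but ts still holds setupCtx

	_, err := ts.Token()
	return err
}

// isCanceled reports whether err wraps context.Canceled.
func isCanceled(err error) bool {
	return errors.Is(err, context.Canceled)
}

func main() {
	err := buildThenUse()
	fmt.Println(err, isCanceled(err))
}
```

In this sketch, passing `context.Background()` to `newTokenSource` makes the error go away, which corresponds to the workaround described above. The cost is that the token exchange is then no longer bounded by the per-notification timeout.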
Correction
I want to clarify point 3 in my previous analysis to be more accurate:
Before (could be misinterpreted):
More accurate:
The issue wasn't with
The
Exactly! I reached the same conclusion. I was not observing 15 seconds between the event and the log, so the timeout was not the cause. It's because the
The right fix here is, unfortunately, to restructure the code so that we use the 15-second
notification-controller/internal/server/event_handlers.go
Lines 216 to 219 in eddaf14
It's not hard, though; we just have to pass all the required inputs further down to the
The reason why we can't use
Better yet, all of the code inside
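The restructuring discussed in this thread can be sketched roughly as follows: the notifier keeps only plain configuration, and the client (including anything that authenticates) is created inside the publish call with the caller's context. Everything here is a hypothetical stand-in built on the standard library (`options`, `client`, `newClient`, `publisher`, `postWithTimeout`); it is not the controller's actual code, and the 15-second value simply mirrors the per-notification timeout mentioned above.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// options is a hypothetical stand-in for the notifier's configuration.
type options struct {
	ProjectID string
	TopicID   string
}

// client is a hypothetical stand-in for an API client whose authentication
// is bound to the context it was created with.
type client struct {
	ctx context.Context
}

// newClient fails fast if the context is already done.
func newClient(ctx context.Context, _ options) (*client, error) {
	if err := ctx.Err(); err != nil {
		return nil, fmt.Errorf("client setup failed: %w", err)
	}
	return &client{ctx: ctx}, nil
}

// send fails if the context the client was created with is done.
func (c *client) send(payload []byte) error {
	if err := c.ctx.Err(); err != nil {
		return fmt.Errorf("send failed: %w", err)
	}
	return nil
}

// publisher keeps only plain configuration; it never stores a context.
type publisher struct {
	opts options
}

// publish creates the client with the caller's context, so client setup,
// authentication, and the send all share the same deadline.
func (p *publisher) publish(ctx context.Context, payload []byte) error {
	c, err := newClient(ctx, p.opts)
	if err != nil {
		return err
	}
	return c.send(payload)
}

// postWithTimeout applies a per-notification timeout around publish.
func postWithTimeout(p *publisher, payload []byte, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return p.publish(ctx, payload)
}

func main() {
	p := &publisher{opts: options{ProjectID: "project-id", TopicID: "topic-id"}}
	fmt.Println(postWithTimeout(p, []byte(`{}`), 15*time.Second))
}
```

The trade-off in this shape is that a client is built on every publish, in exchange for never holding a context beyond the call that owns it.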
Turns out that this is not true, and to achieve using the
diff --git a/internal/notifier/factory.go b/internal/notifier/factory.go
index 062ecae..f2dfdb8 100644
--- a/internal/notifier/factory.go
+++ b/internal/notifier/factory.go
@@ -263,21 +263,7 @@ func googleChatNotifierFunc(opts notifierOptions) (Interface, error) {
}
func googlePubSubNotifierFunc(opts notifierOptions) (Interface, error) {
- // NOTE: Use background context for GCP client options to avoid workload identity timeout.
- // opts.Context has a 15-second timeout which is too short for ServiceAccount informer
- // sync and token creation. The actual notification timeout is handled separately.
- clientOpts, err := buildGCPClientOptions(context.Background(), opts)
- if err != nil {
- return nil, err
- }
-
- return NewGooglePubSub(
- opts.Context,
- opts.URL,
- opts.Channel,
- opts.Headers,
- clientOpts,
- )
+ return NewGooglePubSub(&opts)
}
func webexNotifierFunc(opts notifierOptions) (Interface, error) {
diff --git a/internal/notifier/google_pubsub.go b/internal/notifier/google_pubsub.go
index 0bf92d0..0a66f7c 100644
--- a/internal/notifier/google_pubsub.go
+++ b/internal/notifier/google_pubsub.go
@@ -23,7 +23,6 @@ import (
"fmt"
"cloud.google.com/go/pubsub"
- "google.golang.org/api/option"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -33,18 +32,13 @@ import (
type (
// GooglePubSub holds a Google Pub/Sub client and target topic.
GooglePubSub struct {
- topicID string
- attrs map[string]string
- topicName string
-
client interface {
- publish(ctx context.Context, topicID string, eventPayload []byte, attrs map[string]string) (serverID string, err error)
+ publish(ctx context.Context, eventPayload []byte) error
}
}
googlePubSubClient struct {
- projectID string
- opts []option.ClientOption
+ opts *notifierOptions
}
)
@@ -53,29 +47,15 @@ var _ Interface = &GooglePubSub{}
// NewGooglePubSub creates a Google Pub/Sub client tied to a specific
// project and topic using the provided client options.
-func NewGooglePubSub(ctx context.Context, projectID, topicID string, attrs map[string]string,
- clientOpts []option.ClientOption) (*GooglePubSub, error) {
- if projectID == "" {
+func NewGooglePubSub(opts *notifierOptions) (*GooglePubSub, error) {
+ if opts.URL == "" {
return nil, errors.New("GCP project ID cannot be empty")
}
- if topicID == "" {
+ if opts.Channel == "" {
return nil, errors.New("GCP Pub/Sub topic ID cannot be empty")
}
- if len(attrs) == 0 {
- attrs = nil
- }
-
- client := &googlePubSubClient{
- projectID: projectID,
- opts: clientOpts,
- }
-
- return &GooglePubSub{
- topicID: topicID,
- attrs: attrs,
- topicName: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
- client: client,
- }, nil
+ client := &googlePubSubClient{opts}
+ return &GooglePubSub{client}, nil
}
// Post posts Flux events to a Google Pub/Sub topic.
@@ -90,24 +70,22 @@ func (g *GooglePubSub) Post(ctx context.Context, event eventv1.Event) error {
return fmt.Errorf("error json-marshaling event: %w", err)
}
- serverID, err := g.client.publish(ctx, g.topicID, eventPayload, g.attrs)
- if err != nil {
- return fmt.Errorf("error publishing event to topic %s: %w", g.topicName, err)
- }
-
- // debug log
- log.FromContext(ctx).V(1).Info("Event published to GCP Pub/Sub topic",
- "topic", g.topicName,
- "server message id", serverID)
-
- return nil
+ return g.client.publish(ctx, eventPayload)
}
-func (g *googlePubSubClient) publish(ctx context.Context, topicID string, eventPayload []byte, attrs map[string]string) (serverID string, err error) {
- var client *pubsub.Client
- client, err = pubsub.NewClient(ctx, g.projectID, g.opts...)
+func (g *googlePubSubClient) publish(ctx context.Context, eventPayload []byte) error {
+ projectID := g.opts.URL
+ topicID := g.opts.Channel
+ topicName := fmt.Sprintf("projects/%s/topics/%s", projectID, topicID)
+
+ // Build client.
+ opts, err := buildGCPClientOptions(ctx, *g.opts)
+ if err != nil {
+ return err
+ }
+ client, err := pubsub.NewClient(ctx, projectID, opts...)
if err != nil {
- return
+ return err
}
defer func() {
if closeErr := client.Close(); closeErr != nil {
@@ -118,12 +96,27 @@ func (g *googlePubSubClient) publish(ctx context.Context, topicID string, eventP
}
}
}()
- serverID, err = client.
+
+ // Publish the event to the topic.
+ attrs := g.opts.Headers
+ if len(attrs) == 0 {
+ attrs = nil
+ }
+ serverID, err := client.
Topic(topicID).
Publish(ctx, &pubsub.Message{
Data: eventPayload,
Attributes: attrs,
}).
Get(ctx)
- return
+ if err != nil {
+ return fmt.Errorf("error publishing to GCP Pub/Sub topic %s: %w", topicName, err)
+ }
+
+ // Emit debug log.
+ log.FromContext(ctx).V(1).Info("Event published to GCP Pub/Sub topic",
+ "topic", topicName,
+ "server message id", serverID)
+
+ return nil
}
diff --git a/internal/notifier/google_pubsub_test.go b/internal/notifier/google_pubsub_test.go
index a94da52..d9e491a 100644
--- a/internal/notifier/google_pubsub_test.go
+++ b/internal/notifier/google_pubsub_test.go
@@ -19,7 +19,6 @@ package notifier
import (
"context"
"errors"
- "fmt"
"testing"
. "github.com/onsi/gomega"
@@ -50,60 +49,18 @@ func TestNewGooglePubSub(t *testing.T) {
topicID: "",
expectedErr: errors.New("GCP Pub/Sub topic ID cannot be empty"),
},
- {
- name: "topic name is stored properly",
- projectID: "project-id",
- topicID: "topic-id",
- expectedTopicName: "projects/project-id/topics/topic-id",
- },
- {
- name: "client options are stored properly",
- projectID: "project-id",
- topicID: "topic-id",
- expectedTopicName: "projects/project-id/topics/topic-id",
- clientOpts: []option.ClientOption{option.WithCredentialsJSON([]byte("json credentials"))},
- },
- {
- name: "non-empty attributes are stored properly",
- projectID: "project-id",
- topicID: "topic-id",
- expectedTopicName: "projects/project-id/topics/topic-id",
- attrs: map[string]string{"foo": "bar"},
- expectedAttrs: map[string]string{"foo": "bar"},
- },
- {
- name: "empty attributes are stored properly",
- projectID: "project-id",
- topicID: "topic-id",
- expectedTopicName: "projects/project-id/topics/topic-id",
- attrs: map[string]string{},
- expectedAttrs: nil,
- },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
- ctx := context.Background()
- provider, err := NewGooglePubSub(ctx, tt.projectID, tt.topicID, tt.attrs, tt.clientOpts)
+ provider, err := NewGooglePubSub(&notifierOptions{
+ URL: tt.projectID,
+ Channel: tt.topicID,
+ })
- if tt.expectedErr != nil {
- g.Expect(err).To(Equal(tt.expectedErr))
- g.Expect(provider).To(BeNil())
- } else {
- g.Expect(err).To(BeNil())
- g.Expect(provider).NotTo(BeNil())
-
- g.Expect(provider.topicID).To(Equal(tt.topicID))
- g.Expect(provider.attrs).To(Equal(tt.expectedAttrs))
- g.Expect(provider.topicName).To(Equal(tt.expectedTopicName))
-
- g.Expect(provider.client).NotTo(BeNil())
-
- client := provider.client.(*googlePubSubClient)
- g.Expect(client).NotTo(BeNil())
- g.Expect(client.opts).To(Equal(tt.clientOpts))
- }
+ g.Expect(err).To(Equal(tt.expectedErr))
+ g.Expect(provider).To(BeNil())
})
}
}
@@ -123,14 +80,11 @@ type googlePubSubPostTestCase struct {
g *WithT
}
-func (tt *googlePubSubPostTestCase) publish(ctx context.Context, topicID string, eventPayload []byte, attrs map[string]string) (serverID string, err error) {
+func (tt *googlePubSubPostTestCase) publish(ctx context.Context, eventPayload []byte) error {
tt.g.THelper()
tt.publishExecuted = true
- tt.g.Expect(topicID).To(Equal(tt.topicID))
tt.g.Expect(string(eventPayload)).To(Equal(tt.expectedEventPayload))
- tt.g.Expect(attrs).To(Equal(tt.attrs))
- // serverID is only used in a debug log for now, there's no way to assert it
- return "", tt.publishErr
+ return tt.publishErr
}
func TestGooglePubSubPost(t *testing.T) {
@@ -151,11 +105,11 @@ func TestGooglePubSubPost(t *testing.T) {
publishShouldExecute: false,
},
{
- name: "publish error is wrapped and relayed",
+ name: "publish error is relayed",
expectedEventPayload: `{"involvedObject":{},"severity":"","timestamp":null,"message":"","reason":"","reportingController":""}`,
topicName: "projects/projectID/topics/topicID",
publishErr: errors.New("publish error"),
- expectedErr: fmt.Errorf("error publishing event to topic projects/projectID/topics/topicID: %w", errors.New("publish error")),
+ expectedErr: errors.New("publish error"),
publishShouldExecute: true,
},
{
@@ -173,10 +127,7 @@ func TestGooglePubSubPost(t *testing.T) {
tt.g = g
topic := &GooglePubSub{
- client: tt,
- topicID: tt.topicID,
- attrs: tt.attrs,
- topicName: tt.topicName,
+ client: tt,
}
err := topic.Post(context.Background(), tt.event)
Sorry for making you do most of the implementation…! (good morning 🌞 )
There's nothing to be sorry about! It was my mistake not guiding you correctly beforehand. I definitely did not predict the issue we found, and it's for sure something to keep in mind more often. You are helping us out tons here! 😁
@matheuscscp I’ve merged your changes in 9021d14. That was an awesome refactor, thanks a lot!
If you've finished reviewing the changes, let me know and I'll squash the commits 🙏
I did a last-minute edit on this comment after catching a problem (passing wrong arg to
diff --git a/internal/notifier/google_pubsub.go b/internal/notifier/google_pubsub.go
index 52b2bb7..0dd4ef9 100644
--- a/internal/notifier/google_pubsub.go
+++ b/internal/notifier/google_pubsub.go
@@ -103,7 +103,7 @@ func (g *googlePubSubClient) publish(ctx context.Context, eventPayload []byte) e
}
topic := fmt.Sprintf("projects/%s/topics/%s", projectID, topicID)
serverID, err := client.
- Topic(topic).
+ Topic(topicID).
Publish(ctx, &pubsub.Message{
Data: eventPayload,
Attributes: attrs,
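The one-line fix above passes the short topic ID instead of the fully qualified name. In the Pub/Sub Go client, `Client.Topic` takes the short ID and resolves it against the client's project, while the `projects/<project>/topics/<topic>` form is what this PR uses in log and error messages. Here is a minimal sketch of that distinction; the helper names `topicResourceName` and `isShortTopicID` are hypothetical and not part of the PR.

```go
package main

import (
	"fmt"
	"strings"
)

// topicResourceName builds the fully qualified name, suitable for log and
// error messages.
func topicResourceName(projectID, topicID string) string {
	return fmt.Sprintf("projects/%s/topics/%s", projectID, topicID)
}

// isShortTopicID reports whether s looks like a bare topic ID rather than a
// fully qualified resource name. This is a hypothetical guard that only
// checks for path separators.
func isShortTopicID(s string) bool {
	return s != "" && !strings.Contains(s, "/")
}

func main() {
	projectID, topicID := "project-id", "topic-id"
	full := topicResourceName(projectID, topicID)

	fmt.Println(full, isShortTopicID(full))
	fmt.Println(topicID, isShortTopicID(topicID))
}
```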
LGTM! 🚀
Thanks very much @cappyzawa ❤️
I was able to test both controller-level and object-level workload identity for the commit 9021d14:
{
"involvedObject": {
"kind": "OCIRepository",
"namespace": "flux-system",
"name": "flux-system",
"uid": "f2dde3e6-6411-498a-b26c-1c14c00b36ac",
"apiVersion": "source.toolkit.fluxcd.io/v1",
"resourceVersion": "247276"
},
"severity": "info",
"timestamp": "2025-08-02T22:21:54Z",
"message": "stored artifact with revision 'latest@sha256:<redacted>' from 'oci://<redacted>', origin source 'https://<redacted>', origin revision 'refs/heads/main@sha1:<redacted>'",
"reason": "NewArtifact",
"metadata": {
"authMode": "controller-level",
"cluster": "preview",
"revision": "latest@sha256:<redacted>"
},
"reportingController": "source-controller",
"reportingInstance": "source-controller-775f75d5b4-6rll5"
}
{
"involvedObject": {
"kind": "OCIRepository",
"namespace": "flux-system",
"name": "flux-system",
"uid": "f2dde3e6-6411-498a-b26c-1c14c00b36ac",
"apiVersion": "source.toolkit.fluxcd.io/v1",
"resourceVersion": "246335"
},
"severity": "info",
"timestamp": "2025-08-02T22:18:58Z",
"message": "stored artifact with revision 'latest@sha256:<redacted>' from 'oci://<redacted>', origin source 'https://<redacted>', origin revision 'refs/heads/main@sha1:<redacted>'",
"reason": "NewArtifact",
"metadata": {
"authMode": "object-level",
"cluster": "preview",
"revision": "latest@sha256:<redacted>"
},
"reportingController": "source-controller",
"reportingInstance": "source-controller-775f75d5b4-6rll5"
}
I added the authMode metadata key to the event through the .spec.eventMetadata API:
apiVersion: notification.toolkit.fluxcd.io/v1beta3
kind: Alert
metadata:
  name: pubsub
  namespace: flux-system
spec:
  providerRef:
    name: pubsub
  eventMetadata:
    cluster: preview
    authMode: controller-level # and object-level while testing .spec.serviceAccountName set on the Provider object
  eventSeverity: info
  eventSources:
    - kind: OCIRepository
      name: '*'
Add support for object-level GCP workload identity authentication to enable individual Providers to authenticate using their own ServiceAccount without needing to manage JSON credentials. This extends beyond the existing controller-level workload identity that is automatically handled by Google libraries. The implementation maintains backward compatibility by prioritizing JSON credentials when both authentication methods are available. Proxy support is also added following the Azure DevOps pattern for consistency across notifiers. This change is part of the broader effort to support multi-tenant workload identity across Flux controllers (RFC-0010). Signed-off-by: cappyzawa <cappyzawa@gmail.com>
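The precedence described in this commit message can be sketched as a small selection function: JSON credentials win when present, a ServiceAccount on the Provider selects object-level workload identity, and otherwise the controller-level identity handled by the Google libraries applies. The type and function names below (`authMode`, `selectAuthMode`) are hypothetical, chosen for illustration, and do not come from the PR's code.

```go
package main

import "fmt"

// authMode names the authentication path chosen for a Provider.
type authMode string

const (
	authJSONCredentials authMode = "json-credentials"
	authObjectLevel     authMode = "object-level-workload-identity"
	authControllerLevel authMode = "controller-level-workload-identity"
)

// selectAuthMode applies the precedence from the commit message: JSON
// credentials first (for backward compatibility), then the Provider's own
// ServiceAccount, then the controller's ambient identity.
func selectAuthMode(jsonCredentials []byte, serviceAccountName string) authMode {
	switch {
	case len(jsonCredentials) > 0:
		return authJSONCredentials
	case serviceAccountName != "":
		return authObjectLevel
	default:
		return authControllerLevel
	}
}

func main() {
	fmt.Println(selectAuthMode([]byte(`{"type":"service_account"}`), "pubsub"))
	fmt.Println(selectAuthMode(nil, "pubsub"))
	fmt.Println(selectAuthMode(nil, ""))
}
```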
Add comprehensive workload identity documentation for both Google Pub/Sub and Azure DevOps providers. Include controller-level and object-level authentication patterns with feature gate requirements and setup instructions for multi-tenant environments. Signed-off-by: cappyzawa <cappyzawa@gmail.com>
Part of fluxcd/flux2#5022.