
[RFC-0010] Add object-level workload identity support to Google Pub/Sub notifier #1154


Merged


cappyzawa
Member

Part of fluxcd/flux2#5022.

notification-controller > Google Pub/Sub

Member

@matheuscscp matheuscscp left a comment

Thanks for this PR!

ServiceAccountName string
ProviderName string
ProviderNamespace string
ProxyURL string
Member

Since we are introducing proxy support for authentication, we could also introduce proxy support for the whole operation, i.e. for interacting with the Pub/Sub API itself. For this we need to build a custom Google transport, like this:

https://github.com/fluxcd/source-controller/blob/c2b572bae03a53cef4355389494ea71097c6abfe/pkg/gcp/gcp.go#L90-L145
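As a rough illustration of the HTTP half of that idea, here is a minimal sketch using only the Go standard library. It does not reproduce the linked source-controller code, and it does not cover the gRPC transport that the Pub/Sub client uses. The names `newProxiedHTTPClient` and `proxyFor` and the proxy address are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// newProxiedHTTPClient (hypothetical name) returns an HTTP client whose
// transport routes every request through the given proxy URL.
func newProxiedHTTPClient(proxyURL string) (*http.Client, error) {
	u, err := url.Parse(proxyURL)
	if err != nil {
		return nil, fmt.Errorf("invalid proxy URL: %w", err)
	}
	// Clone the default transport so its timeouts and pooling are kept.
	transport := http.DefaultTransport.(*http.Transport).Clone()
	transport.Proxy = http.ProxyURL(u)
	return &http.Client{Transport: transport}, nil
}

// proxyFor (hypothetical helper) reports which proxy the client would use
// for a request to target, without sending anything over the network.
func proxyFor(client *http.Client, target string) (string, error) {
	req, err := http.NewRequest(http.MethodGet, target, nil)
	if err != nil {
		return "", err
	}
	p, err := client.Transport.(*http.Transport).Proxy(req)
	if err != nil || p == nil {
		return "", err
	}
	return p.String(), nil
}

func main() {
	// The proxy address below is a placeholder, not a real endpoint.
	client, err := newProxiedHTTPClient("http://proxy.example.com:8080")
	if err != nil {
		panic(err)
	}
	via, err := proxyFor(client, "https://pubsub.googleapis.com")
	if err != nil {
		panic(err)
	}
	fmt.Println("requests are routed via", via)
}
```

A client built this way would still need to be wired into the Google client options, which is the part the linked code handles.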

Member Author

Good point! I think we could address this in a follow-up PR to keep the current implementation focused on the core workload identity functionality.

@cappyzawa
Member Author

@matheuscscp
Thanks for the review! I re-read the RFC and implemented my understanding in this PR.
e43b0b9

I also updated the documentation, including the parts related to Azure that were implemented earlier.
420901c

@cappyzawa cappyzawa force-pushed the feat/google-pubsub-workload-identity branch from 5a87795 to 1d04f98 Compare August 2, 2025 02:33
@cappyzawa cappyzawa force-pushed the feat/google-pubsub-workload-identity branch from 1d04f98 to 58322f3 Compare August 2, 2025 11:26
@cappyzawa cappyzawa changed the title from "[RFC-0010] Add workload identity support to Google Pub/Sub notifier" to "[RFC-0010] Add object-level workload identity support to Google Pub/Sub notifier" Aug 2, 2025
@cappyzawa cappyzawa force-pushed the feat/google-pubsub-workload-identity branch from aa0f80b to 91cc08d Compare August 2, 2025 12:36
@matheuscscp
Member

[{
    "level": "error",
    "ts": "2025-08-02T16:50:26.004Z",
    "logger": "event-server",
    "msg": "failed to send notification",
    "eventInvolvedObject": {
        "kind": "OCIRepository",
        "namespace": "flux-system",
        "name": "flux-system",
        "uid": "f2dde3e6-6411-498a-b26c-1c14c00b36ac",
        "apiVersion": "source.toolkit.fluxcd.io/v1",
        "resourceVersion": "76746"
    },
    "alert": {
        "name": "pubsub",
        "namespace": "flux-system",
        "providerName": "pubsub"
    },
    "error": "error publishing event to topic projects/flux-gitops-playground/topics/test-flux: rpc error: code = Unauthenticated desc = transport: per-RPC creds failed due to error: failed to get service account 'flux-system/pubsub': Timeout: failed waiting for *v1.ServiceAccount Informer to sync"
},
{
    "level": "error",
    "ts": "2025-08-02T16:54:13.860Z",
    "logger": "event-server",
    "msg": "failed to send notification",
    "eventInvolvedObject": {
        "kind": "OCIRepository",
        "namespace": "flux-system",
        "name": "flux-system",
        "uid": "f2dde3e6-6411-498a-b26c-1c14c00b36ac",
        "apiVersion": "source.toolkit.fluxcd.io/v1",
        "resourceVersion": "88975"
    },
    "alert": {
        "name": "pubsub",
        "namespace": "flux-system",
        "providerName": "pubsub"
    },
    "error": "error publishing event to topic projects/flux-gitops-playground/topics/test-flux: rpc error: code = Unauthenticated desc = transport: per-RPC creds failed due to error: failed to create kubernetes token for service account 'flux-system/pubsub': context canceled"
}]

I'm trying to investigate why this is happening. Normally an error like this means that a canceled/done context is being passed somewhere 🤔

@cappyzawa
Member Author

I’m looking into the code to try to find the cause.

@matheuscscp
Member

The logs above are for object-level. The same happens at the controller level:

{
    "level": "error",
    "ts": "2025-08-02T17:35:23.582Z",
    "logger": "event-server",
    "msg": "failed to send notification",
    "eventInvolvedObject": {
        "kind": "OCIRepository",
        "namespace": "flux-system",
        "name": "flux-system",
        "uid": "f2dde3e6-6411-498a-b26c-1c14c00b36ac",
        "apiVersion": "source.toolkit.fluxcd.io/v1",
        "resourceVersion": "109404"
    },
    "alert": {
        "name": "pubsub",
        "namespace": "flux-system",
        "providerName": "pubsub"
    },
    "error": "error publishing event to topic projects/flux-gitops-playground/topics/test-flux: rpc error: code = Unauthenticated desc = transport: per-RPC creds failed due to error: failed to create provider access token for the controller: oauth2/google: invalid response from Secure Token Server: Post \"https://sts.googleapis.com/v1/token\": context canceled"
}

This lends strength to the theory that a bad context is being passed somewhere.

@matheuscscp
Member

It's not here, I added a log and it doesn't get printed:

		if ctx.Err() != nil {
			log.FromContext(ctx).Error(ctx.Err(), "context error while building GCP client options")
		}
		tokenSource := gcp.NewTokenSource(ctx, authOpts...)
		clientOpts = append(clientOpts, option.WithTokenSource(tokenSource))

@cappyzawa
Member Author

Problem

The workload identity implementation was causing "context canceled" errors in matheus's test environment:

"error": "rpc error: code = Unauthenticated desc = transport: per-RPC creds failed due to error: failed to get service account 'flux-system/pubsub': Timeout: failed waiting for *v1.ServiceAccount Informer to sync"

"error": "rpc error: code = Unauthenticated desc = transport: per-RPC creds failed due to error: failed to create kubernetes token for service account 'flux-system/pubsub': context canceled"

Suspected Cause

The issue was likely caused by a context timeout mismatch in the notification pipeline:

  1. Event Server Handler: Creates a 15-second timeout context for HTTP request processing
  2. Notifier Creation: The context flows through dispatchNotification → getNotificationParams → createNotifier → buildGCPClientOptions
  3. TokenSource Storage: gcp.NewTokenSource(ctx, ...) stores the 15-second context in its struct (Go anti-pattern)
  4. Workload Identity Setup: When TokenSource.Token() is called later, it uses the stored context for:
    • ServiceAccount informer synchronization
    • Kubernetes token creation via client.SubResource("token").Create(ctx, ...)
  5. Timeout: These operations can take longer than 15 seconds, causing context cancellation

Context Flow Analysis (generated by AI)

HTTP Request → handleEvent (15s timeout)
             ↓
             dispatchNotification
             ↓
             getNotificationParams  
             ↓
             createNotifier
             ↓
             buildGCPClientOptions → gcp.NewTokenSource(15s_ctx)
                                                        ↓
                                     [context stored in struct]
                                                        ↓
             [goroutine starts with fresh context]
             ↓
             notifier.Post(fresh_ctx) → TokenSource.Token()
                                                    ↓
                                        [uses old 15s context]
                                                    ↓
                                        auth.GetAccessToken(old_ctx)
                                                    ↓
                                        ServiceAccount operations → TIMEOUT!

Solution

Modified buildGCPClientOptions to use context.Background() instead of the HTTP request context:

  1. Function Signature: Changed to buildGCPClientOptions(ctx context.Context, opts notifierOptions) for explicit context control
  2. Background Context: Call with context.Background() to avoid timeout during workload identity setup
  3. Safety: The actual notification sending timeout is controlled separately in the goroutine with a fresh context

@cappyzawa
Member Author

Additional Context

For context, this issue highlights a common Go design pattern consideration. The official Go documentation recommends:

Do not store Contexts inside a struct type; instead, pass a Context explicitly to each function that needs it.

This is discussed further in the Go blog post about context and structs. The gcp.NewTokenSource() function stores the context in its struct to match the oauth2.TokenSource interface, which predates Go's context patterns. This design pattern can lead to the timeout issues we encountered, where a short-lived context gets stored and used later when it may have already been canceled.

Our fix works around this by providing a long-lived background context specifically for the TokenSource, while keeping the request-scoped timeout control separate in the notification goroutine.

@cappyzawa
Member Author

Correction

I want to clarify point 3 in my previous analysis to be more accurate:

Before (could be misinterpreted):

  1. TokenSource Storage: gcp.NewTokenSource(ctx, ...) stores the 15-second context in its struct (Go anti-pattern)

More accurate:

  1. Context Choice: We passed a 15-second timeout context to gcp.NewTokenSource(ctx, ...), which stores the context for later use

The issue wasn't with gcp.NewTokenSource's design (which reasonably stores context to work with the oauth2.TokenSource interface), but rather our choice to pass a short-lived HTTP request context to it. The function works as designed - we just needed to provide a more appropriate context for long-running operations.

@matheuscscp
Member

The oauth2.TokenSource interface observation is the most accurate point 👍 We'd love to be able to pass a ctx in the right place here, but the old, widespread interface does not allow it 😢

@matheuscscp
Member

matheuscscp commented Aug 2, 2025

The issue wasn't with gcp.NewTokenSource's design (which reasonably stores context to work with the oauth2.TokenSource interface), but rather our choice to pass a short-lived HTTP request context to it. The function works as designed - we just needed to provide a more appropriate context for long-running operations.

Exactly! I reached the same conclusion. I was not observing 15 seconds between the event and the log, so the 15-second timeout was not the cause. The real cause is that the .Post() operation is invoked in a goroutine! By then the HTTP request ctx is long gone.

The right fix here is, unfortunately, to restructure the code so that we use the 15sec ctx created in the goroutine:

go func(n notifier.Interface, e eventv1.Event) {
	pctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	if err := n.Post(pctx, e); err != nil {

It's not hard, though, we just have to pass all the required inputs further down to the googlePubSubClient struct and call gcp.NewTokenSource() inside publish().

The reason we can't use context.Background() for authentication is that we make API calls to the Kubernetes API server and to GCP, and without a deadline they could hang forever... Also, the context created at the beginning of the goroutine should be used everywhere in the operation.

@matheuscscp
Member

Better yet, all of the code inside buildGCPClientOptions can actually run in Post(). The only code that needs to be placed inside publish() is code that we cannot run successfully in CI, which is pubsub.NewClient() and client.Topic().Publish().Get(). This way Post() is unit-testable through the googlePubSubPostTestCase mock.

@matheuscscp
Member

matheuscscp commented Aug 2, 2025

Better yet, all of the code inside buildGCPClientOptions can actually run in Post(). The only code that needs to be placed inside publish() is code that we cannot run successfully in CI, which is pubsub.NewClient() and client.Topic().Publish().Get(). This way Post() is unit-testable through the googlePubSubPostTestCase mock.

Turns out this is not true: using the ctx created at the beginning of the goroutine requires quite a refactor, see collapsed diff

diff --git a/internal/notifier/factory.go b/internal/notifier/factory.go
index 062ecae..f2dfdb8 100644
--- a/internal/notifier/factory.go
+++ b/internal/notifier/factory.go
@@ -263,21 +263,7 @@ func googleChatNotifierFunc(opts notifierOptions) (Interface, error) {
 }
 
 func googlePubSubNotifierFunc(opts notifierOptions) (Interface, error) {
-	// NOTE: Use background context for GCP client options to avoid workload identity timeout.
-	// opts.Context has a 15-second timeout which is too short for ServiceAccount informer
-	// sync and token creation. The actual notification timeout is handled separately.
-	clientOpts, err := buildGCPClientOptions(context.Background(), opts)
-	if err != nil {
-		return nil, err
-	}
-
-	return NewGooglePubSub(
-		opts.Context,
-		opts.URL,
-		opts.Channel,
-		opts.Headers,
-		clientOpts,
-	)
+	return NewGooglePubSub(&opts)
 }
 
 func webexNotifierFunc(opts notifierOptions) (Interface, error) {
diff --git a/internal/notifier/google_pubsub.go b/internal/notifier/google_pubsub.go
index 0bf92d0..0a66f7c 100644
--- a/internal/notifier/google_pubsub.go
+++ b/internal/notifier/google_pubsub.go
@@ -23,7 +23,6 @@ import (
 	"fmt"
 
 	"cloud.google.com/go/pubsub"
-	"google.golang.org/api/option"
 	kerrors "k8s.io/apimachinery/pkg/util/errors"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 
@@ -33,18 +32,13 @@ import (
 type (
 	// GooglePubSub holds a Google Pub/Sub client and target topic.
 	GooglePubSub struct {
-		topicID   string
-		attrs     map[string]string
-		topicName string
-
 		client interface {
-			publish(ctx context.Context, topicID string, eventPayload []byte, attrs map[string]string) (serverID string, err error)
+			publish(ctx context.Context, eventPayload []byte) error
 		}
 	}
 
 	googlePubSubClient struct {
-		projectID string
-		opts      []option.ClientOption
+		opts *notifierOptions
 	}
 )
 
@@ -53,29 +47,15 @@ var _ Interface = &GooglePubSub{}
 
 // NewGooglePubSub creates a Google Pub/Sub client tied to a specific
 // project and topic using the provided client options.
-func NewGooglePubSub(ctx context.Context, projectID, topicID string, attrs map[string]string,
-	clientOpts []option.ClientOption) (*GooglePubSub, error) {
-	if projectID == "" {
+func NewGooglePubSub(opts *notifierOptions) (*GooglePubSub, error) {
+	if opts.URL == "" {
 		return nil, errors.New("GCP project ID cannot be empty")
 	}
-	if topicID == "" {
+	if opts.Channel == "" {
 		return nil, errors.New("GCP Pub/Sub topic ID cannot be empty")
 	}
-	if len(attrs) == 0 {
-		attrs = nil
-	}
-
-	client := &googlePubSubClient{
-		projectID: projectID,
-		opts:      clientOpts,
-	}
-
-	return &GooglePubSub{
-		topicID:   topicID,
-		attrs:     attrs,
-		topicName: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
-		client:    client,
-	}, nil
+	client := &googlePubSubClient{opts}
+	return &GooglePubSub{client}, nil
 }
 
 // Post posts Flux events to a Google Pub/Sub topic.
@@ -90,24 +70,22 @@ func (g *GooglePubSub) Post(ctx context.Context, event eventv1.Event) error {
 		return fmt.Errorf("error json-marshaling event: %w", err)
 	}
 
-	serverID, err := g.client.publish(ctx, g.topicID, eventPayload, g.attrs)
-	if err != nil {
-		return fmt.Errorf("error publishing event to topic %s: %w", g.topicName, err)
-	}
-
-	// debug log
-	log.FromContext(ctx).V(1).Info("Event published to GCP Pub/Sub topic",
-		"topic", g.topicName,
-		"server message id", serverID)
-
-	return nil
+	return g.client.publish(ctx, eventPayload)
 }
 
-func (g *googlePubSubClient) publish(ctx context.Context, topicID string, eventPayload []byte, attrs map[string]string) (serverID string, err error) {
-	var client *pubsub.Client
-	client, err = pubsub.NewClient(ctx, g.projectID, g.opts...)
+func (g *googlePubSubClient) publish(ctx context.Context, eventPayload []byte) error {
+	projectID := g.opts.URL
+	topicID := g.opts.Channel
+	topicName := fmt.Sprintf("projects/%s/topics/%s", projectID, topicID)
+
+	// Build client.
+	opts, err := buildGCPClientOptions(ctx, *g.opts)
+	if err != nil {
+		return err
+	}
+	client, err := pubsub.NewClient(ctx, projectID, opts...)
 	if err != nil {
-		return
+		return err
 	}
 	defer func() {
 		if closeErr := client.Close(); closeErr != nil {
@@ -118,12 +96,27 @@ func (g *googlePubSubClient) publish(ctx context.Context, topicID string, eventP
 			}
 		}
 	}()
-	serverID, err = client.
+
+	// Publish the event to the topic.
+	attrs := g.opts.Headers
+	if len(attrs) == 0 {
+		attrs = nil
+	}
+	serverID, err := client.
 		Topic(topicID).
 		Publish(ctx, &pubsub.Message{
 			Data:       eventPayload,
 			Attributes: attrs,
 		}).
 		Get(ctx)
-	return
+	if err != nil {
+		return fmt.Errorf("error publishing to GCP Pub/Sub topic %s: %w", topicName, err)
+	}
+
+	// Emit debug log.
+	log.FromContext(ctx).V(1).Info("Event published to GCP Pub/Sub topic",
+		"topic", topicName,
+		"server message id", serverID)
+
+	return nil
 }
diff --git a/internal/notifier/google_pubsub_test.go b/internal/notifier/google_pubsub_test.go
index a94da52..d9e491a 100644
--- a/internal/notifier/google_pubsub_test.go
+++ b/internal/notifier/google_pubsub_test.go
@@ -19,7 +19,6 @@ package notifier
 import (
 	"context"
 	"errors"
-	"fmt"
 	"testing"
 
 	. "github.com/onsi/gomega"
@@ -50,60 +49,18 @@ func TestNewGooglePubSub(t *testing.T) {
 			topicID:     "",
 			expectedErr: errors.New("GCP Pub/Sub topic ID cannot be empty"),
 		},
-		{
-			name:              "topic name is stored properly",
-			projectID:         "project-id",
-			topicID:           "topic-id",
-			expectedTopicName: "projects/project-id/topics/topic-id",
-		},
-		{
-			name:              "client options are stored properly",
-			projectID:         "project-id",
-			topicID:           "topic-id",
-			expectedTopicName: "projects/project-id/topics/topic-id",
-			clientOpts:        []option.ClientOption{option.WithCredentialsJSON([]byte("json credentials"))},
-		},
-		{
-			name:              "non-empty attributes are stored properly",
-			projectID:         "project-id",
-			topicID:           "topic-id",
-			expectedTopicName: "projects/project-id/topics/topic-id",
-			attrs:             map[string]string{"foo": "bar"},
-			expectedAttrs:     map[string]string{"foo": "bar"},
-		},
-		{
-			name:              "empty attributes are stored properly",
-			projectID:         "project-id",
-			topicID:           "topic-id",
-			expectedTopicName: "projects/project-id/topics/topic-id",
-			attrs:             map[string]string{},
-			expectedAttrs:     nil,
-		},
 	}
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			g := NewWithT(t)
-			ctx := context.Background()
-			provider, err := NewGooglePubSub(ctx, tt.projectID, tt.topicID, tt.attrs, tt.clientOpts)
+			provider, err := NewGooglePubSub(&notifierOptions{
+				URL:     tt.projectID,
+				Channel: tt.topicID,
+			})
 
-			if tt.expectedErr != nil {
-				g.Expect(err).To(Equal(tt.expectedErr))
-				g.Expect(provider).To(BeNil())
-			} else {
-				g.Expect(err).To(BeNil())
-				g.Expect(provider).NotTo(BeNil())
-
-				g.Expect(provider.topicID).To(Equal(tt.topicID))
-				g.Expect(provider.attrs).To(Equal(tt.expectedAttrs))
-				g.Expect(provider.topicName).To(Equal(tt.expectedTopicName))
-
-				g.Expect(provider.client).NotTo(BeNil())
-
-				client := provider.client.(*googlePubSubClient)
-				g.Expect(client).NotTo(BeNil())
-				g.Expect(client.opts).To(Equal(tt.clientOpts))
-			}
+			g.Expect(err).To(Equal(tt.expectedErr))
+			g.Expect(provider).To(BeNil())
 		})
 	}
 }
@@ -123,14 +80,11 @@ type googlePubSubPostTestCase struct {
 	g *WithT
 }
 
-func (tt *googlePubSubPostTestCase) publish(ctx context.Context, topicID string, eventPayload []byte, attrs map[string]string) (serverID string, err error) {
+func (tt *googlePubSubPostTestCase) publish(ctx context.Context, eventPayload []byte) error {
 	tt.g.THelper()
 	tt.publishExecuted = true
-	tt.g.Expect(topicID).To(Equal(tt.topicID))
 	tt.g.Expect(string(eventPayload)).To(Equal(tt.expectedEventPayload))
-	tt.g.Expect(attrs).To(Equal(tt.attrs))
-	// serverID is only used in a debug log for now, there's no way to assert it
-	return "", tt.publishErr
+	return tt.publishErr
 }
 
 func TestGooglePubSubPost(t *testing.T) {
@@ -151,11 +105,11 @@ func TestGooglePubSubPost(t *testing.T) {
 			publishShouldExecute: false,
 		},
 		{
-			name:                 "publish error is wrapped and relayed",
+			name:                 "publish error is relayed",
 			expectedEventPayload: `{"involvedObject":{},"severity":"","timestamp":null,"message":"","reason":"","reportingController":""}`,
 			topicName:            "projects/projectID/topics/topicID",
 			publishErr:           errors.New("publish error"),
-			expectedErr:          fmt.Errorf("error publishing event to topic projects/projectID/topics/topicID: %w", errors.New("publish error")),
+			expectedErr:          errors.New("publish error"),
 			publishShouldExecute: true,
 		},
 		{
@@ -173,10 +127,7 @@ func TestGooglePubSubPost(t *testing.T) {
 			tt.g = g
 
 			topic := &GooglePubSub{
-				client:    tt,
-				topicID:   tt.topicID,
-				attrs:     tt.attrs,
-				topicName: tt.topicName,
+				client: tt,
 			}
 
 			err := topic.Post(context.Background(), tt.event)

@cappyzawa
Member Author

Sorry for making you do most of the implementation…! (good morning 🌞 )
I’ll go ahead and incorporate your fix—thanks!

@matheuscscp
Member

Sorry for making you do most of the implementation…! (good morning 🌞 ) I’ll go ahead and incorporate your fix—thanks!

There's nothing to be sorry about! It's my mistake not guiding you correctly beforehand, I definitely did not predict this issue we found out, it's for sure something to keep in mind more often. You are helping us out tons here! 😁

@cappyzawa
Member Author

@matheuscscp I’ve merged your changes in 9021d14.
Could you check if the diff looks as intended? 🙏

That was an awesome refactor—thanks a lot!

@cappyzawa
Member Author

If you've finished reviewing the changes, let me know and I'll squash the commits 🙏

@matheuscscp
Member

see collapsed diff

I did a last-minute edit on this comment after catching a problem (passing wrong arg to client.Topic()), it looks like you fixed it for me, is that correct?

diff --git a/internal/notifier/google_pubsub.go b/internal/notifier/google_pubsub.go
index 52b2bb7..0dd4ef9 100644
--- a/internal/notifier/google_pubsub.go
+++ b/internal/notifier/google_pubsub.go
@@ -103,7 +103,7 @@ func (g *googlePubSubClient) publish(ctx context.Context, eventPayload []byte) e
 	}
 	topic := fmt.Sprintf("projects/%s/topics/%s", projectID, topicID)
 	serverID, err := client.
-		Topic(topic).
+		Topic(topicID).
 		Publish(ctx, &pubsub.Message{
 			Data:       eventPayload,
 			Attributes: attrs,
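
The distinction behind that one-line fix is that `client.Topic()` expects the bare topic ID relative to the client's project, while the fully qualified name is only useful for logs and error messages. A small sketch that keeps the two forms apart; the `topicRef` type is hypothetical, and the project and topic values are taken from the error logs earlier in this thread.

```go
package main

import "fmt"

// topicRef (hypothetical) keeps the bare topic ID and the project together
// so the two string forms cannot be mixed up.
type topicRef struct {
	ProjectID string
	TopicID   string
}

// Name returns the fully qualified resource name, suitable for log and
// error messages but not as the argument to client.Topic().
func (t topicRef) Name() string {
	return fmt.Sprintf("projects/%s/topics/%s", t.ProjectID, t.TopicID)
}

func main() {
	ref := topicRef{ProjectID: "flux-gitops-playground", TopicID: "test-flux"}
	fmt.Println("pass to client.Topic():", ref.TopicID)
	fmt.Println("use in log messages:", ref.Name())
}
```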

Member

@matheuscscp matheuscscp left a comment

LGTM! 🚀

Thanks very much @cappyzawa ❤️

I was able to test both controller-level and object-level workload identity for the commit 9021d14:

{
    "involvedObject": {
        "kind": "OCIRepository",
        "namespace": "flux-system",
        "name": "flux-system",
        "uid": "f2dde3e6-6411-498a-b26c-1c14c00b36ac",
        "apiVersion": "source.toolkit.fluxcd.io/v1",
        "resourceVersion": "247276"
    },
    "severity": "info",
    "timestamp": "2025-08-02T22:21:54Z",
    "message": "stored artifact with revision 'latest@sha256:<redacted>' from 'oci://<redacted>', origin source 'https://<redacted>', origin revision 'refs/heads/main@sha1:<redacted>'",
    "reason": "NewArtifact",
    "metadata": {
        "authMode": "controller-level",
        "cluster": "preview",
        "revision": "latest@sha256:<redacted>"
    },
    "reportingController": "source-controller",
    "reportingInstance": "source-controller-775f75d5b4-6rll5"
}
{
    "involvedObject": {
        "kind": "OCIRepository",
        "namespace": "flux-system",
        "name": "flux-system",
        "uid": "f2dde3e6-6411-498a-b26c-1c14c00b36ac",
        "apiVersion": "source.toolkit.fluxcd.io/v1",
        "resourceVersion": "246335"
    },
    "severity": "info",
    "timestamp": "2025-08-02T22:18:58Z",
    "message": "stored artifact with revision 'latest@sha256:<redacted>' from 'oci://<redacted>', origin source 'https://<redacted>', origin revision 'refs/heads/main@sha1:<redacted>'",
    "reason": "NewArtifact",
    "metadata": {
        "authMode": "object-level",
        "cluster": "preview",
        "revision": "latest@sha256:<redacted>"
    },
    "reportingController": "source-controller",
    "reportingInstance": "source-controller-775f75d5b4-6rll5"
}

I added the authMode metadata key to the event through the .spec.eventMetadata API:

apiVersion: notification.toolkit.fluxcd.io/v1beta3
kind: Alert
metadata:
  name: pubsub
  namespace: flux-system
spec:
  providerRef:
    name: pubsub
  eventMetadata:
    cluster: preview
    authMode: controller-level # and object-level while testing .spec.serviceAccountName set on the Provider object
  eventSeverity: info
  eventSources:
    - kind: OCIRepository
      name: '*'
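
On the receiving side, a subscriber could tell the two test runs apart by reading that metadata key from the message payload. A minimal sketch, assuming the payload has the JSON shape shown above; the `event` struct and `authMode` function are hypothetical and decode only the fields they need.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// event (hypothetical) holds only the fields this sketch reads from the
// event payloads shown above.
type event struct {
	Reason   string            `json:"reason"`
	Metadata map[string]string `json:"metadata"`
}

// authMode extracts the authMode metadata key from a JSON event payload.
func authMode(payload []byte) (string, error) {
	var e event
	if err := json.Unmarshal(payload, &e); err != nil {
		return "", fmt.Errorf("decoding event: %w", err)
	}
	mode, ok := e.Metadata["authMode"]
	if !ok {
		return "", errors.New("event has no authMode metadata")
	}
	return mode, nil
}

func main() {
	// A trimmed-down version of one of the events from this thread.
	payload := []byte(`{
		"reason": "NewArtifact",
		"metadata": {"authMode": "object-level", "cluster": "preview"}
	}`)

	mode, err := authMode(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(mode) // object-level
}
```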

Add support for object-level GCP workload identity authentication to enable
individual Providers to authenticate using their own ServiceAccount without
needing to manage JSON credentials. This extends beyond the existing
controller-level workload identity that is automatically handled by
Google libraries.

The implementation maintains backward compatibility by prioritizing
JSON credentials when both authentication methods are available.
Proxy support is also added following the Azure DevOps pattern
for consistency across notifiers.

This change is part of the broader effort to support multi-tenant
workload identity across Flux controllers (RFC-0010).

Signed-off-by: cappyzawa <cappyzawa@gmail.com>

Add comprehensive workload identity documentation for both Google Pub/Sub and Azure DevOps providers.
Include controller-level and object-level authentication patterns with feature gate requirements
and setup instructions for multi-tenant environments.

Signed-off-by: cappyzawa <cappyzawa@gmail.com>
@cappyzawa cappyzawa force-pushed the feat/google-pubsub-workload-identity branch from 9021d14 to 039cd81 Compare August 2, 2025 22:35
@matheuscscp matheuscscp merged commit 4d1c503 into fluxcd:main Aug 2, 2025
5 checks passed
@cappyzawa cappyzawa deleted the feat/google-pubsub-workload-identity branch August 2, 2025 22:46
2 participants