Skip to content

Conversation

@MASA-JAPAN
Copy link
Contributor

@MASA-JAPAN MASA-JAPAN commented Jan 2, 2026

Fix concurrent map write panic in pubsub trace propagation

Fixes #11314

Summary

This PR fixes a concurrent map write panic that occurs when multiple goroutines publish messages with shared attributes maps while OpenTelemetry tracing is enabled.

Problem

When users publish messages concurrently using a shared attributes map with tracing enabled, the application crashes with:

fatal error: concurrent map iteration and map write

Root Cause

The injectPropagation() function in both pubsub/trace.go and pubsub/v2/trace.go was modifying the message's attributes map directly by injecting trace context propagation attributes (prefixed with googclient_).

When multiple goroutines published messages using the same shared attributes map:

  1. Goroutine A calls injectPropagation() which modifies the map
  2. Goroutine B simultaneously tries to serialize the same map for publishing
  3. Go runtime detects concurrent map iteration and write → panic

User Impact

This bug affects any application that:

  • Enables OpenTelemetry tracing with EnableOpenTelemetryTracing: true
  • Publishes messages concurrently from multiple goroutines
  • Reuses the same attributes map across multiple messages

This is a common pattern in production code where users create a shared attributes map for common metadata.

Solution

Create a defensive copy of the attributes map before injecting trace context propagation attributes. This ensures each message gets its own independent copy, preventing concurrent access to the shared map.

Changes

Modified Files:

  • pubsub/trace.go - Added defensive copy in injectPropagation() function
  • pubsub/v2/trace.go - Added defensive copy in injectPropagation() function
  • pubsub/trace_test.go - Added regression test TestPublish_ConcurrentWithSharedAttributes

Testing

New Test: TestPublish_ConcurrentWithSharedAttributes

This regression test reproduces the exact scenario reported in issue #11314:

  • Creates a shared attributes map
  • Launches 50 goroutines that each publish 20 messages (1000 total)
  • All messages use the same shared attributes map
  • Verifies no panic occurs
  • Verifies the original shared map remains unmodified

Test Results:

  • ✅ New regression test passes
  • ✅ All existing pubsub v1 tests pass (95.869s)
  • ✅ All existing pubsub v2 tests pass (2.108s)
  • ✅ No performance regression (defensive copy is only created when needed)

Test Coverage

The test validates:

  1. Concurrency safety: 1000 concurrent publishes complete without panic
  2. Isolation: Original shared map is not modified
  3. Functionality: Trace context is still properly injected (verified by span creation)

Performance Impact

Minimal impact:

  • Copy operation only occurs when:
    • Tracing is enabled (EnableOpenTelemetryTracing: true)
    • Valid trace context exists
    • Message has non-nil attributes
  • Copy is O(n) where n = number of attributes (typically small, e.g., 2-5 attributes)
  • Pre-allocated map with correct capacity avoids reallocations

Breaking Changes

None. This is a bug fix that maintains backward compatibility while fixing undefined behavior.

Checklist

@MASA-JAPAN MASA-JAPAN requested review from a team as code owners January 2, 2026 06:59
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @MASA-JAPAN, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical concurrency bug in the Pub/Sub client's OpenTelemetry tracing integration. Previously, applications using shared message attribute maps and OpenTelemetry tracing could experience panics due to concurrent map writes when multiple goroutines published messages. The fix introduces a defensive copy mechanism for message attributes during trace context injection, ensuring thread safety and preventing application crashes without introducing breaking changes or significant performance overhead.

Highlights

  • Concurrent Map Write Fix: Addresses a panic caused by concurrent modification of shared message attribute maps during OpenTelemetry trace propagation when multiple goroutines publish messages.
  • Defensive Copy Implementation: The injectPropagation function now creates a defensive copy of the message attributes map before injecting trace context, preventing direct modification of shared maps and ensuring thread safety.
  • Regression Test Added: A new test, TestPublish_ConcurrentWithSharedAttributes, has been introduced to specifically reproduce and verify the fix for the concurrent map write scenario, confirming concurrency safety and isolation of shared attribute maps.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively addresses a critical concurrent map write panic by introducing a defensive copy of the message attributes map before modification. The fix is correctly applied to both pubsub/trace.go and pubsub/v2/trace.go, and the new regression test in pubsub/trace_test.go is comprehensive and accurately reproduces the issue.

My review includes a few suggestions to enhance the implementation:

  • Simplifying the map copy logic in both trace.go files for better conciseness.
  • Improving the robustness of the new concurrent test by adding a timeout to prevent hangs.
  • Making the test's map verification more maintainable by using reflect.DeepEqual.

Overall, this is a great fix for a serious issue. The changes are well-described and tested.

Comment on lines 326 to 335
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
} else {
// Make a copy of the original attributes
attrs := make(map[string]string, len(msg.Attributes))
for k, v := range msg.Attributes {
attrs[k] = v
}
msg.Attributes = attrs
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for creating a defensive copy can be simplified. Ranging over a nil map in Go is a no-op, which means you can remove the if msg.Attributes == nil check and just perform the copy. If msg.Attributes is nil, len will be 0 and the loop won't execute, resulting in a new empty map being assigned to msg.Attributes. This makes the code more concise.

As an alternative, since this repository uses Go 1.21+, you could use maps.Clone for an even more concise implementation, though you'd still need to handle the nil case separately as maps.Clone(nil) returns nil.

// With Go 1.21+
attrs := maps.Clone(msg.Attributes)
if attrs == nil {
    attrs = make(map[string]string)
}
msg.Attributes = attrs
		attrs := make(map[string]string, len(msg.Attributes))
		for k, v := range msg.Attributes {
			attrs[k] = v
		}
		msg.Attributes = attrs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to a change in our how our release pipeline works, we should only touch one module at a time (pubsub or pubsub/v2). Please revert the changes here and just make changes to pubsub/v2 for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 800 to 804
for i := 0; i < numGoroutines; i++ {
if err := <-errChan; err != nil {
t.Fatalf("concurrent publish failed: %v", err)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current approach of waiting for results on a channel can cause the test to hang indefinitely if one of the goroutines panics before sending a result. To make the test more robust, consider adding a timeout to the receive operation. This will prevent hangs and cause the test to fail clearly if it takes too long. You will need to import the time package.

Suggested change
for i := 0; i < numGoroutines; i++ {
if err := <-errChan; err != nil {
t.Fatalf("concurrent publish failed: %v", err)
}
}
for i := 0; i < numGoroutines; i++ {
select {
case err := <-errChan:
if err != nil {
t.Fatalf("concurrent publish failed: %v", err)
}
case <-time.After(30 * time.Second):
t.Fatal("test timed out waiting for goroutines to finish")
}
}

Comment on lines 812 to 817
if len(sharedAttrs) != 3 {
t.Errorf("shared attributes map was modified: expected 3 entries, got %d", len(sharedAttrs))
}
if sharedAttrs["key1"] != "value1" || sharedAttrs["key2"] != "value2" || sharedAttrs["key3"] != "value3" {
t.Errorf("shared attributes map values were modified: %v", sharedAttrs)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This check for map modification is a bit verbose and brittle. Using reflect.DeepEqual would make the check more concise and easier to maintain if the test's shared attributes change in the future. You will need to import the reflect package.

Suggested change
if len(sharedAttrs) != 3 {
t.Errorf("shared attributes map was modified: expected 3 entries, got %d", len(sharedAttrs))
}
if sharedAttrs["key1"] != "value1" || sharedAttrs["key2"] != "value2" || sharedAttrs["key3"] != "value3" {
t.Errorf("shared attributes map values were modified: %v", sharedAttrs)
}
expectedAttrs := map[string]string{
"key1": "value1",
"key2": "value2",
"key3": "value3",
}
if !reflect.DeepEqual(sharedAttrs, expectedAttrs) {
t.Errorf("shared attributes map was modified. got: %v, want: %v", sharedAttrs, expectedAttrs)
}

Comment on lines 326 to 335
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
} else {
// Make a copy of the original attributes
attrs := make(map[string]string, len(msg.Attributes))
for k, v := range msg.Attributes {
attrs[k] = v
}
msg.Attributes = attrs
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for creating a defensive copy can be simplified. Ranging over a nil map in Go is a no-op, which means you can remove the if msg.Attributes == nil check and just perform the copy. If msg.Attributes is nil, len will be 0 and the loop won't execute, resulting in a new empty map being assigned to msg.Attributes. This makes the code more concise.

As an alternative, since this repository uses Go 1.21+, you could use maps.Clone for an even more concise implementation, though you'd still need to handle the nil case separately as maps.Clone(nil) returns nil.

// With Go 1.21+
attrs := maps.Clone(msg.Attributes)
if attrs == nil {
    attrs = make(map[string]string)
}
msg.Attributes = attrs
		attrs := make(map[string]string, len(msg.Attributes))
		for k, v := range msg.Attributes {
			attrs[k] = v
		}
		msg.Attributes = attrs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use maps.Clone here as suggested.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hongalex hongalex changed the title Fix concurrent map write panic in pubsub trace propagation feat(pubsub/v2): fix concurrent map write Jan 6, 2026
Copy link
Member

@hongalex hongalex left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mentioned in the detailed comments, but I think the test added is unnecessarily complex. You can probably skip all of the goroutines, and just write a test that verifies the attributes map argument remains the same from a single publish with otel enabled.

Also please only make changes to the pubsub/v2 directory. Do not make changes to pubsub/trace.go or pubsub/trace_test.go in the same PR that you touch v2 code.

Thanks for getting around to this issue!

Comment on lines 326 to 335
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
} else {
// Make a copy of the original attributes
attrs := make(map[string]string, len(msg.Attributes))
for k, v := range msg.Attributes {
attrs[k] = v
}
msg.Attributes = attrs
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use maps.Clone here as suggested.

Comment on lines 326 to 335
if msg.Attributes == nil {
msg.Attributes = make(map[string]string)
} else {
// Make a copy of the original attributes
attrs := make(map[string]string, len(msg.Attributes))
for k, v := range msg.Attributes {
attrs[k] = v
}
msg.Attributes = attrs
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to a change in our how our release pipeline works, we should only touch one module at a time (pubsub or pubsub/v2). Please revert the changes here and just make changes to pubsub/v2 for now.

@MASA-JAPAN
Copy link
Contributor Author

@hongalex
Thank you for all your comments ✨
I've updated the codes accordingly.
I appreciate it if you could review it again.

@MASA-JAPAN MASA-JAPAN requested a review from hongalex January 7, 2026 14:22
Copy link
Member

@hongalex hongalex left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more comments for the test.

Comment on lines 781 to 787
// Verify that the original shared attributes map was not modified
if len(sharedAttrs) != 3 {
t.Errorf("shared attributes map was modified: expected 3 entries, got %d", len(sharedAttrs))
}
if sharedAttrs["key1"] != "value1" || sharedAttrs["key2"] != "value2" || sharedAttrs["key3"] != "value3" {
t.Errorf("shared attributes map values were modified: %v", sharedAttrs)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would argue this is the only logic that's necessary. Everything after is not needed since we have other tests to validate that the context propagation is happening properly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MASA-JAPAN MASA-JAPAN requested a review from hongalex January 9, 2026 13:26
@hongalex hongalex added the kokoro:run Add this label to force Kokoro to re-run the tests. label Jan 13, 2026
@kokoro-team kokoro-team removed the kokoro:run Add this label to force Kokoro to re-run the tests. label Jan 13, 2026
@hongalex hongalex merged commit 50a9c4a into googleapis:main Jan 15, 2026
10 of 11 checks passed
krishnamd-jkp pushed a commit that referenced this pull request Jan 28, 2026
# Fix concurrent map write panic in pubsub trace propagation

Fixes #11314

## Summary

This PR fixes a concurrent map write panic that occurs when multiple
goroutines publish messages with shared attributes maps while
OpenTelemetry tracing is enabled.

## Problem

When users publish messages concurrently using a shared attributes map
with tracing enabled, the application crashes with:

```
fatal error: concurrent map iteration and map write
```

### Root Cause

The `injectPropagation()` function in both `pubsub/trace.go` and
`pubsub/v2/trace.go` was modifying the message's attributes map directly
by injecting trace context propagation attributes (prefixed with
`googclient_`).

When multiple goroutines published messages using the same shared
attributes map:
1. Goroutine A calls `injectPropagation()` which modifies the map
2. Goroutine B simultaneously tries to serialize the same map for
publishing
3. Go runtime detects concurrent map iteration and write → panic

### User Impact

This bug affects any application that:
- Enables OpenTelemetry tracing with `EnableOpenTelemetryTracing: true`
- Publishes messages concurrently from multiple goroutines
- Reuses the same attributes map across multiple messages

This is a common pattern in production code where users create a shared
attributes map for common metadata.

## Solution

Create a defensive copy of the attributes map before injecting trace
context propagation attributes. This ensures each message gets its own
independent copy, preventing concurrent access to the shared map.

### Changes

**Modified Files:**
- `pubsub/trace.go` - Added defensive copy in `injectPropagation()`
function
- `pubsub/v2/trace.go` - Added defensive copy in `injectPropagation()`
function
- `pubsub/trace_test.go` - Added regression test
`TestPublish_ConcurrentWithSharedAttributes`

## Testing

### New Test: `TestPublish_ConcurrentWithSharedAttributes`

This regression test reproduces the exact scenario reported in issue
#11314:
- Creates a shared attributes map
- Launches 50 goroutines that each publish 20 messages (1000 total)
- All messages use the same shared attributes map
- Verifies no panic occurs
- Verifies the original shared map remains unmodified

**Test Results:**
- ✅ New regression test passes
- ✅ All existing pubsub v1 tests pass (95.869s)
- ✅ All existing pubsub v2 tests pass (2.108s)
- ✅ No performance regression (defensive copy is only created when
needed)

### Test Coverage

The test validates:
1. **Concurrency safety**: 1000 concurrent publishes complete without
panic
2. **Isolation**: Original shared map is not modified
3. **Functionality**: Trace context is still properly injected (verified
by span creation)

## Performance Impact

**Minimal impact:**
- Copy operation only occurs when:
  - Tracing is enabled (`EnableOpenTelemetryTracing: true`)
  - Valid trace context exists
  - Message has non-nil attributes
- Copy is O(n) where n = number of attributes (typically small, e.g.,
2-5 attributes)
- Pre-allocated map with correct capacity avoids reallocations

## Breaking Changes

None. This is a bug fix that maintains backward compatibility while
fixing undefined behavior.

## Checklist

- [x] Fixed the bug in both `pubsub/trace.go` (v1) and
`pubsub/v2/trace.go` (v2)
- [x] Added comprehensive regression test
- [x] All existing tests pass
- [x] Added code comments explaining the fix and referencing issue
#11314
- [x] No breaking changes
- [x] Performance impact is negligible

---------

Co-authored-by: MASA-JAPAN <MASA-JAPAN@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

pubsub: concurrent map write panic when publishing with telemetry enabled and shared attributes

4 participants