feat(pubsub/v2): fix concurrent map write #13530
Summary of Changes

Hello @MASA-JAPAN, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical concurrency bug in the Pub/Sub client's OpenTelemetry tracing integration. Previously, applications using shared message attribute maps and OpenTelemetry tracing could experience panics due to concurrent map writes when multiple goroutines published messages. The fix introduces a defensive copy mechanism for message attributes during trace context injection, ensuring thread safety and preventing application crashes without introducing breaking changes or significant performance overhead.
Code Review
This pull request effectively addresses a critical concurrent map write panic by introducing a defensive copy of the message attributes map before modification. The fix is correctly applied to both pubsub/trace.go and pubsub/v2/trace.go, and the new regression test in pubsub/trace_test.go is comprehensive and accurately reproduces the issue.
My review includes a few suggestions to enhance the implementation:
- Simplifying the map copy logic in both trace.go files for better conciseness.
- Improving the robustness of the new concurrent test by adding a timeout to prevent hangs.
- Making the test's map verification more maintainable by using reflect.DeepEqual.
Overall, this is a great fix for a serious issue. The changes are well-described and tested.
    if msg.Attributes == nil {
        msg.Attributes = make(map[string]string)
    } else {
        // Make a copy of the original attributes
        attrs := make(map[string]string, len(msg.Attributes))
        for k, v := range msg.Attributes {
            attrs[k] = v
        }
        msg.Attributes = attrs
    }
The logic for creating a defensive copy can be simplified. Ranging over a nil map in Go is a no-op, which means you can remove the if msg.Attributes == nil check and just perform the copy. If msg.Attributes is nil, len will be 0 and the loop won't execute, resulting in a new empty map being assigned to msg.Attributes. This makes the code more concise.
As an alternative, since this repository uses Go 1.21+, you could use maps.Clone for an even more concise implementation, though you'd still need to handle the nil case separately as maps.Clone(nil) returns nil.
    // With Go 1.21+
    attrs := maps.Clone(msg.Attributes)
    if attrs == nil {
        attrs = make(map[string]string)
    }
    msg.Attributes = attrs

Suggested change:

    attrs := make(map[string]string, len(msg.Attributes))
    for k, v := range msg.Attributes {
        attrs[k] = v
    }
    msg.Attributes = attrs
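The two nil-map behaviors the comment relies on can be checked in isolation. The following is a minimal sketch, not the client's code: `cloneAttrs` and `copyAttrs` are hypothetical helper names, and the `googclient_example` key is made up for illustration (only the `googclient_` prefix comes from the PR description).

```go
package main

import (
	"fmt"
	"maps"
)

// cloneAttrs returns a copy of attrs that is always non-nil and safe to write to.
// Hypothetical helper; not part of the Pub/Sub client.
func cloneAttrs(attrs map[string]string) map[string]string {
	out := maps.Clone(attrs) // maps.Clone returns nil when attrs is nil
	if out == nil {
		out = make(map[string]string)
	}
	return out
}

// copyAttrs does the same with a plain loop. Ranging over a nil map runs
// zero iterations, so no separate nil check is needed.
func copyAttrs(attrs map[string]string) map[string]string {
	out := make(map[string]string, len(attrs))
	for k, v := range attrs {
		out[k] = v
	}
	return out
}

func main() {
	var nilAttrs map[string]string
	fmt.Println(maps.Clone(nilAttrs) == nil)                             // true
	fmt.Println(cloneAttrs(nilAttrs) != nil, copyAttrs(nilAttrs) != nil) // true true

	shared := map[string]string{"key1": "value1"}
	c := cloneAttrs(shared)
	c["googclient_example"] = "x" // illustrative key, written to the copy only
	fmt.Println(len(shared), len(c)) // 1 2
}
```

Both helpers leave the caller's map untouched; the `maps.Clone` variant is shorter but needs the explicit nil fallback before any write.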
Due to a change in how our release pipeline works, we should only touch one module at a time (pubsub or pubsub/v2). Please revert the changes here and just make changes to pubsub/v2 for now.
pubsub/trace_test.go
Outdated
    for i := 0; i < numGoroutines; i++ {
        if err := <-errChan; err != nil {
            t.Fatalf("concurrent publish failed: %v", err)
        }
    }
The current approach of waiting for results on a channel can cause the test to hang indefinitely if one of the goroutines panics before sending a result. To make the test more robust, consider adding a timeout to the receive operation. This will prevent hangs and cause the test to fail clearly if it takes too long. You will need to import the time package.
Suggested change:

    - for i := 0; i < numGoroutines; i++ {
    -     if err := <-errChan; err != nil {
    -         t.Fatalf("concurrent publish failed: %v", err)
    -     }
    - }
    + for i := 0; i < numGoroutines; i++ {
    +     select {
    +     case err := <-errChan:
    +         if err != nil {
    +             t.Fatalf("concurrent publish failed: %v", err)
    +         }
    +     case <-time.After(30 * time.Second):
    +         t.Fatal("test timed out waiting for goroutines to finish")
    +     }
    + }
pubsub/trace_test.go
Outdated
    if len(sharedAttrs) != 3 {
        t.Errorf("shared attributes map was modified: expected 3 entries, got %d", len(sharedAttrs))
    }
    if sharedAttrs["key1"] != "value1" || sharedAttrs["key2"] != "value2" || sharedAttrs["key3"] != "value3" {
        t.Errorf("shared attributes map values were modified: %v", sharedAttrs)
    }
This check for map modification is a bit verbose and brittle. Using reflect.DeepEqual would make the check more concise and easier to maintain if the test's shared attributes change in the future. You will need to import the reflect package.
Suggested change:

    - if len(sharedAttrs) != 3 {
    -     t.Errorf("shared attributes map was modified: expected 3 entries, got %d", len(sharedAttrs))
    - }
    - if sharedAttrs["key1"] != "value1" || sharedAttrs["key2"] != "value2" || sharedAttrs["key3"] != "value3" {
    -     t.Errorf("shared attributes map values were modified: %v", sharedAttrs)
    - }
    + expectedAttrs := map[string]string{
    +     "key1": "value1",
    +     "key2": "value2",
    +     "key3": "value3",
    + }
    + if !reflect.DeepEqual(sharedAttrs, expectedAttrs) {
    +     t.Errorf("shared attributes map was modified. got: %v, want: %v", sharedAttrs, expectedAttrs)
    + }
pubsub/v2/trace.go
Outdated
    if msg.Attributes == nil {
        msg.Attributes = make(map[string]string)
    } else {
        // Make a copy of the original attributes
        attrs := make(map[string]string, len(msg.Attributes))
        for k, v := range msg.Attributes {
            attrs[k] = v
        }
        msg.Attributes = attrs
    }
The logic for creating a defensive copy can be simplified. Ranging over a nil map in Go is a no-op, which means you can remove the if msg.Attributes == nil check and just perform the copy. If msg.Attributes is nil, len will be 0 and the loop won't execute, resulting in a new empty map being assigned to msg.Attributes. This makes the code more concise.
As an alternative, since this repository uses Go 1.21+, you could use maps.Clone for an even more concise implementation, though you'd still need to handle the nil case separately as maps.Clone(nil) returns nil.
    // With Go 1.21+
    attrs := maps.Clone(msg.Attributes)
    if attrs == nil {
        attrs = make(map[string]string)
    }
    msg.Attributes = attrs

Suggested change:

    attrs := make(map[string]string, len(msg.Attributes))
    for k, v := range msg.Attributes {
        attrs[k] = v
    }
    msg.Attributes = attrs
Please use maps.Clone here as suggested.
hongalex left a comment
I mentioned in the detailed comments, but I think the test added is unnecessarily complex. You can probably skip all of the goroutines, and just write a test that verifies the attributes map argument remains the same from a single publish with otel enabled.
Also please only make changes to the pubsub/v2 directory. Do not make changes to pubsub/trace.go or pubsub/trace_test.go in the same PR that you touch v2 code.
Thanks for getting around to this issue!
@hongalex
hongalex left a comment
A few more comments for the test.
pubsub/v2/trace_test.go
Outdated
    // Verify that the original shared attributes map was not modified
    if len(sharedAttrs) != 3 {
        t.Errorf("shared attributes map was modified: expected 3 entries, got %d", len(sharedAttrs))
    }
    if sharedAttrs["key1"] != "value1" || sharedAttrs["key2"] != "value2" || sharedAttrs["key3"] != "value3" {
        t.Errorf("shared attributes map values were modified: %v", sharedAttrs)
    }
I would argue this is the only logic that's necessary. Everything after is not needed since we have other tests to validate that the context propagation is happening properly.
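A single-publish check along the lines the reviewer describes might look roughly like the sketch below. Everything here is a stand-in: `message` and `injectExample` are hypothetical and only mimic the clone-then-write behavior, and the real test would go through the client's publish path with OpenTelemetry enabled.

```go
package main

import (
	"fmt"
	"maps"
)

// message is a stand-in for the client's message type (assumption).
type message struct {
	Attributes map[string]string
}

// injectExample mimics the fixed behavior: clone the attributes first,
// then write the trace key to the clone. The key name is made up.
func injectExample(msg *message) {
	attrs := maps.Clone(msg.Attributes)
	if attrs == nil {
		attrs = make(map[string]string)
	}
	attrs["googclient_example"] = "trace-context"
	msg.Attributes = attrs
}

// callerMapUnchanged runs one injection and reports whether the caller's
// map is the same afterwards while the message gained exactly one entry.
func callerMapUnchanged() bool {
	shared := map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"}
	want := maps.Clone(shared)

	msg := &message{Attributes: shared}
	injectExample(msg)

	return maps.Equal(shared, want) && len(msg.Attributes) == len(want)+1
}

func main() {
	fmt.Println(callerMapUnchanged()) // true
}
```

No goroutines are needed: if the caller's map is never written to, concurrent publishes that share it cannot race on it.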
# Fix concurrent map write panic in pubsub trace propagation

Fixes #11314

## Summary

This PR fixes a concurrent map write panic that occurs when multiple goroutines publish messages with shared attributes maps while OpenTelemetry tracing is enabled.

## Problem

When users publish messages concurrently using a shared attributes map with tracing enabled, the application crashes with:

```
fatal error: concurrent map iteration and map write
```

### Root Cause

The `injectPropagation()` function in both `pubsub/trace.go` and `pubsub/v2/trace.go` was modifying the message's attributes map directly by injecting trace context propagation attributes (prefixed with `googclient_`).

When multiple goroutines published messages using the same shared attributes map:

1. Goroutine A calls `injectPropagation()` which modifies the map
2. Goroutine B simultaneously tries to serialize the same map for publishing
3. Go runtime detects concurrent map iteration and write → panic

### User Impact

This bug affects any application that:

- Enables OpenTelemetry tracing with `EnableOpenTelemetryTracing: true`
- Publishes messages concurrently from multiple goroutines
- Reuses the same attributes map across multiple messages

This is a common pattern in production code where users create a shared attributes map for common metadata.

## Solution

Create a defensive copy of the attributes map before injecting trace context propagation attributes. This ensures each message gets its own independent copy, preventing concurrent access to the shared map.

### Changes

**Modified Files:**

- `pubsub/trace.go` - Added defensive copy in `injectPropagation()` function
- `pubsub/v2/trace.go` - Added defensive copy in `injectPropagation()` function
- `pubsub/trace_test.go` - Added regression test `TestPublish_ConcurrentWithSharedAttributes`

## Testing

### New Test: `TestPublish_ConcurrentWithSharedAttributes`

This regression test reproduces the exact scenario reported in issue #11314:

- Creates a shared attributes map
- Launches 50 goroutines that each publish 20 messages (1000 total)
- All messages use the same shared attributes map
- Verifies no panic occurs
- Verifies the original shared map remains unmodified

**Test Results:**

- ✅ New regression test passes
- ✅ All existing pubsub v1 tests pass (95.869s)
- ✅ All existing pubsub v2 tests pass (2.108s)
- ✅ No performance regression (defensive copy is only created when needed)

### Test Coverage

The test validates:

1. **Concurrency safety**: 1000 concurrent publishes complete without panic
2. **Isolation**: Original shared map is not modified
3. **Functionality**: Trace context is still properly injected (verified by span creation)

## Performance Impact

**Minimal impact:**

- Copy operation only occurs when:
  - Tracing is enabled (`EnableOpenTelemetryTracing: true`)
  - Valid trace context exists
  - Message has non-nil attributes
- Copy is O(n) where n = number of attributes (typically small, e.g., 2-5 attributes)
- Pre-allocated map with correct capacity avoids reallocations

## Breaking Changes

None. This is a bug fix that maintains backward compatibility while fixing undefined behavior.

## Checklist

- [x] Fixed the bug in both `pubsub/trace.go` (v1) and `pubsub/v2/trace.go` (v2)
- [x] Added comprehensive regression test
- [x] All existing tests pass
- [x] Added code comments explaining the fix and referencing issue #11314
- [x] No breaking changes
- [x] Performance impact is negligible

---------

Co-authored-by: MASA-JAPAN <MASA-JAPAN@users.noreply.github.com>
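To illustrate why the defensive copy described in the Solution section removes the race, here is a self-contained sketch. It does not use the Pub/Sub client: `publishAll` is a hypothetical function, the `googclient_example` key is made up, and the goroutine count only echoes the 50 used in the PR's test.

```go
package main

import (
	"fmt"
	"maps"
	"sync"
)

// publishAll starts n goroutines that all receive the same shared map.
// Each goroutine clones the map and writes only to its private clone,
// so the shared map is only ever read concurrently, which is safe in Go.
// It returns the total number of entries across all clones.
func publishAll(shared map[string]string, n int) int {
	var wg sync.WaitGroup
	sizes := make([]int, n) // one slot per goroutine, no shared writes
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			attrs := maps.Clone(shared)                 // read-only access to shared
			attrs["googclient_example"] = fmt.Sprint(i) // write goes to the clone
			sizes[i] = len(attrs)
		}(i)
	}
	wg.Wait()

	total := 0
	for _, s := range sizes {
		total += s
	}
	return total
}

func main() {
	shared := map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"}
	total := publishAll(shared, 50)
	fmt.Println(len(shared), total) // 3 200
}
```

If the goroutines wrote to `shared` directly instead of to `attrs`, the Go runtime could abort the program with a concurrent map access error, and running with `go run -race` would report the data race.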