Skip to content

Conversation

@dbinnersley
Copy link

@dbinnersley dbinnersley commented Dec 19, 2025

Following up on an issue I created a couple months ago. #2257 We are in need of this feature to be able to use websockets through cosmo due to the volume of data we are handling.

This adds new websocket compression options to the config and handles the negotiation of the websocket upgrade to use compression if the client requests it.

Summary by CodeRabbit

  • New Features

    • WebSocket permessage-deflate compression support with configurable compression level (1–9, default 6).
    • Client-side option to initiate compressed GraphQL WebSocket connections.
  • Configuration

    • New WebSocket compression configuration block (enabled, level) added to runtime config and schema.
  • Tests

    • Added tests covering enabled/disabled compression, custom levels, and compressed subscription flows.

✏️ Tip: You can customize this high-level summary in your review settings.

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.
  • Documentation has been updated on https://github.com/wundergraph/cosmo-docs.
  • I have read the Contributors Guide.

@coderabbitai
Copy link

coderabbitai bot commented Dec 19, 2025

Warning

Rate limit exceeded

@dbinnersley has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 19 minutes and 4 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 1e68a39 and 4f0907c.

📒 Files selected for processing (1)
  • router/pkg/config/config.go (1 hunks)

Walkthrough

Adds permessage-deflate WebSocket compression: configuration (enable + level), negotiation during upgrade, compressed-frame read/write support, test helpers for compression-enabled GraphQL WS connections, and unit tests covering negotiation and payload exchange.

Changes

Cohort / File(s) Summary
Configuration Schema & Defaults
router/pkg/config/config.go, router/pkg/config/config.schema.json, router/pkg/config/fixtures/full.yaml, router/pkg/config/testdata/config_defaults.json, router/pkg/config/testdata/config_full.json
Introduced WebSocketCompressionConfiguration (Enabled, Level) and added Compression to WebSocketConfiguration; updated JSON schema and example/test fixtures with defaults and sample values (enabled: true, level: 6).
WebSocket Handler Implementation
router/core/websocket.go
Added compressionEnabled and compressionLevel to WebsocketHandler and wsConnectionWrapper; negotiate permessage-deflate during upgrade; implement compressed read path (RSV1/decompress) and writeCompressed path (deflate + RSV1) with bounds-checked compression level.
Test Infrastructure
router-tests/testenv/testenv.go
Added GraphQLWebsocketDialWithCompressionRetry and InitGraphQLWebSocketConnectionWithCompression to dial/init GraphQL WebSocket connections that enable client-side compression negotiation while preserving retry/error handling.
Test Cases
router-tests/websocket_test.go
Added subtests for compression scenarios: server+client enabled, server disabled + client enabled, and custom compression level; assert Sec-WebSocket-Extensions negotiation and subscription payload lifecycle under compressed/uncompressed flows.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Review compression negotiation and how the upgrader detects/records accepted extensions.
  • Inspect read path changes for correct RSV1 handling, control-frame semantics, masking, and decompression corner cases.
  • Verify writeCompressed: deflate writer configuration, RSV1/frame construction, and proper flushing/fragmentation behavior.
  • Confirm compression level clamping and config tag correctness (YAML/env tags).
  • Check new test helpers and tests for deterministic behavior and proper cleanup of connections.

Possibly related issues

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: adding support for per-message deflate compression to WebSocket connections, which is the primary focus across all modified files.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@dbinnersley
Copy link
Author

@coderabbitai summary

@coderabbitai
Copy link

coderabbitai bot commented Dec 19, 2025

✅ Actions performed

Summary regeneration triggered.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
router/core/websocket.go (1)

285-302: Consider handling flate.NewWriter error.

The error from flate.NewWriter is currently ignored. While the compression level is validated during initialization, explicitly handling this error would be more defensive.

🔎 Proposed defensive error handling
 func (c *wsConnectionWrapper) writeCompressed(data []byte) error {
 	var buf bytes.Buffer
 	writer := wsflate.NewWriter(&buf, func(w io.Writer) wsflate.Compressor {
-		fw, _ := flate.NewWriter(w, c.compressionLevel)
+		fw, err := flate.NewWriter(w, c.compressionLevel)
+		if err != nil {
+			// Fallback to default compression if level is somehow invalid
+			fw, _ = flate.NewWriter(w, flate.DefaultCompression)
+		}
 		return fw
 	})
router-tests/testenv/testenv.go (2)

2348-2384: Compression dial helper correctly mirrors existing retry logic; duplication is acceptable but extractable.

The implementation is in line with GraphQLWebsocketDialWithRetry: same retry/backoff behavior and ErrBadHandshake handling, with the only delta being EnableCompression: true. That keeps behavior predictable and is enough to negotiate permessage-deflate.

If you want to avoid divergence long term, you could factor the common retry loop into a small internal helper taking a *websocket.Dialer:

Possible small refactor to share retry logic (optional)
 func (e *Environment) GraphQLWebsocketDialWithRetry(header http.Header, query url.Values) (*websocket.Conn, *http.Response, error) {
-	dialer := websocket.Dialer{
-		Subprotocols: []string{"graphql-transport-ws"},
-	}
-	// ...
-	for i := 0; i <= maxSocketRetries; i++ {
-		// ...
-		conn, resp, err := dialer.Dial(urlStr, header)
-		// ...
-	}
-	return nil, nil, err
+	dialer := websocket.Dialer{
+		Subprotocols: []string{"graphql-transport-ws"},
+	}
+	return e.graphQLWebsocketDialWithRetryInternal(&dialer, header, query)
 }
 
 func (e *Environment) GraphQLWebsocketDialWithCompressionRetry(header http.Header, query url.Values) (*websocket.Conn, *http.Response, error) {
-	dialer := websocket.Dialer{
-		Subprotocols:      []string{"graphql-transport-ws"},
-		EnableCompression: true,
-	}
-	// same loop as above...
-	return nil, nil, err
+	dialer := websocket.Dialer{
+		Subprotocols:      []string{"graphql-transport-ws"},
+		EnableCompression: true,
+	}
+	return e.graphQLWebsocketDialWithRetryInternal(&dialer, header, query)
 }
+
+func (e *Environment) graphQLWebsocketDialWithRetryInternal(
+	dialer *websocket.Dialer,
+	header http.Header,
+	query url.Values,
+) (*websocket.Conn, *http.Response, error) {
+	waitBetweenRetriesInMs := rand.Intn(10)
+	timeToSleep := time.Duration(waitBetweenRetriesInMs) * time.Millisecond
+
+	var err error
+	for i := 0; i <= maxSocketRetries; i++ {
+		urlStr := e.GraphQLWebSocketSubscriptionURL()
+		if query != nil {
+			urlStr += "?" + query.Encode()
+		}
+		conn, resp, err := dialer.Dial(urlStr, header)
+		if resp != nil && err == nil {
+			return conn, resp, nil
+		}
+		if errors.Is(err, websocket.ErrBadHandshake) {
+			return conn, resp, err
+		}
+		if i != maxSocketRetries {
+			time.Sleep(timeToSleep)
+			timeToSleep *= 2
+		}
+	}
+	return nil, nil, err
+}

2403-2420: Compressed Init helper is consistent with the existing Init path and exposes the response for header assertions.

This mirrors InitGraphQLWebSocketConnection closely while (a) dialing through the compression-enabled helper and (b) returning the *http.Response so tests can inspect negotiated extensions. Cleanup and ack handling are identical, so behavior stays aligned across both paths.

If you want to DRY things up, you could have both Init helpers call a shared internal initGraphQLWebSocketConnection(dial func(...) (*websocket.Conn, *http.Response, error)), but it’s non-essential.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2173504 and 1e68a39.

📒 Files selected for processing (8)
  • router-tests/testenv/testenv.go (2 hunks)
  • router-tests/websocket_test.go (1 hunks)
  • router/core/websocket.go (10 hunks)
  • router/pkg/config/config.go (2 hunks)
  • router/pkg/config/config.schema.json (1 hunks)
  • router/pkg/config/fixtures/full.yaml (1 hunks)
  • router/pkg/config/testdata/config_defaults.json (1 hunks)
  • router/pkg/config/testdata/config_full.json (1 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-10-01T20:39:16.113Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2252
File: router-tests/telemetry/telemetry_test.go:9684-9693
Timestamp: 2025-10-01T20:39:16.113Z
Learning: Repo preference: In router-tests/telemetry/telemetry_test.go, keep strict > 0 assertions for request.operation.*Time (parsingTime, normalizationTime, validationTime, planningTime) in telemetry-related tests; do not relax to >= 0 unless CI flakiness is observed.

Applied to files:

  • router-tests/websocket_test.go
📚 Learning: 2025-08-28T09:18:10.121Z
Learnt from: endigma
Repo: wundergraph/cosmo PR: 2141
File: router-tests/http_subscriptions_test.go:100-108
Timestamp: 2025-08-28T09:18:10.121Z
Learning: In router-tests/http_subscriptions_test.go heartbeat tests, the message ordering should remain strict with data messages followed by heartbeat messages, as the timing is deterministic and known by design in the Cosmo router implementation.

Applied to files:

  • router-tests/websocket_test.go
📚 Learning: 2025-09-17T20:55:39.456Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2172
File: router/core/graph_server.go:0-0
Timestamp: 2025-09-17T20:55:39.456Z
Learning: The Initialize method in router/internal/retrytransport/manager.go has been updated to properly handle feature-flag-only subgraphs by collecting subgraphs from both routerConfig.GetSubgraphs() and routerConfig.FeatureFlagConfigs.ConfigByFeatureFlagName, ensuring all subgraphs receive retry configuration.

Applied to files:

  • router-tests/testenv/testenv.go
📚 Learning: 2025-11-19T15:13:57.821Z
Learnt from: dkorittki
Repo: wundergraph/cosmo PR: 2273
File: router/core/graphql_handler.go:0-0
Timestamp: 2025-11-19T15:13:57.821Z
Learning: In the Cosmo router (wundergraph/cosmo), error handling follows a two-phase pattern: (1) Prehandler phase handles request parsing, validation, and setup errors using `httpGraphqlError` and `writeOperationError` (in files like graphql_prehandler.go, operation_processor.go, parse_multipart.go, batch.go); (2) Execution phase handles resolver execution errors using `WriteError` in GraphQLHandler.ServeHTTP. Because all `httpGraphqlError` instances are caught in the prehandler before ServeHTTP is invoked, any error type checks for `httpGraphqlError` in the execution-phase WriteError method are unreachable code.

Applied to files:

  • router-tests/testenv/testenv.go
📚 Learning: 2025-07-21T15:06:36.664Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2067
File: router/pkg/config/config.schema.json:1637-1644
Timestamp: 2025-07-21T15:06:36.664Z
Learning: In the Cosmo router project, when extending JSON schema validation for security-sensitive fields like JWKS secrets, backwards compatibility is maintained by implementing warnings in the Go code rather than hard validation constraints in the schema. This allows existing configurations to continue working while alerting users to potential security issues.

Applied to files:

  • router/pkg/config/config.schema.json
  • router/pkg/config/testdata/config_full.json
📚 Learning: 2025-07-21T14:46:34.879Z
Learnt from: SkArchon
Repo: wundergraph/cosmo PR: 2067
File: router/pkg/authentication/jwks_token_decoder.go:80-106
Timestamp: 2025-07-21T14:46:34.879Z
Learning: In the Cosmo router project, required field validation for JWKS configuration (Secret, Algorithm, KeyId) is handled at the JSON schema level in config.schema.json rather than through runtime validation in the Go code at router/pkg/authentication/jwks_token_decoder.go.

Applied to files:

  • router/pkg/config/testdata/config_full.json
🧬 Code graph analysis (1)
router/core/websocket.go (1)
router/pkg/config/config.go (1)
  • WebSocketConfiguration (710-727)
🔇 Additional comments (15)
router/pkg/config/fixtures/full.yaml (1)

407-409: LGTM!

The compression configuration is correctly placed under the websocket section with sensible values. Level 6 provides a good balance between compression ratio and CPU usage.

router/pkg/config/testdata/config_defaults.json (1)

441-445: LGTM!

Default configuration correctly has compression disabled by default, which is appropriate for backward compatibility. Level 6 is a sensible default.

router/pkg/config/testdata/config_full.json (1)

821-825: LGTM!

The full configuration test data correctly reflects the compression settings from the YAML fixture.

router/core/websocket.go (6)

93-99: LGTM!

The compression initialization correctly validates the compression level and falls back to flate.DefaultCompression for out-of-range values. The check ensures user-provided values are within the valid 1-9 range for explicit compression levels.


169-182: LGTM!

The connection wrapper correctly stores compression state for use in read/write operations.


194-239: LGTM!

The compressed frame reading logic correctly:

  • Unmasks client frames (per WebSocket spec, client-to-server frames are masked)
  • Delegates control frame handling appropriately
  • Checks the RSV1 bit to determine if decompression is needed
  • Falls back to standard reading when compression is disabled

The read timeout set earlier in the function protects against infinite loops.


400-421: LGTM!

The compression negotiation is well-implemented:

  • Using ServerNoContextTakeover and ClientNoContextTakeover is a good choice for server-side implementations as it reduces memory overhead per connection
  • The negotiation only proceeds if the upgrade succeeds (err == nil)
  • The Accepted() check correctly determines if the client agreed to compression

256-261: LGTM!

The compression path is correctly integrated with proper mutex handling.


355-358: LGTM!

Handler correctly stores compression configuration for use during connection upgrade.

router-tests/websocket_test.go (3)

2395-2434: LGTM!

This test correctly validates the compression negotiation and data transmission when both server and client support permessage-deflate. Good verification of the Sec-WebSocket-Extensions header.


2436-2474: LGTM!

This test correctly validates the fallback behavior when the server has compression disabled. It ensures the client can still communicate without compression when negotiation fails.


2476-2514: LGTM!

Good test coverage for custom compression levels. The test uses a query with nested fields which helps validate compression/decompression of larger payloads.

router/pkg/config/config.schema.json (1)

563-581: WebSocket compression schema is well-scoped and consistent with existing config style.

The new websocket.compression block cleanly adds permessage-deflate tuning: enabled defaults to false, level is constrained to 1–9 with a sensible default of 6, and additionalProperties: false prevents accidental typos. This aligns with the rest of the WebSocket config and should integrate smoothly with the Go config types.

router/pkg/config/config.go (2)

725-727: LGTM!

The new Compression field is properly integrated into the WebSocketConfiguration struct with appropriate YAML tags and documentation.


729-735: Compression level validation is properly enforced at runtime.

The struct definition is correct and the 1-9 range validation is implemented in router/core/websocket.go (lines 96-98). Invalid values are silently replaced with flate.DefaultCompression, and the documented range in the comment is accurate.

@dbinnersley
Copy link
Author

@coderabbitai generate docstrings

@coderabbitai
Copy link

coderabbitai bot commented Dec 19, 2025

✅ Actions performed

Initiated docstring generation; will generate only if new commits exist.

@coderabbitai
Copy link

coderabbitai bot commented Dec 19, 2025

Note

Docstrings generation - SUCCESS
Generated docstrings for this pull request at #2425

coderabbitai bot added a commit that referenced this pull request Dec 19, 2025
Docstrings generation was requested by @dbinnersley.

* #2424 (comment)

The following files were modified:

* `router/core/websocket.go`
@github-actions
Copy link

github-actions bot commented Jan 3, 2026

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Jan 3, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant