Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.

Commit e7b3196

Browse files
committed
[ package:dds ] Add server-sent event (SSE) support to DDS
This support is required for web clients of dwds within google3 Change-Id: Ia1ecbf8f5ba79d53cb340c83a579dc0810ec0065 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/150183 Reviewed-by: Gary Roumanis <grouma@google.com>
1 parent d4ac706 commit e7b3196

File tree

17 files changed

+15183
-58
lines changed

17 files changed

+15183
-58
lines changed

.dart_tool/package_config.json

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,12 @@
556556
"packageUri": "lib/",
557557
"languageVersion": "2.1"
558558
},
559+
{
560+
"name": "sse",
561+
"rootUri": "../third_party/pkg/sse",
562+
"packageUri": "lib/",
563+
"languageVersion": "2.6"
564+
},
559565
{
560566
"name": "stack_trace",
561567
"rootUri": "../third_party/pkg/stack_trace",
@@ -586,6 +592,12 @@
586592
"packageUri": "lib/",
587593
"languageVersion": "2.0"
588594
},
595+
{
596+
"name": "sync_http",
597+
"rootUri": "../third_party/pkg/sync_http",
598+
"packageUri": "lib/",
599+
"languageVersion": "2.6"
600+
},
589601
{
590602
"name": "telemetry",
591603
"rootUri": "../pkg/telemetry",
@@ -694,6 +706,12 @@
694706
"packageUri": "lib/",
695707
"languageVersion": "2.2"
696708
},
709+
{
710+
"name": "webdriver",
711+
"rootUri": "../third_party/pkg/webdriver",
712+
"packageUri": "lib/",
713+
"languageVersion": "2.6"
714+
},
697715
{
698716
"name": "web_components",
699717
"rootUri": "../third_party/pkg/web_components",
@@ -713,4 +731,4 @@
713731
"languageVersion": "2.4"
714732
}
715733
]
716-
}
734+
}

.packages

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,13 @@ source_map_stack_trace:third_party/pkg/source_map_stack_trace/lib
9292
sourcemap_testing:pkg/sourcemap_testing/lib
9393
source_maps:third_party/pkg/source_maps/lib
9494
source_span:third_party/pkg/source_span/lib
95+
sse:third_party/pkg/sse/lib
9596
stack_trace:third_party/pkg/stack_trace/lib
9697
stagehand:third_party/pkg/stagehand/lib
9798
status_file:pkg/status_file/lib
9899
stream_channel:third_party/pkg/stream_channel/lib
99100
string_scanner:third_party/pkg/string_scanner/lib
101+
sync_http:third_party/pkg/sync_http/lib
100102
telemetry:pkg/telemetry/lib
101103
term_glyph:third_party/pkg/term_glyph/lib
102104
test:third_party/pkg/test/pkgs/test/lib
@@ -115,6 +117,7 @@ vm:pkg/vm/lib
115117
vm_service:pkg/vm_service/lib
116118
vm_snapshot_analysis:pkg/vm_snapshot_analysis/lib
117119
watcher:third_party/pkg/watcher/lib
120+
webdriver:third_party/pkg/webdriver/lib
118121
web_components:third_party/pkg/web_components/lib
119122
web_socket_channel:third_party/pkg/web_socket_channel/lib
120123
yaml:third_party/pkg/yaml/lib

DEPS

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ vars = {
9696
# For more details, see https://github.com/dart-lang/sdk/issues/30164
9797
"dart_style_tag": "1.3.6", # Please see the note above before updating.
9898

99+
"chromedriver_tag": "83.0.4103.39",
99100
"dartdoc_tag" : "v0.32.2",
100101
"ffi_rev": "454ab0f9ea6bd06942a983238d8a6818b1357edb",
101102
"fixnum_rev": "9b38f49f6679654d66a363e69e48173cca07e882",
@@ -142,10 +143,12 @@ vars = {
142143
"source_maps-0.9.4_rev": "38524",
143144
"source_maps_rev": "87b4fd9027378bbd51b02e9d7df794eee8a82b7a",
144145
"source_span_tag": "1.7.0",
146+
"sse_tag": "e5cf68975e8e87171a3dc297577aa073454a91dc",
145147
"stack_trace_tag": "56811dbb2530d823b764fe167ec335879a4adb32",
146148
"stagehand_tag": "v3.3.9",
147149
"stream_channel_tag": "70433d577be02c48cb16d72d65654f3b4d82c6ed",
148150
"string_scanner_rev": "a918e7371af6b6e73bfd534ff9da6084741c1f99",
151+
"sync_http_rev": "a85d7ec764ea485cbbc49f3f3e7f1b43f87a1c74",
149152
"test_descriptor_tag": "1.1.1",
150153
"test_process_tag": "1.0.3",
151154
"term_glyph_rev": "b3da31e9684a99cfe5f192b89914492018b44da7",
@@ -156,6 +159,7 @@ vars = {
156159
"usage_tag": "3.4.0",
157160
"vector_math_rev": "90631fbb609f61d42f28621253c0ec9fc6a326d2",
158161
"watcher_rev": "fc3c9aae5d31d707b3013b42634dde8d8a1161b4",
162+
"webdriver_rev": "5a8d6805d9cf8a3cbb4fcd64849b538b7491e50e",
159163
"web_components_rev": "8f57dac273412a7172c8ade6f361b407e2e4ed02",
160164
"web_socket_channel_rev": "490061ef0e22d3c8460ad2802f9948219365ad6b",
161165
"WebCore_rev": "fb11e887f77919450e497344da570d780e078bc8",
@@ -408,6 +412,8 @@ deps = {
408412
Var("dart_root") + "/third_party/pkg/source_map_stack_trace":
409413
Var("dart_git") + "source_map_stack_trace.git" +
410414
"@" + Var("source_map_stack_trace_tag"),
415+
Var("dart_root") + "/third_party/pkg/sse":
416+
Var("dart_git") + "sse.git" + "@" + Var("sse_tag"),
411417
Var("dart_root") + "/third_party/pkg/stack_trace":
412418
Var("dart_git") + "stack_trace.git" + "@" + Var("stack_trace_tag"),
413419
Var("dart_root") + "/third_party/pkg/stagehand":
@@ -418,6 +424,8 @@ deps = {
418424
Var("dart_root") + "/third_party/pkg/string_scanner":
419425
Var("dart_git") + "string_scanner.git" +
420426
"@" + Var("string_scanner_rev"),
427+
Var("dart_root") + "/third_party/pkg/sync_http":
428+
Var("dart_git") + "sync_http.git" + "@" + Var("sync_http_rev"),
421429
Var("dart_root") + "/third_party/pkg/term_glyph":
422430
Var("dart_git") + "term_glyph.git" + "@" + Var("term_glyph_rev"),
423431
Var("dart_root") + "/third_party/pkg/test":
@@ -443,6 +451,10 @@ deps = {
443451
Var("dart_root") + "/third_party/pkg/web_components":
444452
Var("dart_git") + "web-components.git" +
445453
"@" + Var("web_components_rev"),
454+
Var("dart_root") + "/third_party/pkg/webdriver":
455+
Var("dart_git") + "external/github.com/google/webdriver.dart.git" +
456+
"@" + Var("webdriver_rev"),
457+
446458
Var("dart_root") + "/third_party/pkg/web_socket_channel":
447459
Var("dart_git") + "web_socket_channel.git" +
448460
"@" + Var("web_socket_channel_rev"),
@@ -460,6 +472,17 @@ deps = {
460472
"dep_type": "cipd",
461473
},
462474

475+
Var("dart_root") + "/third_party/webdriver/chrome": {
476+
"packages": [
477+
{
478+
"package": "dart/third_party/chromedriver/${{platform}}",
479+
"version": "version:" + Var("chromedriver_tag"),
480+
}
481+
],
482+
"condition": "host_cpu == 'x64'",
483+
"dep_type": "cipd",
484+
},
485+
463486
Var("dart_root") + "/pkg/analysis_server/language_model": {
464487
"packages": [
465488
{

pkg/dds/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# 1.3.0
2+
3+
- Added support for SSE connections from web-based clients.
4+
15
# 1.2.4
26

37
- Fixed another issue where a `StateError` could be raised within `DartDevelopmentService`

pkg/dds/README.md

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,35 @@ A package used to spawn the Dart Developer Service (DDS), which is used to commu
22

33
# Functionality
44

5-
Existing VM Service clients can issue both HTTP and websocket requests to a running DDS instance as if it were an instance of the VM Service itself. If a request corresponds to an RPC defined in the [VM Service Protocol][service-protocol], DDS will forward the request and return the response from the VM Service. Requests corresponding to an RPC defined in the [DDS Protocol][dds-protocol] will be handled directly by the DDS instance.
5+
Existing VM Service clients can issue both HTTP, websocket, and SSE requests to a running DDS instance as if it were an instance of the VM Service itself. If a request corresponds to an RPC defined in the [VM Service Protocol][service-protocol], DDS will forward the request and return the response from the VM Service. Requests corresponding to an RPC defined in the [DDS Protocol][dds-protocol] will be handled directly by the DDS instance.
6+
7+
# SSE Support
8+
9+
For certain web clients it may be preferrable or required to communicate with DDS using server-sent events (SSE). DDS has a SSE handler listening for requests on `/$debugHandler`.
10+
11+
## SSE and package:vm_service example
12+
13+
```dart
14+
import 'package:sse/sse.dart';
15+
import 'package:vm_service/vm_service.dart';
16+
17+
void main() {
18+
// Establish connection with DDS using SSE.
19+
final ddsChannel = SseClient('${ddsUri}\$debugHandler');
20+
21+
// Wait for ddsChannel to be established
22+
await ddsChannel.onOpen.first;
23+
24+
// Initialize VmService using the sink and stream from ddsChannel.
25+
final vmService = VmService(
26+
ddsChannel.stream,
27+
(e) => ddsChannel.sink.add(e),
28+
);
29+
30+
// You're ready to query DDS and the VM service!
31+
print(await vmService.getVersion());
32+
}
33+
```
634

735
[dds-protocol]: dds_protocol.md
836
[service-protocol]: https://github.com/dart-lang/sdk/blob/master/runtime/vm/service/service.md

pkg/dds/lib/dds.dart

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import 'package:shelf/shelf.dart';
2121
import 'package:shelf/shelf_io.dart' as io;
2222
import 'package:shelf_proxy/shelf_proxy.dart';
2323
import 'package:shelf_web_socket/shelf_web_socket.dart';
24+
import 'package:sse/server/sse_handler.dart';
2425
import 'package:stream_channel/stream_channel.dart';
2526
import 'package:web_socket_channel/web_socket_channel.dart';
2627

@@ -110,6 +111,12 @@ abstract class DartDevelopmentService {
110111
/// Returns `null` if the service is not running.
111112
Uri get uri;
112113

114+
/// The [Uri] VM service clients can use to communicate with this
115+
/// [DartDevelopmentService] via server-sent events (SSE).
116+
///
117+
/// Returns `null` if the service is not running.
118+
Uri get sseUri;
119+
113120
/// The [Uri] VM service clients can use to communicate with this
114121
/// [DartDevelopmentService] via a [WebSocket].
115122
///

pkg/dds/lib/src/client.dart

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,46 @@ part of dds;
77
/// Representation of a single DDS client which manages the connection and
88
/// DDS request intercepting / forwarding.
99
class _DartDevelopmentServiceClient {
10-
_DartDevelopmentServiceClient(
10+
factory _DartDevelopmentServiceClient.fromWebSocket(
11+
DartDevelopmentService dds,
12+
WebSocketChannel ws,
13+
json_rpc.Peer vmServicePeer,
14+
) =>
15+
_DartDevelopmentServiceClient._(
16+
dds,
17+
ws,
18+
vmServicePeer,
19+
);
20+
21+
factory _DartDevelopmentServiceClient.fromSSEConnection(
22+
DartDevelopmentService dds,
23+
SseConnection sse,
24+
json_rpc.Peer vmServicePeer,
25+
) =>
26+
_DartDevelopmentServiceClient._(
27+
dds,
28+
sse,
29+
vmServicePeer,
30+
);
31+
32+
_DartDevelopmentServiceClient._(
1133
this.dds,
12-
this.ws,
34+
this.connection,
1335
json_rpc.Peer vmServicePeer,
1436
) : _vmServicePeer = vmServicePeer {
1537
_clientPeer = json_rpc.Peer(
1638
// Manually create a StreamChannel<String> instead of calling
17-
// ws.cast<String>() as cast() results in addStream() being called,
39+
// .cast<String>() as cast() results in addStream() being called,
1840
// binding the underlying sink. This results in a StateError being thrown
1941
// if we try and add directly to the sink, which we do for binary events
2042
// in _StreamManager's streamNotify().
2143
StreamChannel<String>(
22-
ws.stream.cast(),
44+
connection.stream.cast(),
2345
StreamController(sync: true)
2446
..stream
2547
.cast()
26-
.listen((event) => ws.sink.add(event))
27-
.onDone(() => ws.sink.close()),
48+
.listen((event) => connection.sink.add(event))
49+
.onDone(() => connection.sink.close()),
2850
),
2951
strictProtocolChecks: false,
3052
);
@@ -231,8 +253,8 @@ class _DartDevelopmentServiceClient {
231253
String _name;
232254

233255
final _DartDevelopmentService dds;
256+
final StreamChannel connection;
234257
final Map<String, String> services = {};
235258
final json_rpc.Peer _vmServicePeer;
236-
final WebSocketChannel ws;
237259
json_rpc.Peer _clientPeer;
238260
}

pkg/dds/lib/src/dds_impl.dart

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,28 +139,47 @@ class _DartDevelopmentService implements DartDevelopmentService {
139139
// Attempt to upgrade HTTP requests to a websocket before processing them as
140140
// standard HTTP requests. The websocket handler will fail quickly if the
141141
// request doesn't appear to be a websocket upgrade request.
142-
Cascade _handlers() => Cascade().add(_webSocketHandler()).add(_httpHandler());
142+
Cascade _handlers() {
143+
return Cascade()
144+
.add(_webSocketHandler())
145+
.add(_sseHandler())
146+
.add(_httpHandler());
147+
}
143148

144149
Handler _webSocketHandler() => webSocketHandler((WebSocketChannel ws) {
145-
final client = _DartDevelopmentServiceClient(
150+
final client = _DartDevelopmentServiceClient.fromWebSocket(
146151
this,
147152
ws,
148153
_vmServiceClient,
149154
);
150155
clientManager.addClient(client);
151156
});
152157

158+
Handler _sseHandler() {
159+
final handler = authCodesEnabled
160+
? SseHandler(Uri.parse('/$_authCode/$_kSseHandlerPath'))
161+
: SseHandler(Uri.parse('/$_kSseHandlerPath'));
162+
163+
handler.connections.rest.listen((sseConnection) {
164+
final client = _DartDevelopmentServiceClient.fromSSEConnection(
165+
this,
166+
sseConnection,
167+
_vmServiceClient,
168+
);
169+
clientManager.addClient(client);
170+
});
171+
172+
return handler.handler;
173+
}
174+
153175
Handler _httpHandler() {
154176
// DDS doesn't support any HTTP requests itself, so we just forward all of
155177
// them to the VM service.
156178
final cascade = Cascade().add(proxyHandler(remoteVmServiceUri));
157179
return cascade.handler;
158180
}
159181

160-
Uri _toWebSocket(Uri uri) {
161-
if (uri == null) {
162-
return null;
163-
}
182+
List<String> _cleanupPathSegments(Uri uri) {
164183
final pathSegments = <String>[];
165184
if (uri.pathSegments.isNotEmpty) {
166185
pathSegments.addAll(uri.pathSegments.where(
@@ -170,10 +189,27 @@ class _DartDevelopmentService implements DartDevelopmentService {
170189
(s) => s.isNotEmpty,
171190
));
172191
}
192+
return pathSegments;
193+
}
194+
195+
Uri _toWebSocket(Uri uri) {
196+
if (uri == null) {
197+
return null;
198+
}
199+
final pathSegments = _cleanupPathSegments(uri);
173200
pathSegments.add('ws');
174201
return uri.replace(scheme: 'ws', pathSegments: pathSegments);
175202
}
176203

204+
Uri _toSse(Uri uri) {
205+
if (uri == null) {
206+
return null;
207+
}
208+
final pathSegments = _cleanupPathSegments(uri);
209+
pathSegments.add(_kSseHandlerPath);
210+
return uri.replace(pathSegments: pathSegments);
211+
}
212+
177213
String _getNamespace(_DartDevelopmentServiceClient client) =>
178214
clientManager.clients.keyOf(client);
179215

@@ -186,6 +222,7 @@ class _DartDevelopmentService implements DartDevelopmentService {
186222
Uri _remoteVmServiceUri;
187223

188224
Uri get uri => _uri;
225+
Uri get sseUri => _toSse(_uri);
189226
Uri get wsUri => _toWebSocket(_uri);
190227
Uri _uri;
191228

@@ -210,6 +247,8 @@ class _DartDevelopmentService implements DartDevelopmentService {
210247
_StreamManager get streamManager => _streamManager;
211248
_StreamManager _streamManager;
212249

250+
static const _kSseHandlerPath = '\$debugHandler';
251+
213252
json_rpc.Peer _vmServiceClient;
214253
WebSocketChannel _vmServiceSocket;
215254
HttpServer _server;

pkg/dds/lib/src/stream_manager.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class _StreamManager {
2828
continue;
2929
}
3030
if (isBinaryData) {
31-
listener.ws.sink.add(data);
31+
listener.connection.sink.add(data);
3232
} else {
3333
listener.sendNotification('streamNotify', data);
3434
}

0 commit comments

Comments
 (0)