Skip to content

Commit

Permalink
[pkg/dds] refactor parts into private libraries
Browse files Browse the repository at this point in the history
Change-Id: Ic104627b676119742606f7b34b025c6f0d093db0
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/176600
Reviewed-by: Ben Konyi <bkonyi@google.com>
Commit-Queue: Devon Carew <devoncarew@google.com>
  • Loading branch information
devoncarew authored and commit-bot@chromium.org committed Dec 29, 2020
1 parent df7473e commit 13b1af4
Show file tree
Hide file tree
Showing 14 changed files with 261 additions and 201 deletions.
2 changes: 1 addition & 1 deletion pkg/dds/analysis_options.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
include: package:pedantic/analysis_options.1.7.0.yaml
include: package:pedantic/analysis_options.1.8.0.yaml

linter:
rules:
Expand Down
40 changes: 7 additions & 33 deletions pkg/dds/lib/dds.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,9 @@
library dds;

import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io';
import 'dart:math';
import 'dart:typed_data';

import 'package:async/async.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:meta/meta.dart';
import 'package:pedantic/pedantic.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart' as io;
import 'package:shelf_proxy/shelf_proxy.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:sse/server/sse_handler.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

part 'src/binary_compatible_peer.dart';
part 'src/client.dart';
part 'src/client_manager.dart';
part 'src/constants.dart';
part 'src/dds_impl.dart';
part 'src/expression_evaluator.dart';
part 'src/logging_repository.dart';
part 'src/isolate_manager.dart';
part 'src/named_lookup.dart';
part 'src/rpc_error_codes.dart';
part 'src/stream_manager.dart';

import 'src/dds_impl.dart';

/// An intermediary between a Dart VM service and its clients that offers
/// additional functionality on top of the standard VM service protocol.
Expand Down Expand Up @@ -99,7 +73,7 @@ abstract class DartDevelopmentService {
}
}

final service = _DartDevelopmentService(
final service = DartDevelopmentServiceImpl(
remoteVmServiceUri,
serviceUri,
enableAuthCodes,
Expand Down Expand Up @@ -171,22 +145,22 @@ class DartDevelopmentServiceException implements Exception {
/// Set when a connection error has occurred after startup.
static const int connectionError = 3;

factory DartDevelopmentServiceException._existingDdsInstanceError(
String message) {
factory DartDevelopmentServiceException.existingDdsInstance(String message) {
return DartDevelopmentServiceException._(existingDdsInstanceError, message);
}

factory DartDevelopmentServiceException._failedToStartError() {
factory DartDevelopmentServiceException.failedToStart() {
return DartDevelopmentServiceException._(
failedToStartError, 'Failed to start Dart Development Service');
}

factory DartDevelopmentServiceException._connectionError(String message) {
factory DartDevelopmentServiceException.connectionIssue(String message) {
return DartDevelopmentServiceException._(connectionError, message);
}

DartDevelopmentServiceException._(this.errorCode, this.message);

@override
String toString() => 'DartDevelopmentServiceException: $message';

final int errorCode;
Expand Down
19 changes: 14 additions & 5 deletions pkg/dds/lib/src/binary_compatible_peer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of dds;
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:async/async.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import 'stream_manager.dart';

/// Adds support for binary events send from the VM service, which are not part
/// of the official JSON RPC 2.0 specification.
Expand All @@ -17,10 +26,10 @@ part of dds;
/// ```
/// where `metadata` is the JSON body of the event.
///
/// [_BinaryCompatiblePeer] assumes that only stream events can contain a
/// [BinaryCompatiblePeer] assumes that only stream events can contain a
/// binary payload (e.g., clients cannot send a `BinaryEvent` to the VM service).
class _BinaryCompatiblePeer extends json_rpc.Peer {
_BinaryCompatiblePeer(WebSocketChannel ws, _StreamManager streamManager)
class BinaryCompatiblePeer extends json_rpc.Peer {
BinaryCompatiblePeer(WebSocketChannel ws, StreamManager streamManager)
: super(
ws.transform<String>(
StreamChannelTransformer(
Expand All @@ -39,7 +48,7 @@ class _BinaryCompatiblePeer extends json_rpc.Peer {
);

static void _transformStream(
_StreamManager streamManager, dynamic data, EventSink<String> sink) {
StreamManager streamManager, dynamic data, EventSink<String> sink) {
if (data is String) {
// Non-binary messages come in as Strings. Simply forward to the sink.
sink.add(data);
Expand Down
63 changes: 37 additions & 26 deletions pkg/dds/lib/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,46 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of dds;
import 'dart:async';

import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:meta/meta.dart';
import 'package:sse/server/sse_handler.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import '../dds.dart';
import 'constants.dart';
import 'dds_impl.dart';
import 'rpc_error_codes.dart';
import 'stream_manager.dart';

/// Representation of a single DDS client which manages the connection and
/// DDS request intercepting / forwarding.
class _DartDevelopmentServiceClient {
factory _DartDevelopmentServiceClient.fromWebSocket(
class DartDevelopmentServiceClient {
factory DartDevelopmentServiceClient.fromWebSocket(
DartDevelopmentService dds,
WebSocketChannel ws,
json_rpc.Peer vmServicePeer,
) =>
_DartDevelopmentServiceClient._(
DartDevelopmentServiceClient._(
dds,
ws,
vmServicePeer,
);

factory _DartDevelopmentServiceClient.fromSSEConnection(
factory DartDevelopmentServiceClient.fromSSEConnection(
DartDevelopmentService dds,
SseConnection sse,
json_rpc.Peer vmServicePeer,
) =>
_DartDevelopmentServiceClient._(
DartDevelopmentServiceClient._(
dds,
sse,
vmServicePeer,
);

_DartDevelopmentServiceClient._(
DartDevelopmentServiceClient._(
this.dds,
this.connection,
json_rpc.Peer vmServicePeer,
Expand All @@ -39,7 +51,7 @@ class _DartDevelopmentServiceClient {
// .cast<String>() as cast() results in addStream() being called,
// binding the underlying sink. This results in a StateError being thrown
// if we try and add directly to the sink, which we do for binary events
// in _StreamManager's streamNotify().
// in StreamManager's streamNotify().
StreamChannel<String>(
connection.stream.cast(),
StreamController(sync: true)
Expand Down Expand Up @@ -88,21 +100,21 @@ class _DartDevelopmentServiceClient {
_clientPeer.registerMethod('streamListen', (parameters) async {
final streamId = parameters['streamId'].asString;
await dds.streamManager.streamListen(this, streamId);
return _RPCResponses.success;
return RPCResponses.success;
});

_clientPeer.registerMethod('streamCancel', (parameters) async {
final streamId = parameters['streamId'].asString;
await dds.streamManager.streamCancel(this, streamId);
return _RPCResponses.success;
return RPCResponses.success;
});

_clientPeer.registerMethod('registerService', (parameters) async {
final serviceId = parameters['service'].asString;
final alias = parameters['alias'].asString;
if (services.containsKey(serviceId)) {
throw _RpcErrorCodes.buildRpcException(
_RpcErrorCodes.kServiceAlreadyRegistered,
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceAlreadyRegistered,
);
}
services[serviceId] = alias;
Expand All @@ -112,7 +124,7 @@ class _DartDevelopmentServiceClient {
serviceId,
alias,
);
return _RPCResponses.success;
return RPCResponses.success;
});

_clientPeer.registerMethod(
Expand Down Expand Up @@ -140,9 +152,8 @@ class _DartDevelopmentServiceClient {
'getLogHistorySize',
(parameters) => {
'type': 'Size',
'size': _StreamManager
.loggingRepositories[_StreamManager.kLoggingStream]
.bufferSize,
'size': StreamManager
.loggingRepositories[StreamManager.kLoggingStream].bufferSize,
});

_clientPeer.registerMethod('setLogHistorySize', (parameters) {
Expand All @@ -152,9 +163,9 @@ class _DartDevelopmentServiceClient {
"'size' must be greater or equal to zero",
);
}
_StreamManager.loggingRepositories[_StreamManager.kLoggingStream]
StreamManager.loggingRepositories[StreamManager.kLoggingStream]
.resize(size);
return _RPCResponses.success;
return RPCResponses.success;
});

_clientPeer.registerMethod('getDartDevelopmentServiceVersion',
Expand Down Expand Up @@ -219,21 +230,21 @@ class _DartDevelopmentServiceClient {
[
// Forward the request to the service client or...
serviceClient.sendRequest(method, parameters.asMap).catchError((_) {
throw _RpcErrorCodes.buildRpcException(
_RpcErrorCodes.kServiceDisappeared,
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceDisappeared,
);
}, test: (error) => error is StateError),
// if the service client closes, return an error response.
serviceClient._clientPeer.done.then(
(_) => throw _RpcErrorCodes.buildRpcException(
_RpcErrorCodes.kServiceDisappeared,
(_) => throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceDisappeared,
),
),
],
);
}
throw json_rpc.RpcException(
_RpcErrorCodes.kMethodNotFound,
RpcErrorCodes.kMethodNotFound,
'Unknown service: ${parameters.method}',
);
});
Expand All @@ -260,12 +271,12 @@ class _DartDevelopmentServiceClient {
String get name => _name;

// NOTE: this should not be called directly except from:
// - `_ClientManager._clearClientName`
// - `_ClientManager._setClientNameHelper`
// - `ClientManager._clearClientName`
// - `ClientManager._setClientNameHelper`
set name(String n) => _name = n ?? defaultClientName;
String _name;

final _DartDevelopmentService dds;
final DartDevelopmentServiceImpl dds;
final StreamChannel connection;
final Map<String, String> services = {};
final json_rpc.Peer _vmServicePeer;
Expand Down
Loading

0 comments on commit 13b1af4

Please sign in to comment.