Skip to content

Commit

Permalink
Merge pull request #18 from Goddchen/feature/refactor-start-read-loop
Browse files Browse the repository at this point in the history
refactor: replace startReadLoop
  • Loading branch information
Goddchen authored Dec 24, 2022
2 parents 934eec0 + 3a51548 commit 16c4afd
Show file tree
Hide file tree
Showing 8 changed files with 982 additions and 111 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@
## 0.6.0

- Add clipboard support

## 0.7.0

- Refactoring: replace `startReadLoop()` with `Stream<RemoteFrameBufferClientReadMessage> incomingMessages` and `void handleIncomingMessages()`
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,16 @@ client.updateStream.listen(
// Update your framebuffer
},
);
await client.startReadLoop();
// To let the package handle incoming messages
client.handleIncomingMessages();
// Or handle yourself
StreamSubscription<RemoteFrameBufferClientReadMessage> subscription =
client.incomingMessages.listen(
(final RemoteFrameBufferClientReadMessage message) {
// Do something
},
);
// Request an initial framebuffer update
client.requestUpdate();
```

Expand Down
2 changes: 1 addition & 1 deletion example/dart_rfb_example.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ import 'package:dart_rfb/dart_rfb.dart';
void main() async {
final RemoteFrameBufferClient client = RemoteFrameBufferClient();
await client.connect(hostname: '127.0.0.1');
await client.startReadLoop();
client.handleIncomingMessages();
}
237 changes: 130 additions & 107 deletions lib/src/client/remote_frame_buffer_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import 'package:dart_des/dart_des.dart';
import 'package:dart_rfb/src/client/config.dart';
import 'package:dart_rfb/src/client/remote_frame_buffer_client_key_event.dart';
import 'package:dart_rfb/src/client/remote_frame_buffer_client_pointer_event.dart';
import 'package:dart_rfb/src/client/remote_frame_buffer_client_read_message.dart';
import 'package:dart_rfb/src/client/remote_frame_buffer_client_update.dart';
import 'package:dart_rfb/src/constants.dart';
import 'package:dart_rfb/src/extensions/byte_data_extensions.dart';
Expand Down Expand Up @@ -39,6 +40,9 @@ class RemoteFrameBufferClient {

Option<Config> _config = none();

Option<StreamSubscription<RemoteFrameBufferClientReadMessage>>
_incomingMessagesSubscription = none();

Option<String> _password = none();

bool _readLoopRunning = false;
Expand All @@ -59,6 +63,85 @@ class RemoteFrameBufferClient {
/// The config used by the underlying session.
Option<Config> get config => _config;

/// Start the reading loop that reads incoming protocol messages.
/// You can then call [handleIncomingMessages] to let the package handle
/// incoming messages or just listen to the returned [Stream] diretly to
/// handle incoming messages yourself.
Stream<RemoteFrameBufferClientReadMessage> get incomingMessages async* {
assert(!_readLoopRunning);
yield* _socket.match(
() => throw Exception('Socket not available'),
(final RawSocket socket) async* {
final Config config =
_config.getOrElse(() => throw Exception('Config not available'));
_readLoopRunning = true;
while (_readLoopRunning) {
final int messageType = (await socket
.readSync(
length: 1,
readWaitDuration: some(Constants.socketReadWaitDuration),
)
.run())
.getUint8(0);
logger.log(Level.INFO, '< messageType: $messageType');
switch (messageType) {
case 0:
// read and ignore padding
await socket.readSync(length: 1).run();
yield (await RemoteFrameBufferFrameBufferUpdateMessage
.readFromSocket(config: config, socket: socket)
.run())
.match(
(final Object error) => throw Exception(
'Error reading and handling update message: $error',
),
(
final RemoteFrameBufferFrameBufferUpdateMessage updateMessage,
) {
logger.info(
'< update rectangles: ${updateMessage.rectangles.groupListsBy((final RemoteFrameBufferFrameBufferUpdateMessageRectangle rectangle) => rectangle.encodingType).mapValue((final List<RemoteFrameBufferFrameBufferUpdateMessageRectangle> list) => list.length)}',
);
return RemoteFrameBufferClientReadMessage.frameBufferUpdate(
message: updateMessage,
);
},
);
break;
case 1: // SetColorMapEntries
final int numberOfColors =
(await socket.readSync(length: 5).run()).getUint16(3);
socket.readSync(length: numberOfColors * 6);
yield const RemoteFrameBufferClientReadMessage
.setColorMapEntries();
break;
case 2: // Bell
// no data, just ignore for now
yield const RemoteFrameBufferClientReadMessage.bell();
break;
case 3: // ServerCutText
final RemoteFrameBufferServerCutTextMessage message =
(await RemoteFrameBufferServerCutTextMessage.readFromSocket(
socket: socket,
).run())
.getOrElse(
(final Object error) => throw Exception(
'Error reading server cut text: $error',
),
);
logger.info('< $message');
yield RemoteFrameBufferClientReadMessage.serverCutTextMessage(
message: message,
);
// _serverClipBoardStreamController.add(message.text);
break;
default:
throw Exception('Receive unsupported message type: $messageType');
}
}
},
);
}

/// A [Stream] that will give access to the server's clipboard updates.
Stream<String> get serverClipBoardStream =>
_serverClipBoardStreamController.stream;
Expand All @@ -70,6 +153,14 @@ class RemoteFrameBufferClient {
/// Dispose the currently active session and all used resources.
Future<void> close() async {
_readLoopRunning = false;
await _incomingMessagesSubscription.match(
() {},
(
final StreamSubscription<RemoteFrameBufferClientReadMessage>
subscription,
) =>
subscription.cancel(),
);
await _updateStreamController.close();
}

Expand Down Expand Up @@ -101,6 +192,45 @@ class RemoteFrameBufferClient {
(final _) {},
);

/// Invoke this method to let the package read and handle incoming messages.
/// If you want to handle incoming messages yourself, directly listen to
/// [incomingMessages].
void handleIncomingMessages() {
_incomingMessagesSubscription = some(
incomingMessages
.listen((final RemoteFrameBufferClientReadMessage message) {
message.when(
bell: () {},
frameBufferUpdate:
(final RemoteFrameBufferFrameBufferUpdateMessage message) {
_updateStreamController.add(
RemoteFrameBufferClientUpdate(
rectangles: message.rectangles.map(
(
final RemoteFrameBufferFrameBufferUpdateMessageRectangle
rectangle,
) =>
RemoteFrameBufferClientUpdateRectangle(
byteData: rectangle.pixelData,
encodingType: rectangle.encodingType,
height: rectangle.height,
width: rectangle.width,
x: rectangle.x,
y: rectangle.y,
),
),
),
);
},
serverCutTextMessage:
(final RemoteFrameBufferServerCutTextMessage message) =>
_serverClipBoardStreamController.add(message.text),
setColorMapEntries: () {},
);
}),
);
}

/// Request a framebuffer update from the server.
/// Call this once you are finished processing any received updates.
void requestUpdate() => _socket.match(
Expand Down Expand Up @@ -178,113 +308,6 @@ class RemoteFrameBufferClient {
},
);

/// Start the reading loop that handles incoming protocol messages.
Future<void> startReadLoop() async => await _socket.match(
() => throw Exception('Socket not available'),
(final RawSocket socket) async {
(await TaskEither<Object, void>.tryCatch(
() async {
final Config config = _config
.getOrElse(() => throw Exception('Config not available'));
socket.write(
RemoteFrameBufferFrameBufferUpdateRequestMessage(
height: config.frameBufferHeight,
incremental: true,
width: config.frameBufferWidth,
x: 0,
y: 0,
).toBytes().asUint8List(),
);
_readLoopRunning = true;
while (_readLoopRunning) {
final int messageType = (await socket
.readSync(
length: 1,
readWaitDuration:
some(Constants.socketReadWaitDuration),
)
.run())
.getUint8(0);
logger.log(Level.INFO, '< messageType: $messageType');
switch (messageType) {
case 0:
// read and ignore padding
await socket.readSync(length: 1).run();
(await RemoteFrameBufferFrameBufferUpdateMessage
.readFromSocket(config: config, socket: socket)
.run())
.match(
(final Object error) => logger.log(
Level.INFO,
'Error reading and handling update message: $error',
),
(
final RemoteFrameBufferFrameBufferUpdateMessage
updateMessage,
) {
logger.log(
Level.INFO,
'< update rectangles: ${updateMessage.rectangles.groupListsBy((final RemoteFrameBufferFrameBufferUpdateMessageRectangle rectangle) => rectangle.encodingType).mapValue((final List<RemoteFrameBufferFrameBufferUpdateMessageRectangle> list) => list.length)}',
);
_updateStreamController.add(
RemoteFrameBufferClientUpdate(
rectangles: updateMessage.rectangles.map(
(
final RemoteFrameBufferFrameBufferUpdateMessageRectangle
rectangle,
) =>
RemoteFrameBufferClientUpdateRectangle(
byteData: rectangle.pixelData,
encodingType: rectangle.encodingType,
height: rectangle.height,
width: rectangle.width,
x: rectangle.x,
y: rectangle.y,
),
),
),
);
},
);
break;
case 1: // SetColorMapEntries
final int numberOfColors =
(await socket.readSync(length: 5).run()).getUint16(3);
socket.readSync(length: numberOfColors * 6);
break;
case 2: // Bell
// no data, just ignore for now
break;
case 3: // ServerCutText
final RemoteFrameBufferServerCutTextMessage message =
(await RemoteFrameBufferServerCutTextMessage
.readFromSocket(socket: socket)
.run())
.getOrElse(
(final Object error) => throw Exception(
'Error reading server cut text: $error',
),
);
logger.info('< $message');
_serverClipBoardStreamController.add(message.text);
break;
default:
logger.info(
'Receive unsupported message type: $messageType',
);
break;
}
}
},
(final Object error, final _) => error,
).run())
.match(
(final Object error) => throw Exception(error),
(final _) {},
);
},
);

TaskEither<Object, void> _handleSecurityType({
required final RawSocket socket,
}) =>
Expand Down
20 changes: 20 additions & 0 deletions lib/src/client/remote_frame_buffer_client_read_message.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import 'package:dart_rfb/src/protocol/frame_buffer_update_message.dart';
import 'package:dart_rfb/src/protocol/server_cut_text_message.dart';
import 'package:freezed_annotation/freezed_annotation.dart';

part 'remote_frame_buffer_client_read_message.freezed.dart';

@freezed
class RemoteFrameBufferClientReadMessage
with _$RemoteFrameBufferClientReadMessage {
const factory RemoteFrameBufferClientReadMessage.bell() =
_RemoteFrameBufferClientReadMessageBell;
const factory RemoteFrameBufferClientReadMessage.frameBufferUpdate({
required final RemoteFrameBufferFrameBufferUpdateMessage message,
}) = _RemoteFrameBufferClientReadMessageFrameBufferUpdate;
const factory RemoteFrameBufferClientReadMessage.serverCutTextMessage({
required final RemoteFrameBufferServerCutTextMessage message,
}) = _RemoteFrameBufferClientReadMessageServerCutTextMessage;
const factory RemoteFrameBufferClientReadMessage.setColorMapEntries() =
_RemoteFrameBufferClientReadMessageSetColorMapEntries;
}
Loading

0 comments on commit 16c4afd

Please sign in to comment.