-
Notifications
You must be signed in to change notification settings - Fork 7
/
socket_service.dart
115 lines (100 loc) · 3.16 KB
/
socket_service.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import 'dart:async';
import 'dart:convert';
import 'package:xrpl_dart/xrpl_dart.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
/// Lifecycle states for a websocket connection.
///
/// The value spellings are part of the public API and are kept as declared.
enum WebsocketStatus {
  /// Connection attempt in progress.
  connecting,

  /// Connection established.
  connect,

  /// Connection closed.
  discounnect,
}
/// Callback that receives a decoded JSON message as a string-keyed map.
typedef OnResponse = void Function(Map<String, dynamic> message);

/// Callback that receives the error which closed the socket, or `null` when
/// there was none.
typedef OnClose = void Function(Object? error);
/// Couples an outgoing [RPCRequestDetails] with the [Completer] that will
/// receive its decoded JSON response.
class WebsockerRequestCompeleter {
  /// Creates a holder for [request] with a fresh, uncompleted [completer].
  WebsockerRequestCompeleter(this.request);

  /// The request this entry is waiting on.
  final RPCRequestDetails request;

  /// Completed with the decoded response map for [request].
  final completer = Completer<Map<String, dynamic>>();
}
/// WebSocket-backed [RpcService] that pairs JSON responses with pending
/// requests through their `id` field.
///
/// Instances are created with [connect]. Incoming messages that contain an
/// `id` key complete the matching entry in [requests]; every other message is
/// forwarded to [onEvents].
class RPCWebSocketService with RpcService {
  /// Wraps an already-opened [channel] and starts listening to its stream.
  RPCWebSocketService._(this.url, WebSocketChannel channel,
      {this.defaultRequestTimeOut = const Duration(seconds: 30),
      this.onClose,
      this.onEvents})
      : _socket = channel {
    _subscription = channel.stream
        .cast<String>()
        .listen(_onMessge, onError: _onClose, onDone: _onDone);
  }

  /// The underlying channel; set to `null` once the service is closed.
  WebSocketChannel? _socket;

  /// Subscription to the channel stream; set to `null` once closed.
  StreamSubscription<String>? _subscription;

  /// Timeout applied by [call] when no explicit timeout is passed.
  final Duration defaultRequestTimeOut;

  /// Invoked once when the socket closes, with the stream error or `null`.
  OnClose? onClose;

  /// Invoked for every incoming message that has no `id` key.
  OnResponse? onEvents;

  /// Requests awaiting a response, keyed by request id.
  Map<int, WebsockerRequestCompeleter> requests = {};

  bool _isDiscounnect = false;

  /// Whether the socket is still open.
  ///
  /// The flag tracked internally is "disconnected", so it is negated here;
  /// returning it as-is reported `false` while connected and `true` after
  /// the socket had been closed.
  bool get isConnected => !_isDiscounnect;

  @override
  final String url;

  /// Encodes [params] as JSON and writes it to the socket.
  ///
  /// Throws a [StateError] if the service has already been closed.
  void add(RPCRequestDetails params) {
    if (_isDiscounnect) {
      throw StateError("socket has beed discounected");
    }
    _socket?.sink.add(json.encode(params.toWebsocketParams()));
  }

  /// Tears the connection down and notifies [onClose] with [error].
  ///
  /// Safe to call more than once: after the first call the socket,
  /// subscription and callbacks are all `null`, so later calls do nothing.
  ///
  /// NOTE(review): entries still in [requests] are not completed here, so
  /// their callers only return when the per-request timeout in [call] fires.
  void _onClose(Object? error) {
    _isDiscounnect = true;
    // Best-effort close: errors from an already-broken socket are ignored.
    _socket?.sink.close().catchError((e) => null);
    _socket = null;
    _subscription?.cancel().catchError((e) {});
    _subscription = null;
    onClose?.call(error);
    onClose = null;
    onEvents = null;
  }

  /// Stream `onDone` handler; closes the service without an error.
  void _onDone() {
    _onClose(null);
  }

  /// Closes the socket and releases the callbacks.
  void discounnect() {
    _onClose(null);
  }

  /// Opens a websocket to [url] and returns a service bound to it.
  ///
  /// Waits up to [connectionTimeOut] for the channel to become ready. If the
  /// handshake fails or times out, the channel is closed before the error is
  /// rethrown so that the half-open connection is not leaked.
  static Future<RPCWebSocketService> connect(
    String url, {
    Iterable<String>? protocols,
    Duration defaultRequestTimeOut = const Duration(seconds: 30),
    OnClose? onClose,
    OnResponse? onEvents,
    final Duration connectionTimeOut = const Duration(seconds: 30),
  }) async {
    final channel =
        WebSocketChannel.connect(Uri.parse(url), protocols: protocols);
    try {
      await channel.ready.timeout(connectionTimeOut);
    } catch (_) {
      // Release the socket; the close future is intentionally not awaited
      // because it may not complete on a channel that never became ready.
      unawaited(channel.sink.close().catchError((e) => null));
      rethrow;
    }
    return RPCWebSocketService._(url, channel,
        defaultRequestTimeOut: defaultRequestTimeOut,
        onClose: onClose,
        onEvents: onEvents);
  }

  /// Routes one incoming text frame.
  ///
  /// Frames with an `id` key complete (and remove) the matching pending
  /// request, if any; all other frames go to [onEvents].
  ///
  /// NOTE(review): assumes every frame is a JSON object and that `id` is an
  /// int; anything else throws inside the stream listener.
  void _onMessge(String event) {
    final Map<String, dynamic> decode = json.decode(event);
    if (decode.containsKey("id")) {
      final int id = decode["id"];
      final request = requests.remove(id);
      request?.completer.complete(decode);
    } else {
      onEvents?.call(decode);
    }
  }

  /// Sends [params] and waits for the response carrying the same id.
  ///
  /// Waits for [timeout], or [defaultRequestTimeOut] when it is omitted. The
  /// pending entry is always removed from [requests], whether the call
  /// succeeds, times out, or [add] throws.
  @override
  Future<Map<String, dynamic>> call(RPCRequestDetails params,
      [Duration? timeout]) async {
    final WebsockerRequestCompeleter compeleter =
        WebsockerRequestCompeleter(params);
    try {
      // Register before sending so a fast response cannot be missed.
      requests[params.id] = compeleter;
      add(params);
      final result = await compeleter.completer.future
          .timeout(timeout ?? defaultRequestTimeOut);
      return result;
    } finally {
      requests.remove(params.id);
    }
  }

  /// Not supported by this service; always throws [UnimplementedError].
  @override
  Future<String> post(String url, String body, {Map<String, String>? header}) {
    throw UnimplementedError();
  }
}