@@ -22,20 +22,25 @@ namespace libsignalservice
22
22
/// </summary>
23
23
public class SignalServiceMessagePipe
24
24
{
25
- private readonly ILogger Logger = LibsignalLogging . CreateLogger < SignalServiceMessagePipe > ( ) ;
26
- private readonly ISignalWebSocketFactory SignalWebSocketFactory ;
27
- private readonly SignalWebSocketConnection Websocket ;
28
- private readonly ICredentialsProvider ? CredentialsProvider ;
29
- private CancellationToken Token ;
25
+ private readonly ILogger logger = LibsignalLogging . CreateLogger < SignalServiceMessagePipe > ( ) ;
26
+ private readonly ISignalWebSocketFactory signalWebSocketFactory ;
27
+ private readonly SignalWebSocketConnection websocket ;
28
+ private readonly ICredentialsProvider ? credentialsProvider ;
29
+ private CancellationToken token ;
30
30
31
- internal SignalServiceMessagePipe ( CancellationToken token , SignalWebSocketConnection websocket ,
32
- ICredentialsProvider ? credentialsProvider , ISignalWebSocketFactory webSocketFactory )
31
+ internal SignalServiceMessagePipe ( SignalWebSocketConnection websocket ,
32
+ ICredentialsProvider ? credentialsProvider , ISignalWebSocketFactory webSocketFactory , CancellationToken ? token = null )
33
33
{
34
- Logger . LogTrace ( "SignalServiceMessagePipe()" ) ;
35
- Token = token ;
36
- Websocket = websocket ;
37
- CredentialsProvider = credentialsProvider ;
38
- SignalWebSocketFactory = webSocketFactory ;
34
+ if ( token == null )
35
+ {
36
+ token = CancellationToken . None ;
37
+ }
38
+
39
+ logger . LogTrace ( "SignalServiceMessagePipe()" ) ;
40
+ this . token = token . Value ;
41
+ this . websocket = websocket ;
42
+ this . credentialsProvider = credentialsProvider ;
43
+ signalWebSocketFactory = webSocketFactory ;
39
44
}
40
45
41
46
/// <summary>
@@ -44,8 +49,8 @@ internal SignalServiceMessagePipe(CancellationToken token, SignalWebSocketConnec
44
49
/// <returns>Task</returns>
45
50
public async Task Connect ( )
46
51
{
47
- Logger . LogTrace ( "Connecting to message pipe" ) ;
48
- await Websocket . Connect ( ) ;
52
+ logger . LogTrace ( "Connecting to message pipe" ) ;
53
+ await websocket . Connect ( ) ;
49
54
}
50
55
51
56
/// <summary>
@@ -54,39 +59,39 @@ public async Task Connect()
54
59
/// <param name="callback"></param>
55
60
public async Task ReadBlocking ( IMessagePipeCallback callback )
56
61
{
57
- Logger . LogTrace ( "ReadBlocking()" ) ;
58
- if ( CredentialsProvider == null )
62
+ logger . LogTrace ( "ReadBlocking()" ) ;
63
+ if ( credentialsProvider == null )
59
64
{
60
65
throw new ArgumentException ( "You can't read messages if you haven't specified credentials" ) ;
61
66
}
62
- WebSocketRequestMessage request = Websocket . ReadRequestBlocking ( ) ;
67
+ WebSocketRequestMessage request = websocket . ReadRequestBlocking ( ) ;
63
68
64
69
if ( IsSignalServiceEnvelope ( request ) )
65
70
{
66
71
SignalServiceMessagePipeMessage message = new SignalServiceEnvelope ( request . Body . ToByteArray ( ) ) ;
67
72
WebSocketResponseMessage response = CreateWebSocketResponse ( request ) ;
68
73
try
69
74
{
70
- Logger . LogDebug ( "Calling callback with message {0}" , request . Id ) ;
75
+ logger . LogDebug ( "Calling callback with message {0}" , request . Id ) ;
71
76
await callback . OnMessage ( message ) ;
72
77
}
73
78
finally
74
79
{
75
- if ( ! Token . IsCancellationRequested )
80
+ if ( ! token . IsCancellationRequested )
76
81
{
77
- Logger . LogDebug ( "Confirming message {0}" , request . Id ) ;
78
- Websocket . SendResponse ( response ) ;
82
+ logger . LogDebug ( "Confirming message {0}" , request . Id ) ;
83
+ websocket . SendResponse ( response ) ;
79
84
}
80
85
}
81
86
}
82
87
else if ( IsPipeEmptyMessage ( request ) )
83
88
{
84
- Logger . LogInformation ( "Calling callback with SignalServiceMessagePipeEmptyMessage" ) ;
89
+ logger . LogInformation ( "Calling callback with SignalServiceMessagePipeEmptyMessage" ) ;
85
90
await callback . OnMessage ( new SignalServiceMessagePipeEmptyMessage ( ) ) ;
86
91
}
87
92
else
88
93
{
89
- Logger . LogWarning ( "Unknown request: {0} {1}" , request . Verb , request . Path ) ;
94
+ logger . LogWarning ( "Unknown request: {0} {1}" , request . Verb , request . Path ) ;
90
95
}
91
96
}
92
97
@@ -98,7 +103,7 @@ public async Task ReadBlocking(IMessagePipeCallback callback)
98
103
/// <returns></returns>
99
104
public async Task < SendMessageResponse > Send ( OutgoingPushMessageList list , UnidentifiedAccess ? unidentifiedAccess )
100
105
{
101
- Logger . LogTrace ( "Send()" ) ;
106
+ logger . LogTrace ( "Send()" ) ;
102
107
var headers = new List < string > ( )
103
108
{
104
109
"content-type:application/json"
@@ -115,7 +120,7 @@ public async Task<SendMessageResponse> Send(OutgoingPushMessageList list, Uniden
115
120
Body = ByteString . CopyFrom ( Encoding . UTF8 . GetBytes ( JsonUtil . ToJson ( list ) ) )
116
121
} ;
117
122
requestmessage . Headers . AddRange ( headers ) ;
118
- var sendTask = ( await Websocket . SendRequest ( requestmessage ) ) . Task ;
123
+ var sendTask = ( await websocket . SendRequest ( requestmessage ) ) . Task ;
119
124
var timerCancelSource = new CancellationTokenSource ( ) ;
120
125
if ( await Task . WhenAny ( sendTask , Task . Delay ( 10 * 1000 , timerCancelSource . Token ) ) == sendTask )
121
126
{
@@ -131,7 +136,7 @@ public async Task<SendMessageResponse> Send(OutgoingPushMessageList list, Uniden
131
136
}
132
137
else
133
138
{
134
- Logger . LogError ( "Sending message {0} failed: timeout" , requestmessage . Id ) ;
139
+ logger . LogError ( "Sending message {0} failed: timeout" , requestmessage . Id ) ;
135
140
throw new IOException ( "timeout reached while waiting for confirmation" ) ;
136
141
}
137
142
}
@@ -144,7 +149,7 @@ public async Task<SendMessageResponse> Send(OutgoingPushMessageList list, Uniden
144
149
/// <returns></returns>
145
150
public async Task < SignalServiceProfile > GetProfile ( SignalServiceAddress address , UnidentifiedAccess ? unidentifiedAccess )
146
151
{
147
- Logger . LogTrace ( "GetProfile()" ) ;
152
+ logger . LogTrace ( "GetProfile()" ) ;
148
153
var headers = new List < string > ( )
149
154
{
150
155
"content-type:application/json"
@@ -160,7 +165,7 @@ public async Task<SignalServiceProfile> GetProfile(SignalServiceAddress address,
160
165
Path = $ "/v1/profile/{ address . GetIdentifier ( ) } "
161
166
} ;
162
167
163
- var sendTask = ( await Websocket . SendRequest ( requestMessage ) ) . Task ;
168
+ var sendTask = ( await websocket . SendRequest ( requestMessage ) ) . Task ;
164
169
var timerCancelSource = new CancellationTokenSource ( ) ;
165
170
if ( await Task . WhenAny ( sendTask , Task . Delay ( TimeSpan . FromSeconds ( 10 ) , timerCancelSource . Token ) ) == sendTask )
166
171
{
@@ -192,7 +197,7 @@ public async Task<AttachmentV2UploadAttributes> GetAttachmentV2UploadAttributesA
192
197
Path = "/v2/attachments/form/upload"
193
198
} ;
194
199
195
- var sendTask = ( await Websocket . SendRequest ( requestMessage ) ) . Task ;
200
+ var sendTask = ( await websocket . SendRequest ( requestMessage ) ) . Task ;
196
201
var timerCancelSource = new CancellationTokenSource ( ) ;
197
202
if ( await Task . WhenAny ( sendTask , Task . Delay ( TimeSpan . FromSeconds ( 10 ) , timerCancelSource . Token ) ) == sendTask )
198
203
{
@@ -225,7 +230,7 @@ public async Task<AttachmentV3UploadAttributes> GetAttachmentV3UploadAttributesA
225
230
Path = "/v3/attachments/form/upload"
226
231
} ;
227
232
228
- var sendTask = ( await Websocket . SendRequest ( requestMessage ) ) . Task ;
233
+ var sendTask = ( await websocket . SendRequest ( requestMessage ) ) . Task ;
229
234
var timerCancelSource = new CancellationTokenSource ( ) ;
230
235
if ( await Task . WhenAny ( sendTask , Task . Delay ( TimeSpan . FromSeconds ( 10 ) , timerCancelSource . Token ) ) == sendTask )
231
236
{
@@ -249,7 +254,7 @@ public async Task<AttachmentV3UploadAttributes> GetAttachmentV3UploadAttributesA
249
254
/// </summary>
250
255
public void Shutdown ( )
251
256
{
252
- Websocket . Disconnect ( ) ;
257
+ websocket . Disconnect ( ) ;
253
258
}
254
259
255
260
private bool IsSignalServiceEnvelope ( WebSocketRequestMessage message )
0 commit comments