forked from connamara/quickfixn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathThreadedSocketAcceptor.cs
executable file
·424 lines (368 loc) · 14.9 KB
/
ThreadedSocketAcceptor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System;
namespace QuickFix
{
// TODO v2.0 - consider changing to internal
/// <summary>
/// Acceptor implementation - with threads
/// Creates a ThreadedSocketReactor for every listening endpoint.
/// </summary>
public class ThreadedSocketAcceptor : IAcceptor
{
/// <summary>
/// Ties one socket endpoint to the <see cref="ThreadedSocketReactor"/> created for it
/// and keeps track of the sessions accepted on that endpoint.
/// </summary>
class AcceptorSocketDescriptor
{
    // Sessions registered against this endpoint, keyed by their SessionID.
    private readonly Dictionary<SessionID, Session> acceptedSessions_ = new Dictionary<SessionID, Session>();

    /// <summary>Reactor constructed for <see cref="Address"/> when this descriptor was created.</summary>
    public ThreadedSocketReactor SocketReactor { get; private set; }

    /// <summary>Endpoint passed to the constructor.</summary>
    public IPEndPoint Address { get; private set; }

    /// <summary>
    /// Stores the endpoint and builds the reactor for it from the given socket and session settings.
    /// </summary>
    public AcceptorSocketDescriptor(IPEndPoint socketEndPoint, SocketSettings socketSettings, QuickFix.Dictionary sessionDict)
    {
        Address = socketEndPoint;
        SocketReactor = new ThreadedSocketReactor(socketEndPoint, socketSettings, sessionDict);
    }

    /// <summary>
    /// Register a session with this socket; an existing entry with the same SessionID is replaced.
    /// </summary>
    /// <param name="session">session to register</param>
    public void AcceptSession(Session session)
    {
        SessionID key = session.SessionID;
        acceptedSessions_[key] = session;
    }

    /// <summary>
    /// Remove a session from those tied to this socket.
    /// </summary>
    /// <param name="sessionID">ID of session to be removed</param>
    /// <returns>true if session removed, false if not found</returns>
    public bool RemoveSession(SessionID sessionID)
    {
        bool removed = acceptedSessions_.Remove(sessionID);
        return removed;
    }

    /// <summary>
    /// Get a copy of the sessions registered with this socket.
    /// </summary>
    /// <returns>a new dictionary; changing it does not affect this descriptor</returns>
    public Dictionary<SessionID, Session> GetAcceptedSessions()
    {
        var snapshot = new Dictionary<SessionID, Session>(acceptedSessions_);
        return snapshot;
    }
}
private Dictionary<SessionID, Session> sessions_ = new Dictionary<SessionID, Session>();
private SessionSettings settings_;
private Dictionary<IPEndPoint, AcceptorSocketDescriptor> socketDescriptorForAddress_ = new Dictionary<IPEndPoint, AcceptorSocketDescriptor>();
private SessionFactory sessionFactory_;
private bool isStarted_ = false;
private bool _disposed = false;
private object sync_ = new object();
#region Constructors
/// <summary>
/// Builds a <see cref="SessionFactory"/> from the application and store factory,
/// then delegates to the (SessionFactory, SessionSettings) constructor.
/// </summary>
public ThreadedSocketAcceptor(IApplication application, IMessageStoreFactory storeFactory, SessionSettings settings)
    : this(new SessionFactory(application, storeFactory), settings)
{
}

/// <summary>
/// Same as the three-argument overload, additionally passing a log factory to the <see cref="SessionFactory"/>.
/// </summary>
public ThreadedSocketAcceptor(IApplication application, IMessageStoreFactory storeFactory, SessionSettings settings, ILogFactory logFactory)
    : this(new SessionFactory(application, storeFactory, logFactory), settings)
{
}

/// <summary>
/// Same as the four-argument overload, additionally passing a message factory to the <see cref="SessionFactory"/>.
/// </summary>
public ThreadedSocketAcceptor(IApplication application, IMessageStoreFactory storeFactory, SessionSettings settings, ILogFactory logFactory, IMessageFactory messageFactory)
    : this(new SessionFactory(application, storeFactory, logFactory, messageFactory), settings)
{
}

/// <summary>
/// Creates the sessions described by <paramref name="settings"/> using <paramref name="sessionFactory"/>.
/// </summary>
/// <param name="sessionFactory">factory used to create each session</param>
/// <param name="settings">settings enumerating the sessions to create</param>
/// <exception cref="ConfigError">thrown for any exception raised while creating the sessions</exception>
public ThreadedSocketAcceptor(SessionFactory sessionFactory, SessionSettings settings)
{
    try
    {
        CreateSessions(settings, sessionFactory);
    }
    catch (System.Exception cause)
    {
        // Every set-up failure is reported as a ConfigError; the original
        // exception is handed over as the second constructor argument.
        throw new ConfigError(cause.Message, cause);
    }
}
#endregion
#region Private Methods
/// <summary>
/// Remembers the factory and settings, then attempts to create a session
/// for every SessionID listed in the settings.
/// </summary>
/// <param name="settings">settings enumerating the sessions and their configuration</param>
/// <param name="sessionFactory">factory used to create each session</param>
/// <exception cref="ConfigError">thrown if no socket descriptor was registered, i.e. no acceptor session was created</exception>
private void CreateSessions(SessionSettings settings, SessionFactory sessionFactory)
{
    settings_ = settings;
    sessionFactory_ = sessionFactory;

    foreach (SessionID id in settings.GetSessions())
    {
        // Non-acceptor and duplicate sessions are skipped by CreateSession (it returns false).
        CreateSession(id, settings.Get(id));
    }

    if (socketDescriptorForAddress_.Count == 0)
    {
        throw new ConfigError("No acceptor sessions found in SessionSettings.");
    }
}
/// <summary>
/// Finds the socket descriptor for the endpoint configured in <paramref name="dict"/>,
/// creating and registering a new one if that endpoint has not been seen before.
/// </summary>
/// <param name="dict">session settings holding the accept port and, optionally, the accept host</param>
/// <returns>the descriptor registered for the resolved endpoint</returns>
private AcceptorSocketDescriptor GetAcceptorSocketDescriptor(Dictionary dict)
{
    int port = System.Convert.ToInt32(dict.GetLong(SessionSettings.SOCKET_ACCEPT_PORT));
    var socketSettings = new SocketSettings();

    // Without an explicit accept host the endpoint uses IPAddress.Any.
    IPAddress bindAddress = IPAddress.Any;
    if (dict.Has(SessionSettings.SOCKET_ACCEPT_HOST))
    {
        string host = dict.GetString(SessionSettings.SOCKET_ACCEPT_HOST);

        // Only the first address returned for the host is used.
        bindAddress = Dns.GetHostAddresses(host)[0];

        // Set hostname (if it is not already configured)
        if (socketSettings.ServerCommonName == null)
        {
            socketSettings.ServerCommonName = host;
        }
    }

    var socketEndPoint = new IPEndPoint(bindAddress, port);
    socketSettings.Configure(dict);

    AcceptorSocketDescriptor existing;
    if (socketDescriptorForAddress_.TryGetValue(socketEndPoint, out existing))
    {
        return existing;
    }

    var created = new AcceptorSocketDescriptor(socketEndPoint, socketSettings, dict);
    socketDescriptorForAddress_[socketEndPoint] = created;
    return created;
}
/// <summary>
/// Create session, either at start-up or as an ad-hoc operation.
/// </summary>
/// <param name="sessionID">ID of new session</param>
/// <param name="dict">config settings for new session</param>
/// <returns>true if session added successfully, false if session already exists or is not an acceptor</returns>
private bool CreateSession(SessionID sessionID, Dictionary dict)
{
    if (sessions_.ContainsKey(sessionID))
    {
        return false;
    }

    // Only sessions whose connection type is exactly "acceptor" are handled here.
    if ("acceptor" != dict.GetString(SessionSettings.CONNECTION_TYPE))
    {
        return false;
    }

    // The descriptor is looked up (or created) before the session itself is built.
    AcceptorSocketDescriptor socket = GetAcceptorSocketDescriptor(dict);
    Session created = sessionFactory_.Create(sessionID, dict);

    socket.AcceptSession(created);
    sessions_[sessionID] = created;
    return true;
}
/// <summary>
/// Starts the socket reactor of every registered socket descriptor while holding <c>sync_</c>.
/// </summary>
private void StartAcceptingConnections()
{
    lock (sync_)
    {
        // FIXME StartSessionTimer();
        foreach (var descriptor in socketDescriptorForAddress_.Values)
        {
            ThreadedSocketReactor reactor = descriptor.SocketReactor;
            reactor.Start();
            // FIXME log_.Info("Listening for connections on " + descriptor.getAddress());
        }
    }
}
/// <summary>
/// Shuts down the socket reactor of every registered socket descriptor while holding <c>sync_</c>.
/// </summary>
private void StopAcceptingConnections()
{
    lock (sync_)
    {
        foreach (var descriptor in socketDescriptorForAddress_.Values)
        {
            ThreadedSocketReactor reactor = descriptor.SocketReactor;
            reactor.Shutdown();
            // FIXME log_.Info("No longer accepting connections on " + descriptor.getAddress());
        }
    }
}
/// <summary>
/// Calls Logout on every session. When <paramref name="force"/> is true, sessions that are
/// still logged on afterwards are disconnected; otherwise <see cref="WaitForLogout"/> is called.
/// Exceptions from individual sessions are written to the console and do not stop the loop.
/// </summary>
/// <param name="force">true to disconnect sessions that remain logged on instead of waiting</param>
private void LogoutAllSessions(bool force)
{
    foreach (Session current in sessions_.Values)
    {
        try
        {
            current.Logout();
        }
        catch (System.Exception ex)
        {
            // FIXME logError(session.getSessionID(), "Error during logout", e);
            System.Console.WriteLine("Error during logout of Session " + current.SessionID + ": " + ex.Message);
        }
    }

    if (!force)
    {
        WaitForLogout();
        return;
    }

    // Forced stop: nothing more to do unless at least one session is still logged on.
    if (!IsLoggedOn)
    {
        return;
    }

    foreach (Session current in sessions_.Values)
    {
        try
        {
            if (current.IsLoggedOn)
            {
                current.Disconnect("Forcibly disconnecting session");
            }
        }
        catch (System.Exception ex)
        {
            // FIXME logError(session.getSessionID(), "Error during disconnect", e);
            System.Console.WriteLine("Error during disconnect of Session " + current.SessionID + ": " + ex.Message);
        }
    }
}
/// <summary>
/// FIXME: not implemented. Currently only writes a notice to the console and returns
/// immediately; it does not wait for any session to log out.
/// </summary>
private void WaitForLogout()
{
System.Console.WriteLine("TODO - ThreadedSocketAcceptor.WaitForLogout not implemented!");
/*
Reference sketch of the intended behaviour, kept for whoever implements this.
It is not valid C# as written (Iterator, hasNext, log.warn, loggedOnSessions are undefined here):
poll every 100 ms, force-disconnect sessions whose logout timeout has elapsed,
and give up after one minute.

int start = System.Environment.TickCount;
HashSet<Session> sessions = new HashSet<Session>(sessions_.Values);
while(sessions.Count > 0)
{
Thread.Sleep(100);
int elapsed = System.Environment.TickCount - start;
Iterator<Session> sessionItr = loggedOnSessions.iterator();
while (sessionItr.hasNext())
{
Session session = sessionItr.next();
if (elapsed >= session.getLogoutTimeout() * 1000L)
{
session.disconnect("Logout timeout, force disconnect", false);
sessionItr.remove();
}
}
// Be sure we don't loop forever
if (elapsed > 60000)
{
log.warn("Stopping session logout wait after 1 minute");
break;
}
}
*/
}
/// <summary>
/// Calls Dispose on every session currently held in <c>sessions_</c>.
/// The sessions are not removed from the dictionary here.
/// </summary>
private void DisposeSessions()
{
    foreach (KeyValuePair<SessionID, Session> entry in sessions_)
    {
        entry.Value.Dispose();
    }
}
#endregion
#region Acceptor Members
/// <summary>
/// Starts accepting connections on all configured endpoints.
/// Calling it again once started has no effect.
/// </summary>
/// <exception cref="ObjectDisposedException">thrown if this acceptor has been marked disposed</exception>
public void Start()
{
    if (_disposed)
    {
        throw new ObjectDisposedException(GetType().Name);
    }

    lock (sync_)
    {
        if (isStarted_)
        {
            return;
        }

        StartAcceptingConnections();
        isStarted_ = true;
    }
}
/// <summary>
/// Stops the acceptor without forcing disconnection; equivalent to <c>Stop(false)</c>.
/// </summary>
public void Stop()
{
    Stop(false);
}

/// <summary>
/// Shuts down the socket reactors, logs out all sessions, then disposes the sessions
/// and removes them from this acceptor.
/// </summary>
/// <param name="force">passed to the logout step; true disconnects sessions that are still logged on</param>
/// <exception cref="ObjectDisposedException">thrown if this acceptor has been marked disposed</exception>
/// <remarks>
/// NOTE(review): the started flag is not reset here, so a later Start() call does nothing.
/// </remarks>
public void Stop(bool force)
{
    if (_disposed)
    {
        throw new ObjectDisposedException(GetType().Name);
    }

    // Stop listening first so that no new connections arrive while sessions are torn down.
    StopAcceptingConnections();

    LogoutAllSessions(force);
    DisposeSessions();
    sessions_.Clear();

    // FIXME StopSessionTimer();
    // FIXME Session.UnregisterSessions(GetSessions());
}
/// <summary>
/// Check whether any sessions are logged on
/// </summary>
/// <value>true if any session is logged on, else false</value>
public bool IsLoggedOn
{
    get
    {
        foreach (Session candidate in sessions_.Values)
        {
            if (candidate.IsLoggedOn)
            {
                return true;
            }
        }

        return false;
    }
}
/// <summary>
/// Get the SessionIDs for the sessions managed by this acceptor.
/// </summary>
/// <returns>a new set holding the SessionIDs of the sessions managed by this acceptor</returns>
public HashSet<SessionID> GetSessionIDs()
{
    var ids = new HashSet<SessionID>();
    foreach (SessionID id in sessions_.Keys)
    {
        ids.Add(id);
    }

    return ids;
}
/// <summary>
/// Get the endpoint each managed session is accepted on.
/// </summary>
/// <returns>
/// a new dictionary mapping the SessionID of every session currently managed by this
/// acceptor to the endpoint of the socket descriptor it is registered with
/// </returns>
public Dictionary<SessionID, IPEndPoint> GetAcceptorAddresses()
{
    var addresses = new Dictionary<SessionID, IPEndPoint>();

    foreach (AcceptorSocketDescriptor descriptor in socketDescriptorForAddress_.Values)
    {
        foreach (SessionID sessionID in descriptor.GetAcceptedSessions().Keys)
        {
            // Stop() clears sessions_ but leaves the descriptors' own session maps alone,
            // so report only sessions this acceptor still manages (consistent with GetSessionIDs).
            if (sessions_.ContainsKey(sessionID))
            {
                addresses[sessionID] = descriptor.Address;
            }
        }
    }

    return addresses;
}
/// <summary>
/// Add new session as an ad-hoc (dynamic) operation
/// </summary>
/// <param name="sessionID">ID of new session</param>
/// <param name="dict">config settings for new session</param>
/// <returns>true if session added successfully, false if session already exists or is not an acceptor</returns>
/// <remarks>
/// The settings entry added for the session is removed again whenever the session is not
/// created, including when session creation throws; the exception itself still propagates.
/// </remarks>
public bool AddSession(SessionID sessionID, Dictionary dict)
{
    lock (settings_)
    {
        if (settings_.Has(sessionID))
        {
            return false; // session already exists
        }

        // session won't be in settings if ad-hoc creation after startup;
        // need to do this here to merge in default config settings
        settings_.Set(sessionID, dict);
    }

    bool created = false;
    try
    {
        created = CreateSession(sessionID, dict);
        return created;
    }
    finally
    {
        // Roll back on a false result AND on an exception, so that a failed attempt
        // does not make every later AddSession for this ID report "already exists".
        if (!created)
        {
            lock (settings_)
            {
                settings_.Remove(sessionID);
            }
        }
    }
}
/// <summary>
/// Ad-hoc removal of an existing session
/// </summary>
/// <param name="sessionID">ID of session to be removed</param>
/// <param name="terminateActiveSession">if true, force disconnection and removal of session even if it has an active connection</param>
/// <returns>true if session removed or not already present; false if could not be removed due to an active connection</returns>
public bool RemoveSession(SessionID sessionID, bool terminateActiveSession)
{
    Session target;
    if (!sessions_.TryGetValue(sessionID, out target))
    {
        // Unknown session: nothing to remove, which counts as success.
        return true;
    }

    if (target.IsLoggedOn && !terminateActiveSession)
    {
        return false;
    }

    target.Disconnect("Dynamic session removal");

    // Stop at the first descriptor that reports having held the session.
    foreach (var descriptor in socketDescriptorForAddress_.Values)
    {
        if (descriptor.RemoveSession(sessionID))
        {
            break;
        }
    }

    sessions_.Remove(sessionID);
    target.Dispose();

    lock (settings_)
    {
        settings_.Remove(sessionID);
    }

    return true;
}
#endregion
/// <summary>
/// Any subclasses of ThreadedSocketAcceptor should override this if they have resources to dispose
/// Any override should call base.Dispose(disposing).
/// </summary>
/// <param name="disposing">
/// true when called from an explicit Dispose; only then is the acceptor stopped.
/// With false, the acceptor is only marked as disposed.
/// </param>
/// <remarks>
/// Safe to call more than once: calls after the first return immediately.
/// The acceptor is marked disposed even if stopping it throws.
/// </remarks>
protected virtual void Dispose(bool disposing)
{
    if (_disposed)
    {
        return;
    }

    try
    {
        if (disposing)
        {
            Stop();
        }
    }
    catch (ObjectDisposedException)
    {
        // ignore: Stop() found the acceptor already marked disposed
    }
    finally
    {
        // Set the flag even when Stop() throws, so a repeated Dispose does not retry it.
        _disposed = true;
    }
}
/// <summary>
/// Stops the acceptor, disposes created sessions and marks this acceptor as disposed.
/// </summary>
/// <remarks>
/// Stop() and Stop(bool) also dispose the sessions; the difference is that after
/// Dispose() the acceptor is marked disposed, so Start() and Stop() throw
/// <see cref="ObjectDisposedException"/>.
/// </remarks>
public void Dispose()
{
    // Go through the virtual overload so that the disposed flag is set
    // and subclass overrides of Dispose(bool) get to run.
    Dispose(true);
    GC.SuppressFinalize(this);
}
}
}