forked from connamara/quickfixn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathThreadedSocketReactor.cs
executable file
·154 lines (138 loc) · 4.93 KB
/
ThreadedSocketReactor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System;
namespace QuickFix
{
// TODO v2.0 - consider changing to internal
/// <summary>
/// Handles incoming connections on a single endpoint. When a socket connection
/// is accepted, a ClientHandlerThread is created to handle the connection.
/// </summary>
public class ThreadedSocketReactor
{
    /// <summary>
    /// Lifecycle states of the reactor. A new reactor is RUNNING; Shutdown() moves it to
    /// SHUTDOWN_REQUESTED; the accept loop moves it to SHUTDOWN_COMPLETE once every
    /// client handler thread has been shut down and joined.
    /// </summary>
    public enum State { RUNNING, SHUTDOWN_REQUESTED, SHUTDOWN_COMPLETE }

    #region Properties

    /// <summary>
    /// Current lifecycle state, read while holding the reactor lock.
    /// </summary>
    public State ReactorState
    {
        get { lock (sync_) { return state_; } }
    }

    #endregion

    #region Private Members

    // Guards state_ and clientThreads_. Readonly so the lock target can never be swapped.
    private readonly object sync_ = new object();
    private State state_ = State.RUNNING;
    // Only touched from Run(), so no synchronisation is applied to it.
    private long nextClientId_ = 0;
    private Thread serverThread_ = null;
    // NOTE(review): handlers are only removed during reactor shutdown, so this list
    // grows with every accepted connection for the lifetime of the reactor.
    private readonly LinkedList<ClientHandlerThread> clientThreads_ = new LinkedList<ClientHandlerThread>();
    private readonly TcpListener tcpListener_;
    private readonly SocketSettings socketSettings_;
    private readonly QuickFix.Dictionary sessionDict_;

    #endregion

    /// <summary>
    /// Creates a reactor without a session dictionary.
    /// </summary>
    /// <param name="serverSocketEndPoint">Local endpoint to listen on.</param>
    /// <param name="socketSettings">Socket options applied to each accepted client.</param>
    [Obsolete("Use the other constructor")]
    public ThreadedSocketReactor(IPEndPoint serverSocketEndPoint, SocketSettings socketSettings)
        : this(serverSocketEndPoint, socketSettings, null)
    { }

    /// <summary>
    /// Creates a reactor bound to the given endpoint. The listener is created here but
    /// is not started until <see cref="Run"/> executes.
    /// </summary>
    /// <param name="serverSocketEndPoint">Local endpoint to listen on.</param>
    /// <param name="socketSettings">Socket options applied to each accepted client.</param>
    /// <param name="sessionDict">Dictionary handed to every ClientHandlerThread.</param>
    public ThreadedSocketReactor(IPEndPoint serverSocketEndPoint, SocketSettings socketSettings, QuickFix.Dictionary sessionDict)
    {
        socketSettings_ = socketSettings;
        tcpListener_ = new TcpListener(serverSocketEndPoint);
        sessionDict_ = sessionDict;
    }

    /// <summary>
    /// Spawns the server thread, which executes <see cref="Run"/>.
    /// </summary>
    public void Start()
    {
        serverThread_ = new Thread(new ThreadStart(Run));
        serverThread_.Start();
    }

    /// <summary>
    /// Requests shutdown: flips the state to SHUTDOWN_REQUESTED and closes the listening
    /// socket so that a blocked accept call in <see cref="Run"/> returns. Calls made when
    /// the reactor is not RUNNING are ignored. Errors while closing are logged, not thrown.
    /// </summary>
    public void Shutdown()
    {
        lock (sync_)
        {
            if (State.RUNNING == state_)
            {
                try
                {
                    state_ = State.SHUTDOWN_REQUESTED;
                    tcpListener_.Server.Close();
                    tcpListener_.Stop();
                }
                catch (System.Exception e)
                {
                    this.Log("Error while closing server socket: " + e.Message);
                }
            }
        }
    }

    /// <summary>
    /// Accept loop. Starts the listener, then accepts clients until the reactor leaves the
    /// RUNNING state, creating and starting one ClientHandlerThread per connection. When the
    /// loop ends, all client handler threads are shut down and joined.
    /// </summary>
    public void Run()
    {
        // Start the listener under the lock and only while still RUNNING. Otherwise a
        // Shutdown() that ran before this thread got here would be undone: the listener
        // would be re-opened, the loop below would never execute, and the listening
        // socket would stay open with nobody left to close it.
        lock (sync_)
        {
            if (State.RUNNING == state_)
            {
                tcpListener_.Start();
            }
        }

        while (State.RUNNING == ReactorState)
        {
            TcpClient client = null;
            bool handlerCreated = false;
            try
            {
                client = tcpListener_.AcceptTcpClient();
                ApplySocketOptions(client, socketSettings_);
                ClientHandlerThread t = new ClientHandlerThread(client, nextClientId_++, sessionDict_, socketSettings_);
                // From here on the handler holds the client reference; it is presumably
                // responsible for closing it (verify against ClientHandlerThread).
                handlerCreated = true;
                lock (sync_)
                {
                    clientThreads_.AddLast(t);
                }
                // FIXME set the client thread's exception handler here
                t.Log("connected");
                t.Start();
            }
            catch (System.Exception e)
            {
                // If setup failed before a handler took the socket, nobody else has a
                // reference to it, so close it here instead of leaking the connection.
                if (!handlerCreated && client != null)
                {
                    try
                    {
                        client.Close();
                    }
                    catch (System.Exception)
                    {
                        // Best-effort cleanup; the original failure is reported below.
                    }
                }

                // An exception after shutdown was requested is the expected result of
                // the listener being closed under a blocked accept, so it is not logged.
                if (State.RUNNING == ReactorState)
                    this.Log("Error accepting connection: " + e.Message);
            }
        }
        ShutdownClientHandlerThreads();
    }

    /// <summary>
    /// Applies socket options to an accepted client: lingering is disabled and NoDelay is
    /// taken from <paramref name="socketSettings"/>.
    /// FIXME get socket options from SessionSettings
    /// </summary>
    /// <param name="client">The accepted client connection to configure.</param>
    /// <param name="socketSettings">Settings supplying the NoDelay flag.</param>
    public static void ApplySocketOptions(TcpClient client, SocketSettings socketSettings)
    {
        client.LingerState = new LingerOption(false, 0);
        client.NoDelay = socketSettings.SocketNodelay;
    }

    /// <summary>
    /// Shuts down and joins every registered client handler thread, then marks the reactor
    /// SHUTDOWN_COMPLETE. Does nothing if shutdown has already completed.
    /// </summary>
    private void ShutdownClientHandlerThreads()
    {
        // NOTE(review): the lock is held while joining, so ReactorState and Shutdown()
        // callers block until every client thread has exited.
        lock (sync_)
        {
            if (State.SHUTDOWN_COMPLETE != state_)
            {
                this.Log("shutting down...");
                while (clientThreads_.Count > 0)
                {
                    ClientHandlerThread t = clientThreads_.First.Value;
                    clientThreads_.RemoveFirst();
                    try
                    {
                        // Inside the try so that one misbehaving handler cannot stop the
                        // remaining handlers from being shut down or leave the reactor
                        // stuck short of SHUTDOWN_COMPLETE.
                        t.Shutdown("reactor is shutting down");
                        t.Join();
                    }
                    catch (System.Exception e)
                    {
                        t.Log("Error shutting down: " + e.Message);
                    }
                }
                state_ = State.SHUTDOWN_COMPLETE;
            }
        }
    }

    /// <summary>
    /// Writes a message to the console.
    /// FIXME do real logging
    /// </summary>
    /// <param name="s">Message to write.</param>
    private void Log(string s)
    {
        System.Console.WriteLine(s);
    }
}
}