Skip to content

Commit

Permalink
Merge pull request connamara#365 from martinadams/issue_363
Browse files Browse the repository at this point in the history
Issue 363: fixes for dyamic session functionality
  • Loading branch information
cbusbey committed Feb 22, 2016
2 parents b6a66c2 + 321fa96 commit 4c76145
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 49 deletions.
34 changes: 27 additions & 7 deletions QuickFIXn/AbstractInitiator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void Start()
foreach (SessionID sessionID in _settings.GetSessions())
{
Dictionary dict = _settings.Get(sessionID);
AddSession(sessionID, dict);
CreateSession(sessionID, dict);
}

if (0 == sessions_.Count)
Expand All @@ -78,20 +78,40 @@ public void Start()
}

/// <summary>
/// Add new session, either at start-up or as an ad-hoc operation
/// Add new session as an ad-hoc (dynamic) operation
/// </summary>
/// <param name="sessionID">ID of new session<param>
/// <param name="dict">config settings for new session</param></param>
/// <returns>true if session added successfully, false if session already exists or is not an initiator</returns>
public bool AddSession(SessionID sessionID, Dictionary dict)
{
lock (_settings)
if (!_settings.Has(sessionID)) // session won't be in settings if ad-hoc creation after startup
_settings.Set(sessionID, dict); // need to to this here to merge in default config settings
else
return false; // session already exists

if (CreateSession(sessionID, dict))
return true;

lock (_settings) // failed to create new session
_settings.Remove(sessionID);
return false;
}

/// <summary>
/// Create session, either at start-up or as an ad-hoc operation
/// </summary>
/// <param name="sessionID">ID of new session<param>
/// <param name="dict">config settings for new session</param></param>
/// <returns>true if session added successfully, false if session already exists or is not an initiator</returns>
private bool CreateSession(SessionID sessionID, Dictionary dict)
{
if (dict.GetString(SessionSettings.CONNECTION_TYPE) == "initiator" && !sessionIDs_.Contains(sessionID))
{
Session session = sessionFactory_.Create(sessionID, dict);
lock (sync_)
{
if (!_settings.Has(sessionID)) // session won't be in settings if ad-hoc creation after startup
_settings.Set(sessionID, dict);
sessionIDs_.Add(sessionID);
sessions_[sessionID] = session;
SetDisconnected(sessionID);
Expand All @@ -118,19 +138,19 @@ public bool RemoveSession(SessionID sessionID, bool terminateActiveSession)
session = sessions_[sessionID];
if (session.IsLoggedOn && !terminateActiveSession)
return false;

sessions_.Remove(sessionID);
_settings.Remove(sessionID);
disconnectRequired = IsConnected(sessionID) || IsPending(sessionID);
if (disconnectRequired)
SetDisconnected(sessionID);
disconnected_.Remove(sessionID);
sessionIDs_.Remove(sessionID);
OnRemove(sessionID);
}
}
lock (_settings)
_settings.Remove(sessionID);
if (disconnectRequired)
session.Disconnect("Dynamic session removal");
OnRemove(sessionID); // ensure session's reader thread is gone before we dispose session
if (session != null)
session.Dispose();
return true;
Expand Down
4 changes: 3 additions & 1 deletion QuickFIXn/SocketInitiatorThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public void Join()
if (null == thread_)
return;
Disconnect();
thread_.Join(5000);
// Make sure session's socket reader thread doesn't try to do a Join on itself!
if (Thread.CurrentThread.ManagedThreadId != thread_.ManagedThreadId)
thread_.Join(2000);
thread_ = null;
}

Expand Down
79 changes: 53 additions & 26 deletions QuickFIXn/ThreadedSocketAcceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public Dictionary<SessionID, Session> GetAcceptedSessions()
}

private Dictionary<SessionID, Session> sessions_ = new Dictionary<SessionID, Session>();
private SessionSettings settings_;
private Dictionary<IPEndPoint, AcceptorSocketDescriptor> socketDescriptorForAddress_ = new Dictionary<IPEndPoint, AcceptorSocketDescriptor>();
private SessionFactory sessionFactory_;
private bool isStarted_ = false;
Expand Down Expand Up @@ -103,10 +104,11 @@ public ThreadedSocketAcceptor(SessionFactory sessionFactory, SessionSettings set
private void CreateSessions(SessionSettings settings, SessionFactory sessionFactory)
{
sessionFactory_ = sessionFactory;
settings_ = settings;
foreach (SessionID sessionID in settings.GetSessions())
{
QuickFix.Dictionary dict = settings.Get(sessionID);
AddSession(sessionID, dict);
CreateSession(sessionID, dict);
}

if (0 == socketDescriptorForAddress_.Count)
Expand Down Expand Up @@ -146,12 +148,12 @@ private AcceptorSocketDescriptor GetAcceptorSocketDescriptor(Dictionary dict)
}

/// <summary>
/// Add new session, either at start-up or as an ad-hoc operation
/// Create session, either at start-up or as an ad-hoc operation
/// </summary>
/// <param name="sessionID">ID of new session<param>
/// <param name="dict">config settings for new session</param></param>
/// <returns>true if session added successfully, false if session already exists or is not an acceptor</returns>
public bool AddSession(SessionID sessionID, Dictionary dict)
private bool CreateSession(SessionID sessionID, Dictionary dict)
{
if (!sessions_.ContainsKey(sessionID))
{
Expand All @@ -168,29 +170,6 @@ public bool AddSession(SessionID sessionID, Dictionary dict)
return false;
}

/// <summary>
/// Ad-hoc removal of an existing session
/// </summary>
/// <param name="sessionID">ID of session to be removed</param>
/// <param name="terminateActiveSession">if true, force disconnection and removal of session even if it has an active connection</param>
/// <returns>true if session removed or not already present; false if could not be removed due to an active connection</returns>
public bool RemoveSession(SessionID sessionID, bool terminateActiveSession)
{
Session session = null;
if (sessions_.TryGetValue(sessionID, out session))
{
if (session.IsLoggedOn && !terminateActiveSession)
return false;
session.Disconnect("Dynamic session removal");
foreach (AcceptorSocketDescriptor descriptor in socketDescriptorForAddress_.Values)
if (descriptor.RemoveSession(sessionID))
break;
sessions_.Remove(sessionID);
session.Dispose();
}
return true;
}

private void StartAcceptingConnections()
{
lock (sync_)
Expand Down Expand Up @@ -345,6 +324,54 @@ public Dictionary<SessionID, IPEndPoint> GetAcceptorAddresses()
throw new System.NotImplementedException();
}

/// <summary>
/// Add new session as an ad-oc (dynamic) operation
/// </summary>
/// <param name="sessionID">ID of new session<param>
/// <param name="dict">config settings for new session</param></param>
/// <returns>true if session added successfully, false if session already exists or is not an acceptor</returns>
public bool AddSession(SessionID sessionID, Dictionary dict)
{
lock (settings_)
if (!settings_.Has(sessionID)) // session won't be in settings if ad-hoc creation after startup
settings_.Set(sessionID, dict); // need to to this here to merge in default config settings
else
return false; // session already exists

if (CreateSession(sessionID, dict))
return true;

lock (settings_) // failed to create session, so remove from settings
settings_.Remove(sessionID);
return false;
}


/// <summary>
/// Ad-hoc removal of an existing session
/// </summary>
/// <param name="sessionID">ID of session to be removed</param>
/// <param name="terminateActiveSession">if true, force disconnection and removal of session even if it has an active connection</param>
/// <returns>true if session removed or not already present; false if could not be removed due to an active connection</returns>
public bool RemoveSession(SessionID sessionID, bool terminateActiveSession)
{
Session session = null;
if (sessions_.TryGetValue(sessionID, out session))
{
if (session.IsLoggedOn && !terminateActiveSession)
return false;
session.Disconnect("Dynamic session removal");
foreach (AcceptorSocketDescriptor descriptor in socketDescriptorForAddress_.Values)
if (descriptor.RemoveSession(sessionID))
break;
sessions_.Remove(sessionID);
session.Dispose();
lock (settings_)
settings_.Remove(sessionID);
}
return true;
}

#endregion
}
}
33 changes: 27 additions & 6 deletions QuickFIXn/Transport/SocketInitiator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,17 @@ public static void SocketInitiatorThreadStart(object socketInitiatorThread)
catch (IOException ex) // Can be exception when connecting, during ssl authentication or when reading
{
t.Session.Log.OnEvent("Connection failed: " + ex.Message);
t.Initiator.RemoveThread(t);
t.Initiator.SetDisconnected(t.Session.SessionID);
}
catch (SocketException e)
{
t.Session.Log.OnEvent("Connection failed: " + e.Message);
}
catch (Exception)
{
// It might be the logger ObjectDisposedException, so don't try to log!
}
finally
{
t.Initiator.RemoveThread(t);
t.Initiator.SetDisconnected(t.Session.SessionID);
}
Expand All @@ -90,10 +95,26 @@ private void AddThread(SocketInitiatorThread thread)

private void RemoveThread(SocketInitiatorThread thread)
{
lock (sync_)
RemoveThread(thread.Session.SessionID);
}

private void RemoveThread(SessionID sessionID)
{
// We can come in here on the thread being removed, and on another thread too in the case
// of dynamic session removal, so make sure we won't deadlock...
if (Monitor.TryEnter(sync_))
{
thread.Join();
threads_.Remove(thread.Session.SessionID);
SocketInitiatorThread thread = null;
if (threads_.TryGetValue(sessionID, out thread))
{
try
{
thread.Join();
}
catch { }
threads_.Remove(sessionID);
}
Monitor.Exit(sync_);
}
}

Expand Down Expand Up @@ -172,7 +193,7 @@ protected override void OnStart()
/// <param name="sessionID">ID of session being removed</param>
protected override void OnRemove(SessionID sessionID)
{
sessionToHostNum_.Remove(sessionID);
RemoveThread(sessionID);
}

protected override bool OnPoll(double timeout)
Expand Down
51 changes: 42 additions & 9 deletions UnitTests/SessionDynamicTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public SocketState(Socket s)
const string FIXMessageEnd = @"\x0110=\d{3}\x01";
const string FIXMessageDelimit = @"(8=FIX|\A).*?(" + FIXMessageEnd + @"|\z)";

const string LogPath = "log";

SocketInitiator _initiator;
ThreadedSocketAcceptor _acceptor;
Dictionary<string, SocketState> _sessions;
Expand All @@ -80,10 +82,6 @@ Dictionary CreateSessionConfig(string targetCompID, bool isInitiator)
settings.SetString(SessionSettings.START_TIME, "12:00:00");
settings.SetString(SessionSettings.END_TIME, "12:00:00");
settings.SetString(SessionSettings.HEARTBTINT, "300");
settings.SetString(SessionSettings.SOCKET_CONNECT_HOST, Host);
settings.SetString(SessionSettings.SOCKET_CONNECT_PORT, ConnectPort.ToString());
settings.SetString(SessionSettings.SOCKET_ACCEPT_HOST, Host);
settings.SetString(SessionSettings.SOCKET_ACCEPT_PORT, AcceptPort.ToString());
return settings;
}

Expand Down Expand Up @@ -113,14 +111,23 @@ void StartEngine(bool initiator)
{
TestApplication application = new TestApplication(LogonCallback, LogoffCallback);
IMessageStoreFactory storeFactory = new MemoryStoreFactory();
ILogFactory logFactory = new ScreenLogFactory(false, false, false);
SessionSettings settings = new SessionSettings();
Dictionary defaults = new Dictionary();
defaults.SetString(QuickFix.SessionSettings.FILE_LOG_PATH, LogPath);

// Put IP endpoint settings into default section to verify that that defaults get merged into
// session-specific settings not only for static sessions, but also for dynamic ones
defaults.SetString(SessionSettings.SOCKET_CONNECT_HOST, Host);
defaults.SetString(SessionSettings.SOCKET_CONNECT_PORT, ConnectPort.ToString());
defaults.SetString(SessionSettings.SOCKET_ACCEPT_HOST, Host);
defaults.SetString(SessionSettings.SOCKET_ACCEPT_PORT, AcceptPort.ToString());

settings.Set(defaults);
ILogFactory logFactory = new FileLogFactory(settings);

if (initiator)
{
Dictionary defaults = new Dictionary();
defaults.SetString(SessionSettings.RECONNECT_INTERVAL, "1");
settings.Set(defaults);
settings.Set(CreateSessionID(StaticInitiatorCompID), CreateSessionConfig(StaticInitiatorCompID, true));
_initiator = new SocketInitiator(application, storeFactory, settings, logFactory);
_initiator.Start();
Expand Down Expand Up @@ -294,11 +301,22 @@ void SendLogon(Socket s, string senderCompID)
s.Send(Encoding.ASCII.GetBytes(msg.ToString()));
}

void ClearLogs()
{
if (System.IO.Directory.Exists(LogPath))
try
{
System.IO.Directory.Delete(LogPath, true);
}
catch { }
}

[SetUp]
public void Setup()
{
_sessions = new Dictionary<string, SocketState>();
_loggedOnCompIDs = new HashSet<string>();
ClearLogs();
}

[TearDown]
Expand All @@ -313,6 +331,9 @@ public void TearDown()

_initiator = null;
_acceptor = null;

Thread.Sleep(500);
ClearLogs();
}

[Test]
Expand All @@ -334,11 +355,15 @@ public void DynamicAcceptor()

// Add the dynamic acceptor and ensure that we can now log on
var sessionID = CreateSessionID(dynamicCompID);
Assert.IsTrue(_acceptor.AddSession(sessionID, CreateSessionConfig(dynamicCompID, false)), "Failed to add dynamic session to acceptor");
var sessionConfig = CreateSessionConfig(dynamicCompID, false);
Assert.IsTrue(_acceptor.AddSession(sessionID, sessionConfig), "Failed to add dynamic session to acceptor");
var socket03 = ConnectToEngine();
SendLogon(socket03, dynamicCompID);
Assert.IsTrue(WaitForLogonStatus(dynamicCompID), "Failed to logon dynamic acceptor session");

// Ensure that we can't add the same session again
Assert.IsFalse(_acceptor.AddSession(sessionID, sessionConfig), "Added dynamic session twice");

// Now that dynamic acceptor is logged on, ensure that unforced attempt to remove session fails
Assert.IsFalse(_acceptor.RemoveSession(sessionID, false), "Unexpected success removing active session");
Assert.IsTrue(socket03.Connected, "Unexpected loss of connection");
Expand All @@ -359,6 +384,7 @@ public void DynamicAcceptor()
Assert.IsTrue(_acceptor.RemoveSession(CreateSessionID(StaticAcceptorCompID), true), "Failed to remove active session");
Assert.IsTrue(WaitForDisconnect(socket01), "Socket still connected after session removed");
Assert.IsFalse(IsLoggedOn(StaticAcceptorCompID), "Session still logged on after being removed");

}

[Test]
Expand All @@ -374,11 +400,15 @@ public void DynamicInitiator()
// Add the dynamic initator and ensure that we can log on
string dynamicCompID = "ini10";
var sessionID = CreateSessionID(dynamicCompID);
Assert.IsTrue(_initiator.AddSession(sessionID, CreateSessionConfig(dynamicCompID, true)), "Failed to add dynamic session to initiator");
var sessionConfig = CreateSessionConfig(dynamicCompID, true);
Assert.IsTrue(_initiator.AddSession(sessionID, sessionConfig), "Failed to add dynamic session to initiator");
Assert.IsTrue(WaitForLogonMessage(dynamicCompID), "Failed to get logon message for dynamic initiator session");
SendInitiatorLogon(dynamicCompID);
Assert.IsTrue(WaitForLogonStatus(dynamicCompID), "Failed to logon dynamic initiator session");

// Ensure that we can't add the same session again
Assert.IsFalse(_initiator.AddSession(sessionID, sessionConfig), "Added dynamic session twice");

// Now that dynamic initiator is logged on, ensure that unforced attempt to remove session fails
Assert.IsFalse(_initiator.RemoveSession(sessionID, false), "Unexpected success removing active session");
Assert.IsTrue(IsLoggedOn(dynamicCompID), "Unexpected logoff");
Expand All @@ -402,6 +432,9 @@ public void DynamicInitiator()
Assert.IsTrue(_initiator.RemoveSession(CreateSessionID(StaticInitiatorCompID), true), "Failed to remove active session");
Assert.IsTrue(WaitForDisconnect(StaticInitiatorCompID), "Socket still connected after session removed");
Assert.IsFalse(IsLoggedOn(StaticInitiatorCompID), "Session still logged on after being removed");

// Check that log directory default setting has been effective
Assert.Greater(System.IO.Directory.GetFiles(LogPath, QuickFix.Values.BeginString_FIX42 + "*.log").Length, 0);
}
}
}

0 comments on commit 4c76145

Please sign in to comment.