diff --git a/QuickFIXn/AbstractInitiator.cs b/QuickFIXn/AbstractInitiator.cs index 7abffd562..f720b1c5d 100644 --- a/QuickFIXn/AbstractInitiator.cs +++ b/QuickFIXn/AbstractInitiator.cs @@ -64,7 +64,7 @@ public void Start() foreach (SessionID sessionID in _settings.GetSessions()) { Dictionary dict = _settings.Get(sessionID); - AddSession(sessionID, dict); + CreateSession(sessionID, dict); } if (0 == sessions_.Count) @@ -78,20 +78,40 @@ public void Start() } /// - /// Add new session, either at start-up or as an ad-hoc operation + /// Add new session as an ad-hoc (dynamic) operation /// /// ID of new session /// config settings for new session /// true if session added successfully, false if session already exists or is not an initiator public bool AddSession(SessionID sessionID, Dictionary dict) + { + lock (_settings) + if (!_settings.Has(sessionID)) // session won't be in settings if ad-hoc creation after startup + _settings.Set(sessionID, dict); // need to to this here to merge in default config settings + else + return false; // session already exists + + if (CreateSession(sessionID, dict)) + return true; + + lock (_settings) // failed to create new session + _settings.Remove(sessionID); + return false; + } + + /// + /// Create session, either at start-up or as an ad-hoc operation + /// + /// ID of new session + /// config settings for new session + /// true if session added successfully, false if session already exists or is not an initiator + private bool CreateSession(SessionID sessionID, Dictionary dict) { if (dict.GetString(SessionSettings.CONNECTION_TYPE) == "initiator" && !sessionIDs_.Contains(sessionID)) { Session session = sessionFactory_.Create(sessionID, dict); lock (sync_) { - if (!_settings.Has(sessionID)) // session won't be in settings if ad-hoc creation after startup - _settings.Set(sessionID, dict); sessionIDs_.Add(sessionID); sessions_[sessionID] = session; SetDisconnected(sessionID); @@ -118,19 +138,19 @@ public bool RemoveSession(SessionID sessionID, bool terminateActiveSession) session = sessions_[sessionID]; if (session.IsLoggedOn && !terminateActiveSession) return false; - sessions_.Remove(sessionID); - _settings.Remove(sessionID); disconnectRequired = IsConnected(sessionID) || IsPending(sessionID); if (disconnectRequired) SetDisconnected(sessionID); disconnected_.Remove(sessionID); sessionIDs_.Remove(sessionID); - OnRemove(sessionID); } } + lock (_settings) + _settings.Remove(sessionID); if (disconnectRequired) session.Disconnect("Dynamic session removal"); + OnRemove(sessionID); // ensure session's reader thread is gone before we dispose session if (session != null) session.Dispose(); return true; diff --git a/QuickFIXn/SocketInitiatorThread.cs b/QuickFIXn/SocketInitiatorThread.cs index a84aecf39..354885e0a 100755 --- a/QuickFIXn/SocketInitiatorThread.cs +++ b/QuickFIXn/SocketInitiatorThread.cs @@ -50,7 +50,9 @@ public void Join() if (null == thread_) return; Disconnect(); - thread_.Join(5000); + // Make sure session's socket reader thread doesn't try to do a Join on itself! + if (Thread.CurrentThread.ManagedThreadId != thread_.ManagedThreadId) + thread_.Join(2000); thread_ = null; } diff --git a/QuickFIXn/ThreadedSocketAcceptor.cs b/QuickFIXn/ThreadedSocketAcceptor.cs index c76329991..8f1690d80 100755 --- a/QuickFIXn/ThreadedSocketAcceptor.cs +++ b/QuickFIXn/ThreadedSocketAcceptor.cs @@ -65,6 +65,7 @@ public Dictionary GetAcceptedSessions() } private Dictionary sessions_ = new Dictionary(); + private SessionSettings settings_; private Dictionary socketDescriptorForAddress_ = new Dictionary(); private SessionFactory sessionFactory_; private bool isStarted_ = false; @@ -103,10 +104,11 @@ public ThreadedSocketAcceptor(SessionFactory sessionFactory, SessionSettings set private void CreateSessions(SessionSettings settings, SessionFactory sessionFactory) { sessionFactory_ = sessionFactory; + settings_ = settings; foreach (SessionID sessionID in settings.GetSessions()) { QuickFix.Dictionary dict = settings.Get(sessionID); - AddSession(sessionID, dict); + CreateSession(sessionID, dict); } if (0 == socketDescriptorForAddress_.Count) @@ -146,12 +148,12 @@ private AcceptorSocketDescriptor GetAcceptorSocketDescriptor(Dictionary dict) } /// - /// Add new session, either at start-up or as an ad-hoc operation + /// Create session, either at start-up or as an ad-hoc operation /// /// ID of new session /// config settings for new session /// true if session added successfully, false if session already exists or is not an acceptor - public bool AddSession(SessionID sessionID, Dictionary dict) + private bool CreateSession(SessionID sessionID, Dictionary dict) { if (!sessions_.ContainsKey(sessionID)) { @@ -168,29 +170,6 @@ public bool AddSession(SessionID sessionID, Dictionary dict) return false; } - /// - /// Ad-hoc removal of an existing session - /// - /// ID of session to be removed - /// if true, force disconnection and removal of session even if it has an active connection - /// true if session removed or not already present; false if could not be removed due to an active connection - public bool RemoveSession(SessionID sessionID, bool terminateActiveSession) - { - Session session = null; - if (sessions_.TryGetValue(sessionID, out session)) - { - if (session.IsLoggedOn && !terminateActiveSession) - return false; - session.Disconnect("Dynamic session removal"); - foreach (AcceptorSocketDescriptor descriptor in socketDescriptorForAddress_.Values) - if (descriptor.RemoveSession(sessionID)) - break; - sessions_.Remove(sessionID); - session.Dispose(); - } - return true; - } - private void StartAcceptingConnections() { lock (sync_) @@ -345,6 +324,54 @@ public Dictionary GetAcceptorAddresses() throw new System.NotImplementedException(); } + /// + /// Add new session as an ad-oc (dynamic) operation + /// + /// ID of new session + /// config settings for new session + /// true if session added successfully, false if session already exists or is not an acceptor + public bool AddSession(SessionID sessionID, Dictionary dict) + { + lock (settings_) + if (!settings_.Has(sessionID)) // session won't be in settings if ad-hoc creation after startup + settings_.Set(sessionID, dict); // need to to this here to merge in default config settings + else + return false; // session already exists + + if (CreateSession(sessionID, dict)) + return true; + + lock (settings_) // failed to create session, so remove from settings + settings_.Remove(sessionID); + return false; + } + + + /// + /// Ad-hoc removal of an existing session + /// + /// ID of session to be removed + /// if true, force disconnection and removal of session even if it has an active connection + /// true if session removed or not already present; false if could not be removed due to an active connection + public bool RemoveSession(SessionID sessionID, bool terminateActiveSession) + { + Session session = null; + if (sessions_.TryGetValue(sessionID, out session)) + { + if (session.IsLoggedOn && !terminateActiveSession) + return false; + session.Disconnect("Dynamic session removal"); + foreach (AcceptorSocketDescriptor descriptor in socketDescriptorForAddress_.Values) + if (descriptor.RemoveSession(sessionID)) + break; + sessions_.Remove(sessionID); + session.Dispose(); + lock (settings_) + settings_.Remove(sessionID); + } + return true; + } + #endregion } } diff --git a/QuickFIXn/Transport/SocketInitiator.cs b/QuickFIXn/Transport/SocketInitiator.cs index 030c17fcd..c58dfca0b 100755 --- a/QuickFIXn/Transport/SocketInitiator.cs +++ b/QuickFIXn/Transport/SocketInitiator.cs @@ -69,12 +69,17 @@ public static void SocketInitiatorThreadStart(object socketInitiatorThread) catch (IOException ex) // Can be exception when connecting, during ssl authentication or when reading { t.Session.Log.OnEvent("Connection failed: " + ex.Message); - t.Initiator.RemoveThread(t); - t.Initiator.SetDisconnected(t.Session.SessionID); } catch (SocketException e) { t.Session.Log.OnEvent("Connection failed: " + e.Message); + } + catch (Exception) + { + // It might be the logger ObjectDisposedException, so don't try to log! + } + finally + { t.Initiator.RemoveThread(t); t.Initiator.SetDisconnected(t.Session.SessionID); } @@ -90,10 +95,26 @@ private void AddThread(SocketInitiatorThread thread) private void RemoveThread(SocketInitiatorThread thread) { - lock (sync_) + RemoveThread(thread.Session.SessionID); + } + + private void RemoveThread(SessionID sessionID) + { + // We can come in here on the thread being removed, and on another thread too in the case + // of dynamic session removal, so make sure we won't deadlock... + if (Monitor.TryEnter(sync_)) { - thread.Join(); - threads_.Remove(thread.Session.SessionID); + SocketInitiatorThread thread = null; + if (threads_.TryGetValue(sessionID, out thread)) + { + try + { + thread.Join(); + } + catch { } + threads_.Remove(sessionID); + } + Monitor.Exit(sync_); } } @@ -172,7 +193,7 @@ protected override void OnStart() /// ID of session being removed protected override void OnRemove(SessionID sessionID) { - sessionToHostNum_.Remove(sessionID); + RemoveThread(sessionID); } protected override bool OnPoll(double timeout) diff --git a/UnitTests/SessionDynamicTest.cs b/UnitTests/SessionDynamicTest.cs index 3f3f01e6f..407907853 100644 --- a/UnitTests/SessionDynamicTest.cs +++ b/UnitTests/SessionDynamicTest.cs @@ -66,6 +66,8 @@ public SocketState(Socket s) const string FIXMessageEnd = @"\x0110=\d{3}\x01"; const string FIXMessageDelimit = @"(8=FIX|\A).*?(" + FIXMessageEnd + @"|\z)"; + const string LogPath = "log"; + SocketInitiator _initiator; ThreadedSocketAcceptor _acceptor; Dictionary _sessions; @@ -80,10 +82,6 @@ Dictionary CreateSessionConfig(string targetCompID, bool isInitiator) settings.SetString(SessionSettings.START_TIME, "12:00:00"); settings.SetString(SessionSettings.END_TIME, "12:00:00"); settings.SetString(SessionSettings.HEARTBTINT, "300"); - settings.SetString(SessionSettings.SOCKET_CONNECT_HOST, Host); - settings.SetString(SessionSettings.SOCKET_CONNECT_PORT, ConnectPort.ToString()); - settings.SetString(SessionSettings.SOCKET_ACCEPT_HOST, Host); - settings.SetString(SessionSettings.SOCKET_ACCEPT_PORT, AcceptPort.ToString()); return settings; } @@ -113,14 +111,23 @@ void StartEngine(bool initiator) { TestApplication application = new TestApplication(LogonCallback, LogoffCallback); IMessageStoreFactory storeFactory = new MemoryStoreFactory(); - ILogFactory logFactory = new ScreenLogFactory(false, false, false); SessionSettings settings = new SessionSettings(); + Dictionary defaults = new Dictionary(); + defaults.SetString(QuickFix.SessionSettings.FILE_LOG_PATH, LogPath); + + // Put IP endpoint settings into default section to verify that that defaults get merged into + // session-specific settings not only for static sessions, but also for dynamic ones + defaults.SetString(SessionSettings.SOCKET_CONNECT_HOST, Host); + defaults.SetString(SessionSettings.SOCKET_CONNECT_PORT, ConnectPort.ToString()); + defaults.SetString(SessionSettings.SOCKET_ACCEPT_HOST, Host); + defaults.SetString(SessionSettings.SOCKET_ACCEPT_PORT, AcceptPort.ToString()); + + settings.Set(defaults); + ILogFactory logFactory = new FileLogFactory(settings); if (initiator) { - Dictionary defaults = new Dictionary(); defaults.SetString(SessionSettings.RECONNECT_INTERVAL, "1"); - settings.Set(defaults); settings.Set(CreateSessionID(StaticInitiatorCompID), CreateSessionConfig(StaticInitiatorCompID, true)); _initiator = new SocketInitiator(application, storeFactory, settings, logFactory); _initiator.Start(); @@ -294,11 +301,22 @@ void SendLogon(Socket s, string senderCompID) s.Send(Encoding.ASCII.GetBytes(msg.ToString())); } + void ClearLogs() + { + if (System.IO.Directory.Exists(LogPath)) + try + { + System.IO.Directory.Delete(LogPath, true); + } + catch { } + } + [SetUp] public void Setup() { _sessions = new Dictionary(); _loggedOnCompIDs = new HashSet(); + ClearLogs(); } [TearDown] @@ -313,6 +331,9 @@ public void TearDown() _initiator = null; _acceptor = null; + + Thread.Sleep(500); + ClearLogs(); } [Test] @@ -334,11 +355,15 @@ public void DynamicAcceptor() // Add the dynamic acceptor and ensure that we can now log on var sessionID = CreateSessionID(dynamicCompID); - Assert.IsTrue(_acceptor.AddSession(sessionID, CreateSessionConfig(dynamicCompID, false)), "Failed to add dynamic session to acceptor"); + var sessionConfig = CreateSessionConfig(dynamicCompID, false); + Assert.IsTrue(_acceptor.AddSession(sessionID, sessionConfig), "Failed to add dynamic session to acceptor"); var socket03 = ConnectToEngine(); SendLogon(socket03, dynamicCompID); Assert.IsTrue(WaitForLogonStatus(dynamicCompID), "Failed to logon dynamic acceptor session"); + // Ensure that we can't add the same session again + Assert.IsFalse(_acceptor.AddSession(sessionID, sessionConfig), "Added dynamic session twice"); + // Now that dynamic acceptor is logged on, ensure that unforced attempt to remove session fails Assert.IsFalse(_acceptor.RemoveSession(sessionID, false), "Unexpected success removing active session"); Assert.IsTrue(socket03.Connected, "Unexpected loss of connection"); @@ -359,6 +384,7 @@ public void DynamicAcceptor() Assert.IsTrue(_acceptor.RemoveSession(CreateSessionID(StaticAcceptorCompID), true), "Failed to remove active session"); Assert.IsTrue(WaitForDisconnect(socket01), "Socket still connected after session removed"); Assert.IsFalse(IsLoggedOn(StaticAcceptorCompID), "Session still logged on after being removed"); + } [Test] @@ -374,11 +400,15 @@ public void DynamicInitiator() // Add the dynamic initator and ensure that we can log on string dynamicCompID = "ini10"; var sessionID = CreateSessionID(dynamicCompID); - Assert.IsTrue(_initiator.AddSession(sessionID, CreateSessionConfig(dynamicCompID, true)), "Failed to add dynamic session to initiator"); + var sessionConfig = CreateSessionConfig(dynamicCompID, true); + Assert.IsTrue(_initiator.AddSession(sessionID, sessionConfig), "Failed to add dynamic session to initiator"); Assert.IsTrue(WaitForLogonMessage(dynamicCompID), "Failed to get logon message for dynamic initiator session"); SendInitiatorLogon(dynamicCompID); Assert.IsTrue(WaitForLogonStatus(dynamicCompID), "Failed to logon dynamic initiator session"); + // Ensure that we can't add the same session again + Assert.IsFalse(_initiator.AddSession(sessionID, sessionConfig), "Added dynamic session twice"); + // Now that dynamic initiator is logged on, ensure that unforced attempt to remove session fails Assert.IsFalse(_initiator.RemoveSession(sessionID, false), "Unexpected success removing active session"); Assert.IsTrue(IsLoggedOn(dynamicCompID), "Unexpected logoff"); @@ -402,6 +432,9 @@ public void DynamicInitiator() Assert.IsTrue(_initiator.RemoveSession(CreateSessionID(StaticInitiatorCompID), true), "Failed to remove active session"); Assert.IsTrue(WaitForDisconnect(StaticInitiatorCompID), "Socket still connected after session removed"); Assert.IsFalse(IsLoggedOn(StaticInitiatorCompID), "Session still logged on after being removed"); + + // Check that log directory default setting has been effective + Assert.Greater(System.IO.Directory.GetFiles(LogPath, QuickFix.Values.BeginString_FIX42 + "*.log").Length, 0); } } }