Skip to content

Cleanup Sessions on stop #175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -676,15 +676,23 @@ static void registerSession(Session session) {
sessions.put(session.getSessionID(), session);
}

static void unregisterSessions(List<SessionID> sessionIds) {
static void unregisterSessions(List<SessionID> sessionIds, boolean doClose) {
for (final SessionID sessionId : sessionIds) {
final Session session = sessions.remove(sessionId);
if (session != null) {
try {
unregisterSession(sessionId, doClose);
}
}

static void unregisterSession(SessionID sessionId, boolean doClose) {
final Session session = sessions.get(sessionId);
if (session != null) {
try {
if (doClose) {
session.close();
} catch (final IOException e) {
LOG.error("Failed to close session resources", e);
}
} catch (final IOException e) {
LOG.error("Failed to close session resources", e);
} finally {
sessions.remove(sessionId);
}
}
}
Expand Down Expand Up @@ -2911,13 +2919,15 @@ public boolean isAllowedForSession(InetAddress remoteInetAddress) {
}

/**
* Closes session resources. This is for internal use and should typically
* not be called by an user application.
* Closes session resources and unregisters session. This is for internal
* use and should typically not be called by an user application.
*/
@Override
public void close() throws IOException {
closeIfCloseable(getLog());
closeIfCloseable(getStore());
// clean up session just in case close() was not called from Session.unregisterSession()
unregisterSession(this.sessionID, false);
}

private void closeIfCloseable(Object resource) throws IOException {
Expand Down
3 changes: 2 additions & 1 deletion quickfixj-core/src/main/java/quickfix/SocketAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public void stop(boolean forceDisconnect) {
stopSessionTimer();
} finally {
eventHandlingStrategy.stopHandlingMessages();
Session.unregisterSessions(getSessions());
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
isStarted = Boolean.FALSE;
}
}
Expand Down
3 changes: 2 additions & 1 deletion quickfixj-core/src/main/java/quickfix/SocketInitiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public void stop(boolean forceDisconnect) {
stopInitiators();
} finally {
eventHandlingStrategy.stopHandlingMessages();
Session.unregisterSessions(getSessions());
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
isStarted = Boolean.FALSE;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public void stop(boolean forceDisconnect) {
}
stopSessionTimer();
eventHandlingStrategy.stopDispatcherThreads();
Session.unregisterSessions(getSessions());
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
}

public void block() throws ConfigError, RuntimeError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void stop(boolean forceDisconnect) {
logoutAllSessions(forceDisconnect);
stopInitiators();
eventHandlingStrategy.stopDispatcherThreads();
Session.unregisterSessions(getSessions());
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
}

public void block() throws ConfigError, RuntimeError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ protected void setSessions(Map<SessionID, Session> sessions) {
propertyChangeSupport.firePropertyChange(SESSIONS_PROPERTY, null, sessions);
}

/**
* Will remove all Sessions from the SessionConnector's Session map.
* Please make sure that these Sessions were unregistered before via
* Session.unregisterSessions().
*/
protected void clearConnectorSessions() {
this.sessions.clear();
}

/**
* Get the list of session managed by this connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessi
protected void createSessionInitiators()
throws ConfigError {
try {
// QFJ698: clear() is needed on restart, otherwise the set gets filled up with
// more and more initiators which are not equal because the local port differs
initiators.clear();
createSessions();
SessionSettings settings = getSettings();
for (final Session session : getSessionMap().values()) {
Expand Down Expand Up @@ -278,8 +275,9 @@ protected void startInitiators() {
}

protected void stopInitiators() {
for (final IoSessionInitiator initiator : initiators) {
initiator.stop();
for (Iterator<IoSessionInitiator> iterator = initiators.iterator(); iterator.hasNext();) {
iterator.next().stop();
iterator.remove();
}
super.stopSessionTimer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
public class SessionAdminTest extends TestCase {

public void testResetSequence() throws Exception {
Session session = SessionFactoryTestSupport.createSession();
MockSessionAdmin admin = new MockSessionAdmin(session, null, null);
admin.resetSequence(25);
assertEquals(1, admin.sentMessages.size());
assertEquals(25, admin.sentMessages.get(0).getInt(NewSeqNo.FIELD));
try (Session session = SessionFactoryTestSupport.createSession()) {
MockSessionAdmin admin = new MockSessionAdmin(session, null, null);
admin.resetSequence(25);
assertEquals(1, admin.sentMessages.size());
assertEquals(25, admin.sentMessages.get(0).getInt(NewSeqNo.FIELD));
}
}

private class MockSessionAdmin extends SessionAdmin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package quickfix;

import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
import quickfix.field.ApplVerID;
Expand All @@ -29,6 +30,7 @@

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import org.junit.After;
import static org.junit.Assert.*;

public class DefaultSessionFactoryTest {
Expand All @@ -45,6 +47,11 @@ public void setUp() throws Exception {
new SLF4JLogFactory(new SessionSettings()));
}

@After
public void tearDown() {
Session.unregisterSession(sessionID, true);
}

@Test
public void testMinimalSettings() throws Exception {
factory.create(sessionID, settings);
Expand Down Expand Up @@ -86,29 +93,31 @@ public void testFixtDataDictionaryConfiguration() throws Exception {
settings.setString(sessionID, Session.SETTING_APP_DATA_DICTIONARY, "FIX42.xml");
settings.setString(sessionID, Session.SETTING_APP_DATA_DICTIONARY + "." + FixVersions.BEGINSTRING_FIX40, "FIX40.xml");

Session session = factory.create(sessionID, settings);

DataDictionaryProvider provider = session.getDataDictionaryProvider();
assertThat(provider.getSessionDataDictionary(sessionID.getBeginString()),
is(notNullValue()));
try (Session session = factory.create(sessionID, settings)) {

assertThat(provider.getApplicationDataDictionary(new ApplVerID(ApplVerID.FIX42)),
is(notNullValue()));
assertThat(provider.getApplicationDataDictionary(new ApplVerID(ApplVerID.FIX40)),
is(notNullValue()));
DataDictionaryProvider provider = session.getDataDictionaryProvider();
assertThat(provider.getSessionDataDictionary(sessionID.getBeginString()),
is(notNullValue()));

assertThat(provider.getApplicationDataDictionary(new ApplVerID(ApplVerID.FIX42)),
is(notNullValue()));
assertThat(provider.getApplicationDataDictionary(new ApplVerID(ApplVerID.FIX40)),
is(notNullValue()));
}
}

@Test
public void testPreFixtDataDictionaryConfiguration() throws Exception {
settings.setBool(sessionID, Session.SETTING_USE_DATA_DICTIONARY, true);

Session session = factory.create(sessionID, settings);
try (Session session = factory.create(sessionID, settings)) {

DataDictionaryProvider provider = session.getDataDictionaryProvider();
assertThat(provider.getSessionDataDictionary(sessionID.getBeginString()),
is(notNullValue()));
assertThat(provider.getApplicationDataDictionary(new ApplVerID(ApplVerID.FIX42)),
is(notNullValue()));
DataDictionaryProvider provider = session.getDataDictionaryProvider();
assertThat(provider.getSessionDataDictionary(sessionID.getBeginString()),
is(notNullValue()));
assertThat(provider.getApplicationDataDictionary(new ApplVerID(ApplVerID.FIX42)),
is(notNullValue()));
}
}

@Test
Expand Down Expand Up @@ -181,13 +190,15 @@ public void testIncorrectTimeValues() throws Exception {
@Test
public void testTestRequestDelayMultiplier() throws Exception {
settings.setString(sessionID, Session.SETTING_TEST_REQUEST_DELAY_MULTIPLIER, "0.37");
Session session = factory.create(sessionID, settings);
assertEquals(0.37, session.getTestRequestDelayMultiplier(), 0);
try (Session session = factory.create(sessionID, settings)) {
assertEquals(0.37, session.getTestRequestDelayMultiplier(), 0);
}
}

private void createSessionAndAssertConfigError(String message, String pattern) {
Session session = null;
try {
factory.create(sessionID, settings);
session = factory.create(sessionID, settings);
fail(message);
} catch (ConfigError e) {
if (pattern != null) {
Expand All @@ -196,6 +207,14 @@ private void createSessionAndAssertConfigError(String message, String pattern) {
assertTrue("exception message not matched, expected: " + pattern + ", got: "
+ e.getMessage(), m.matches());
}
} finally {
if (session != null) {
try {
session.close();
} catch (IOException ex) {
// ignore
}
}
}
}

Expand All @@ -214,7 +233,8 @@ private void setUpDefaultSettings(SessionID sessionID) {
@Test
public void testReconnectIntervalInDefaultSession() throws Exception {
settings.setString(sessionID, "ReconnectInterval", "2x5;3x15");
factory.create(sessionID, settings);
Session session = factory.create(sessionID, settings);
session.close();
}

@Test
Expand Down
17 changes: 9 additions & 8 deletions quickfixj-core/src/test/java/quickfix/FileLogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,15 @@ public void testLogErrorWhenFilesystemRemoved() throws IOException {
settings.setBool(sessionID, FileLogFactory.SETTING_INCLUDE_MILLIS_IN_TIMESTAMP, false);
FileLogFactory factory = new FileLogFactory(settings);

Session session = new Session(new UnitTestApplication(), new MemoryStoreFactory(),
try (Session session = new Session(new UnitTestApplication(), new MemoryStoreFactory(),
sessionID, new DefaultDataDictionaryProvider(), null, factory,
new DefaultMessageFactory(), 0);
Session.registerSession(session);

FileLog log = (FileLog) session.getLog();
log.close();
log.logIncoming("test");
// no stack overflow exception thrown
new DefaultMessageFactory(), 0)) {
Session.registerSession(session);

FileLog log = (FileLog) session.getLog();
log.close();
log.logIncoming("test");
// no stack overflow exception thrown
}
}
}
19 changes: 16 additions & 3 deletions quickfixj-core/src/test/java/quickfix/JdbcLogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,25 @@

import javax.sql.DataSource;

import junit.framework.TestCase;

public class JdbcLogTest extends TestCase {
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertNotNull;
import static junit.framework.TestCase.assertTrue;
import static junit.framework.TestCase.fail;
import org.junit.After;
import org.junit.Test;

public class JdbcLogTest {
private JdbcLog log;
private JdbcLogFactory logFactory;
private Connection connection;
private SessionID sessionID;

@After
public void tearDown() {
Session.unregisterSession(sessionID, true);
}

@Test
public void testLog() throws Exception {
doLogTest(null);
}
Expand All @@ -60,6 +71,7 @@ private void doLogTest(DataSource dataSource) throws ClassNotFoundException, SQL
assertEquals(0, getRowCount(connection, "event_log"));
}

@Test
public void testLogWithHeartbeatFiltering() throws Exception {
setUpJdbcLog(false, null);

Expand All @@ -83,6 +95,7 @@ public void testLogWithHeartbeatFiltering() throws Exception {
* (such as we can't connect ot the DB, or the tables are missing) and doesn't try
* to print failing exceptions recursively until the stack overflows
*/
@Test
public void testHandlesRecursivelyFailingException() throws Exception {
setUpJdbcLog(false, null);

Expand Down
Loading