Skip to content

dynamic initiator session (QFJ-247) #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,13 @@ <H3>QuickFIX Settings</H3>
<TD> valid IP address in the format of x.x.x.x or a domain name </TD>
<TD> If unset the socket will be bound to all local interfaces.</TD>
</TR>
<TR ALIGN="left" VALIGN="middle">
<TD> <I>DynamicSession</I> </TD>

<TD> Leave the corresponding session disconnected until AbstractSocketInitiator.createDynamicSession is called</TD>
<TD> Y<br/>N </TD>
<TD> N</TD>
</TR>

<TR ALIGN="center" VALIGN="middle">
<TD COLSPAN="4" class="subsection"><A NAME="Acceptor">Acceptor</A></TD>
Expand Down
7 changes: 7 additions & 0 deletions quickfixj-core/src/main/java/quickfix/Initiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,11 @@ public interface Initiator extends Connector {
* type is "initiator".
*/
String SETTING_PROXY_WORKSTATION = "ProxyWorkstation";

/**
* Leave the corresponding session disconnected until
* AbstractSocketInitiator.createDynamicSession is called
*/
String SETTING_DYNAMIC_SESSION = "DynamicSession";

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,75 +78,81 @@ protected void createSessionInitiators()
throws ConfigError {
try {
createSessions();
SessionSettings settings = getSettings();
for (final Session session : getSessionMap().values()) {
final SessionID sessionID = session.getSessionID();
final int[] reconnectingIntervals = getReconnectIntervalInSeconds(sessionID);

final SocketAddress[] socketAddresses = getSocketAddresses(sessionID);
if (socketAddresses.length == 0) {
throw new ConfigError("Must specify at least one socket address");
}

SocketAddress localAddress = getLocalAddress(settings, sessionID);
createInitiator(session);
}
} catch (final FieldConvertError e) {
throw new ConfigError(e);
}
}

final NetworkingOptions networkingOptions = new NetworkingOptions(getSettings()
.getSessionProperties(sessionID, true));
private void createInitiator(final Session session) throws ConfigError, FieldConvertError {

SessionSettings settings = getSettings();
final SessionID sessionID = session.getSessionID();
final int[] reconnectingIntervals = getReconnectIntervalInSeconds(sessionID);

boolean sslEnabled = false;
SSLConfig sslConfig = null;
if (getSettings().isSetting(sessionID, SSLSupport.SETTING_USE_SSL)
&& BooleanConverter.convert(getSettings().getString(sessionID, SSLSupport.SETTING_USE_SSL))) {
sslEnabled = true;
sslConfig = SSLSupport.getSslConfig(getSettings(), sessionID);
}
final SocketAddress[] socketAddresses = getSocketAddresses(sessionID);
if (socketAddresses.length == 0) {
throw new ConfigError("Must specify at least one socket address");
}

String proxyUser = null;
String proxyPassword = null;
String proxyHost = null;
SocketAddress localAddress = getLocalAddress(settings, sessionID);

String proxyType = null;
String proxyVersion = null;
final NetworkingOptions networkingOptions = new NetworkingOptions(getSettings()
.getSessionProperties(sessionID, true));

String proxyWorkstation = null;
String proxyDomain = null;
boolean sslEnabled = false;
SSLConfig sslConfig = null;
if (getSettings().isSetting(sessionID, SSLSupport.SETTING_USE_SSL)
&& BooleanConverter.convert(getSettings().getString(sessionID, SSLSupport.SETTING_USE_SSL))) {
sslEnabled = true;
sslConfig = SSLSupport.getSslConfig(getSettings(), sessionID);
}

int proxyPort = -1;
String proxyUser = null;
String proxyPassword = null;
String proxyHost = null;

if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) {
proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE);
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_VERSION)) {
proxyVersion = settings.getString(sessionID,
Initiator.SETTING_PROXY_VERSION);
}
String proxyType = null;
String proxyVersion = null;

if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_USER)) {
proxyUser = settings.getString(sessionID, Initiator.SETTING_PROXY_USER);
proxyPassword = settings.getString(sessionID,
Initiator.SETTING_PROXY_PASSWORD);
}
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_WORKSTATION)
&& getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) {
proxyWorkstation = settings.getString(sessionID,
Initiator.SETTING_PROXY_WORKSTATION);
proxyDomain = settings.getString(sessionID, Initiator.SETTING_PROXY_DOMAIN);
}
String proxyWorkstation = null;
String proxyDomain = null;

proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST);
proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT);
}
int proxyPort = -1;

final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
socketAddresses, localAddress, reconnectingIntervals,
getScheduledExecutorService(), networkingOptions,
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) {
proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE);
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_VERSION)) {
proxyVersion = settings.getString(sessionID,
Initiator.SETTING_PROXY_VERSION);
}

initiators.add(ioSessionInitiator);
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_USER)) {
proxyUser = settings.getString(sessionID, Initiator.SETTING_PROXY_USER);
proxyPassword = settings.getString(sessionID,
Initiator.SETTING_PROXY_PASSWORD);
}
} catch (final FieldConvertError e) {
throw new ConfigError(e);
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_WORKSTATION)
&& getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) {
proxyWorkstation = settings.getString(sessionID,
Initiator.SETTING_PROXY_WORKSTATION);
proxyDomain = settings.getString(sessionID, Initiator.SETTING_PROXY_DOMAIN);
}

proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST);
proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT);
}

final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
socketAddresses, localAddress, reconnectingIntervals,
getScheduledExecutorService(), networkingOptions,
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);

initiators.add(ioSessionInitiator);

}

// QFJ-482
Expand Down Expand Up @@ -181,8 +187,10 @@ private void createSessions() throws ConfigError, FieldConvertError {
final SessionID sessionID = i.next();
if (isInitiatorSession(sessionID)) {
try {
final Session quickfixSession = createSession(sessionID);
initiatorSessions.put(sessionID, quickfixSession);
if (!settings.isSetting(sessionID, SETTING_DYNAMIC_SESSION) || !settings.getBool(sessionID, SETTING_DYNAMIC_SESSION)) {
final Session quickfixSession = createSession(sessionID);
initiatorSessions.put(sessionID, quickfixSession);
}
} catch (final Throwable e) {
if (continueInitOnError) {
log.error("error during session initialization, continuing...", e);
Expand All @@ -193,11 +201,20 @@ private void createSessions() throws ConfigError, FieldConvertError {
}
}
}
if (initiatorSessions.isEmpty()) {
throw new ConfigError("no initiators in settings");
}
setSessions(initiatorSessions);
}

public void createDynamicSession(SessionID sessionID) throws ConfigError {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again for a new feature it would be nice to see tests. For a considerable feature like a dynamic session some of the end-to-end / regression tests seem appropriate.


try {
Session session = createSession(sessionID);
super.addDynamicSession(session);
createInitiator(session);
startInitiators();
} catch (final FieldConvertError e) {
throw new ConfigError(e);
}
}

private int[] getReconnectIntervalInSeconds(SessionID sessionID) throws ConfigError {
final SessionSettings settings = getSettings();
Expand Down
116 changes: 116 additions & 0 deletions quickfixj-core/src/test/java/quickfix/mina/SessionConnectorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import quickfix.ConfigError;
import quickfix.DefaultSessionFactory;
import quickfix.FixVersions;
import quickfix.Initiator;
import quickfix.MemoryStoreFactory;
import quickfix.RuntimeError;
import quickfix.SLF4JLogFactory;
Expand All @@ -31,7 +32,11 @@
import quickfix.SessionID;
import quickfix.SessionSettings;
import quickfix.SessionState;
import quickfix.SocketInitiator;
import quickfix.UnitTestApplication;
import quickfix.mina.initiator.AbstractSocketInitiator;
import quickfix.mina.initiator.IoSessionInitiator;
import quickfix.mina.ssl.SSLSupport;

import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
Expand All @@ -41,6 +46,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -187,6 +194,56 @@ public void testAddingRemovingDynamicSessions() throws Exception {
session2.close();
}

/**
* Test dynamic initiator sessions
*/
@Test
public void testDynamicInitiatorSession() throws Exception {
SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX40, "TW", "ISLD");
SessionID sessionID2 = new SessionID(FixVersions.BEGINSTRING_FIX40, "me", "you");
SessionSettings settings = setUpInitiatorSessionSettings(sessionID);
DefaultSessionFactory sessionFactory = new DefaultSessionFactory(new UnitTestApplication(),
new MemoryStoreFactory(), new SLF4JLogFactory(new SessionSettings()));

AbstractSocketInitiatorUnderTest connector = new AbstractSocketInitiatorUnderTest(settings, sessionFactory);
connector.setSessions(new HashMap<>());
//Two sessions to test dynamic sessions while check initializers
connector.createDynamicSession(sessionID);
connector.createDynamicSession(sessionID2);
List<Session> sessions = connector.getManagedSessions();
//Check sessions created and available
assertEquals(2,sessions.size());
HashMap<SessionID, Session> map = new HashMap<>();
for (Session s : sessions) {
map.put(s.getSessionID(), s);
}
assertNotNull(map.get(sessionID));
assertNotNull(map.get(sessionID2));
//Check initiators created and not null
assertEquals(2, connector.getInitiators().size());
Set<IoSessionInitiator> initiators = connector.getInitiators();
for(IoSessionInitiator initiator: initiators){
assertNotNull(initiator);
}
connector.removeDynamicSession(sessionID);
connector.removeDynamicSession(sessionID2);
//Check if initiators are re - created for this sessions but not sessions available
assertEquals(0, connector.getManagedSessions().size());
connector.createSessionInitiators();
sessions=connector.getManagedSessions();
initiators=connector.getInitiators();
//Sessions re created during session initiatore re creation, initiators are stacked
assertEquals(2, sessions.size());
assertEquals(4,initiators.size());
//This should remove initiators
connector.stopInitiators();
assertEquals(0,connector.getInitiators().size());
//Tear down
for(Session s:sessions){
s.close();
}
}

private SessionSettings setUpSessionSettings(SessionID sessionID) {
SessionSettings settings = new SessionSettings();
settings.setString(Session.SETTING_USE_DATA_DICTIONARY, "N");
Expand All @@ -197,6 +254,34 @@ private SessionSettings setUpSessionSettings(SessionID sessionID) {
SessionFactory.ACCEPTOR_CONNECTION_TYPE);
return settings;
}
private SessionSettings setUpInitiatorSessionSettings(SessionID sessionID) {
SessionSettings settings = new SessionSettings();
settings.setString(Session.SETTING_USE_DATA_DICTIONARY, "N");
settings.setString(Session.SETTING_START_TIME, "00:00:00");
settings.setString(Session.SETTING_END_TIME, "00:00:00");
settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PORT, "9999");
settings.setLong(Session.SETTING_HEARTBTINT,100L);
settings.setString(SocketInitiator.SETTING_SOCKET_CONNECT_HOST,"127.0.0.1");
settings.setString(SocketInitiator.SETTING_SOCKET_CONNECT_PORT,"54321");
settings.setString(SessionFactory.SETTING_CONNECTION_TYPE,
SessionFactory.INITIATOR_CONNECTION_TYPE);
settings.setBool( SSLSupport.SETTING_USE_SSL,true);
settings.setString(Initiator.SETTING_PROXY_TYPE,"socks");
settings.setString(Initiator.SETTING_PROXY_VERSION,"5");
settings.setString(Initiator.SETTING_PROXY_USER,"Test Proxy User");
settings.setString(Initiator.SETTING_PROXY_PASSWORD,"Test Proxy User Password");
settings.setString(Initiator.SETTING_PROXY_WORKSTATION,"Test Proxy Workstation");
settings.setString(Initiator.SETTING_PROXY_DOMAIN,"Test Proxy Domain");
settings.setString(Initiator.SETTING_PROXY_HOST,"Test Proxy Host");
settings.setString(Initiator.SETTING_PROXY_PORT,"888");



settings.setBool(Initiator.SETTING_DYNAMIC_SESSION,false);
settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
SessionFactory.INITIATOR_CONNECTION_TYPE);
return settings;
}

private final class SessionConnectorListener implements PropertyChangeListener {
public void propertyChange(PropertyChangeEvent event) {
Expand Down Expand Up @@ -224,4 +309,35 @@ public void stop(boolean force) {
public void block() throws ConfigError, RuntimeError {
}
}
private static class AbstractSocketInitiatorUnderTest extends AbstractSocketInitiator {

public AbstractSocketInitiatorUnderTest(SessionSettings settings, SessionFactory sessionFactory) throws ConfigError {
super(settings, sessionFactory);
}

public void start() throws ConfigError, RuntimeError {
}
public void createDynamicSession(SessionID sessionID) throws ConfigError {
super.createDynamicSession(sessionID);
}
public void stop() {
}
public void stopInitiators(){
super.stopInitiators();
}
public void stop(boolean force) {
}

public void block() throws ConfigError, RuntimeError {
}
@Override
protected void createSessionInitiators() throws ConfigError {
super.createSessionInitiators();
}

@Override
protected EventHandlingStrategy getEventHandlingStrategy() {
return null;
}
}
}