Skip to content

Commit c9b51d9

Browse files
andyflurychrjohn
authored andcommitted
dynamic initiator session (QFJ-247) (#157)
* create dynamic initiator session * update documentation * Added dynamic initiator sessions Test
1 parent 0f97fbb commit c9b51d9

File tree

4 files changed

+207
-60
lines changed

4 files changed

+207
-60
lines changed

quickfixj-core/src/main/doc/usermanual/usage/configuration.html

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,13 @@ <H3>QuickFIX Settings</H3>
540540
<TD> valid IP address in the format of x.x.x.x or a domain name </TD>
541541
<TD> If unset the socket will be bound to all local interfaces.</TD>
542542
</TR>
543+
<TR ALIGN="left" VALIGN="middle">
544+
<TD> <I>DynamicSession</I> </TD>
545+
546+
<TD> Leave the corresponding session disconnected until AbstractSocketInitiator.createDynamicSession is called</TD>
547+
<TD> Y<br/>N </TD>
548+
<TD> N</TD>
549+
</TR>
543550

544551
<TR ALIGN="center" VALIGN="middle">
545552
<TD COLSPAN="4" class="subsection"><A NAME="Acceptor">Acceptor</A></TD>

quickfixj-core/src/main/java/quickfix/Initiator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,11 @@ public interface Initiator extends Connector {
116116
* type is "initiator".
117117
*/
118118
String SETTING_PROXY_WORKSTATION = "ProxyWorkstation";
119+
120+
/**
121+
* Leave the corresponding session disconnected until
122+
* AbstractSocketInitiator.createDynamicSession is called
123+
*/
124+
String SETTING_DYNAMIC_SESSION = "DynamicSession";
125+
119126
}

quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java

Lines changed: 77 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -78,75 +78,81 @@ protected void createSessionInitiators()
7878
throws ConfigError {
7979
try {
8080
createSessions();
81-
SessionSettings settings = getSettings();
8281
for (final Session session : getSessionMap().values()) {
83-
final SessionID sessionID = session.getSessionID();
84-
final int[] reconnectingIntervals = getReconnectIntervalInSeconds(sessionID);
85-
86-
final SocketAddress[] socketAddresses = getSocketAddresses(sessionID);
87-
if (socketAddresses.length == 0) {
88-
throw new ConfigError("Must specify at least one socket address");
89-
}
90-
91-
SocketAddress localAddress = getLocalAddress(settings, sessionID);
82+
createInitiator(session);
83+
}
84+
} catch (final FieldConvertError e) {
85+
throw new ConfigError(e);
86+
}
87+
}
9288

93-
final NetworkingOptions networkingOptions = new NetworkingOptions(getSettings()
94-
.getSessionProperties(sessionID, true));
89+
private void createInitiator(final Session session) throws ConfigError, FieldConvertError {
90+
91+
SessionSettings settings = getSettings();
92+
final SessionID sessionID = session.getSessionID();
93+
final int[] reconnectingIntervals = getReconnectIntervalInSeconds(sessionID);
9594

96-
boolean sslEnabled = false;
97-
SSLConfig sslConfig = null;
98-
if (getSettings().isSetting(sessionID, SSLSupport.SETTING_USE_SSL)
99-
&& BooleanConverter.convert(getSettings().getString(sessionID, SSLSupport.SETTING_USE_SSL))) {
100-
sslEnabled = true;
101-
sslConfig = SSLSupport.getSslConfig(getSettings(), sessionID);
102-
}
95+
final SocketAddress[] socketAddresses = getSocketAddresses(sessionID);
96+
if (socketAddresses.length == 0) {
97+
throw new ConfigError("Must specify at least one socket address");
98+
}
10399

104-
String proxyUser = null;
105-
String proxyPassword = null;
106-
String proxyHost = null;
100+
SocketAddress localAddress = getLocalAddress(settings, sessionID);
107101

108-
String proxyType = null;
109-
String proxyVersion = null;
102+
final NetworkingOptions networkingOptions = new NetworkingOptions(getSettings()
103+
.getSessionProperties(sessionID, true));
110104

111-
String proxyWorkstation = null;
112-
String proxyDomain = null;
105+
boolean sslEnabled = false;
106+
SSLConfig sslConfig = null;
107+
if (getSettings().isSetting(sessionID, SSLSupport.SETTING_USE_SSL)
108+
&& BooleanConverter.convert(getSettings().getString(sessionID, SSLSupport.SETTING_USE_SSL))) {
109+
sslEnabled = true;
110+
sslConfig = SSLSupport.getSslConfig(getSettings(), sessionID);
111+
}
113112

114-
int proxyPort = -1;
113+
String proxyUser = null;
114+
String proxyPassword = null;
115+
String proxyHost = null;
115116

116-
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) {
117-
proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE);
118-
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_VERSION)) {
119-
proxyVersion = settings.getString(sessionID,
120-
Initiator.SETTING_PROXY_VERSION);
121-
}
117+
String proxyType = null;
118+
String proxyVersion = null;
122119

123-
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_USER)) {
124-
proxyUser = settings.getString(sessionID, Initiator.SETTING_PROXY_USER);
125-
proxyPassword = settings.getString(sessionID,
126-
Initiator.SETTING_PROXY_PASSWORD);
127-
}
128-
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_WORKSTATION)
129-
&& getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) {
130-
proxyWorkstation = settings.getString(sessionID,
131-
Initiator.SETTING_PROXY_WORKSTATION);
132-
proxyDomain = settings.getString(sessionID, Initiator.SETTING_PROXY_DOMAIN);
133-
}
120+
String proxyWorkstation = null;
121+
String proxyDomain = null;
134122

135-
proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST);
136-
proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT);
137-
}
123+
int proxyPort = -1;
138124

139-
final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
140-
socketAddresses, localAddress, reconnectingIntervals,
141-
getScheduledExecutorService(), networkingOptions,
142-
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
143-
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);
125+
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) {
126+
proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE);
127+
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_VERSION)) {
128+
proxyVersion = settings.getString(sessionID,
129+
Initiator.SETTING_PROXY_VERSION);
130+
}
144131

145-
initiators.add(ioSessionInitiator);
132+
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_USER)) {
133+
proxyUser = settings.getString(sessionID, Initiator.SETTING_PROXY_USER);
134+
proxyPassword = settings.getString(sessionID,
135+
Initiator.SETTING_PROXY_PASSWORD);
146136
}
147-
} catch (final FieldConvertError e) {
148-
throw new ConfigError(e);
137+
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_WORKSTATION)
138+
&& getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) {
139+
proxyWorkstation = settings.getString(sessionID,
140+
Initiator.SETTING_PROXY_WORKSTATION);
141+
proxyDomain = settings.getString(sessionID, Initiator.SETTING_PROXY_DOMAIN);
142+
}
143+
144+
proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST);
145+
proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT);
149146
}
147+
148+
final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session,
149+
socketAddresses, localAddress, reconnectingIntervals,
150+
getScheduledExecutorService(), networkingOptions,
151+
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig,
152+
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation);
153+
154+
initiators.add(ioSessionInitiator);
155+
150156
}
151157

152158
// QFJ-482
@@ -181,8 +187,10 @@ private void createSessions() throws ConfigError, FieldConvertError {
181187
final SessionID sessionID = i.next();
182188
if (isInitiatorSession(sessionID)) {
183189
try {
184-
final Session quickfixSession = createSession(sessionID);
185-
initiatorSessions.put(sessionID, quickfixSession);
190+
if (!settings.isSetting(sessionID, SETTING_DYNAMIC_SESSION) || !settings.getBool(sessionID, SETTING_DYNAMIC_SESSION)) {
191+
final Session quickfixSession = createSession(sessionID);
192+
initiatorSessions.put(sessionID, quickfixSession);
193+
}
186194
} catch (final Throwable e) {
187195
if (continueInitOnError) {
188196
log.error("error during session initialization, continuing...", e);
@@ -193,11 +201,20 @@ private void createSessions() throws ConfigError, FieldConvertError {
193201
}
194202
}
195203
}
196-
if (initiatorSessions.isEmpty()) {
197-
throw new ConfigError("no initiators in settings");
198-
}
199204
setSessions(initiatorSessions);
200205
}
206+
207+
public void createDynamicSession(SessionID sessionID) throws ConfigError {
208+
209+
try {
210+
Session session = createSession(sessionID);
211+
super.addDynamicSession(session);
212+
createInitiator(session);
213+
startInitiators();
214+
} catch (final FieldConvertError e) {
215+
throw new ConfigError(e);
216+
}
217+
}
201218

202219
private int[] getReconnectIntervalInSeconds(SessionID sessionID) throws ConfigError {
203220
final SessionSettings settings = getSettings();

quickfixj-core/src/test/java/quickfix/mina/SessionConnectorTest.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import quickfix.ConfigError;
2424
import quickfix.DefaultSessionFactory;
2525
import quickfix.FixVersions;
26+
import quickfix.Initiator;
2627
import quickfix.MemoryStoreFactory;
2728
import quickfix.RuntimeError;
2829
import quickfix.SLF4JLogFactory;
@@ -31,7 +32,11 @@
3132
import quickfix.SessionID;
3233
import quickfix.SessionSettings;
3334
import quickfix.SessionState;
35+
import quickfix.SocketInitiator;
3436
import quickfix.UnitTestApplication;
37+
import quickfix.mina.initiator.AbstractSocketInitiator;
38+
import quickfix.mina.initiator.IoSessionInitiator;
39+
import quickfix.mina.ssl.SSLSupport;
3540

3641
import java.beans.PropertyChangeEvent;
3742
import java.beans.PropertyChangeListener;
@@ -41,6 +46,8 @@
4146
import java.util.HashMap;
4247
import java.util.List;
4348
import java.util.Map;
49+
import java.util.Set;
50+
4451
import static org.junit.Assert.assertEquals;
4552
import static org.junit.Assert.assertFalse;
4653
import static org.junit.Assert.assertNotNull;
@@ -187,6 +194,56 @@ public void testAddingRemovingDynamicSessions() throws Exception {
187194
session2.close();
188195
}
189196

197+
/**
198+
* Test dynamic initiator sessions
199+
*/
200+
@Test
201+
public void testDynamicInitiatorSession() throws Exception {
202+
SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX40, "TW", "ISLD");
203+
SessionID sessionID2 = new SessionID(FixVersions.BEGINSTRING_FIX40, "me", "you");
204+
SessionSettings settings = setUpInitiatorSessionSettings(sessionID);
205+
DefaultSessionFactory sessionFactory = new DefaultSessionFactory(new UnitTestApplication(),
206+
new MemoryStoreFactory(), new SLF4JLogFactory(new SessionSettings()));
207+
208+
AbstractSocketInitiatorUnderTest connector = new AbstractSocketInitiatorUnderTest(settings, sessionFactory);
209+
connector.setSessions(new HashMap<>());
210+
//Two sessions to test dynamic sessions while check initializers
211+
connector.createDynamicSession(sessionID);
212+
connector.createDynamicSession(sessionID2);
213+
List<Session> sessions = connector.getManagedSessions();
214+
//Check sessions created and available
215+
assertEquals(2,sessions.size());
216+
HashMap<SessionID, Session> map = new HashMap<>();
217+
for (Session s : sessions) {
218+
map.put(s.getSessionID(), s);
219+
}
220+
assertNotNull(map.get(sessionID));
221+
assertNotNull(map.get(sessionID2));
222+
//Check initiators created and not null
223+
assertEquals(2, connector.getInitiators().size());
224+
Set<IoSessionInitiator> initiators = connector.getInitiators();
225+
for(IoSessionInitiator initiator: initiators){
226+
assertNotNull(initiator);
227+
}
228+
connector.removeDynamicSession(sessionID);
229+
connector.removeDynamicSession(sessionID2);
230+
//Check if initiators are re - created for this sessions but not sessions available
231+
assertEquals(0, connector.getManagedSessions().size());
232+
connector.createSessionInitiators();
233+
sessions=connector.getManagedSessions();
234+
initiators=connector.getInitiators();
235+
//Sessions re created during session initiatore re creation, initiators are stacked
236+
assertEquals(2, sessions.size());
237+
assertEquals(4,initiators.size());
238+
//This should remove initiators
239+
connector.stopInitiators();
240+
assertEquals(0,connector.getInitiators().size());
241+
//Tear down
242+
for(Session s:sessions){
243+
s.close();
244+
}
245+
}
246+
190247
private SessionSettings setUpSessionSettings(SessionID sessionID) {
191248
SessionSettings settings = new SessionSettings();
192249
settings.setString(Session.SETTING_USE_DATA_DICTIONARY, "N");
@@ -197,6 +254,34 @@ private SessionSettings setUpSessionSettings(SessionID sessionID) {
197254
SessionFactory.ACCEPTOR_CONNECTION_TYPE);
198255
return settings;
199256
}
257+
private SessionSettings setUpInitiatorSessionSettings(SessionID sessionID) {
258+
SessionSettings settings = new SessionSettings();
259+
settings.setString(Session.SETTING_USE_DATA_DICTIONARY, "N");
260+
settings.setString(Session.SETTING_START_TIME, "00:00:00");
261+
settings.setString(Session.SETTING_END_TIME, "00:00:00");
262+
settings.setString(Acceptor.SETTING_SOCKET_ACCEPT_PORT, "9999");
263+
settings.setLong(Session.SETTING_HEARTBTINT,100L);
264+
settings.setString(SocketInitiator.SETTING_SOCKET_CONNECT_HOST,"127.0.0.1");
265+
settings.setString(SocketInitiator.SETTING_SOCKET_CONNECT_PORT,"54321");
266+
settings.setString(SessionFactory.SETTING_CONNECTION_TYPE,
267+
SessionFactory.INITIATOR_CONNECTION_TYPE);
268+
settings.setBool( SSLSupport.SETTING_USE_SSL,true);
269+
settings.setString(Initiator.SETTING_PROXY_TYPE,"socks");
270+
settings.setString(Initiator.SETTING_PROXY_VERSION,"5");
271+
settings.setString(Initiator.SETTING_PROXY_USER,"Test Proxy User");
272+
settings.setString(Initiator.SETTING_PROXY_PASSWORD,"Test Proxy User Password");
273+
settings.setString(Initiator.SETTING_PROXY_WORKSTATION,"Test Proxy Workstation");
274+
settings.setString(Initiator.SETTING_PROXY_DOMAIN,"Test Proxy Domain");
275+
settings.setString(Initiator.SETTING_PROXY_HOST,"Test Proxy Host");
276+
settings.setString(Initiator.SETTING_PROXY_PORT,"888");
277+
278+
279+
280+
settings.setBool(Initiator.SETTING_DYNAMIC_SESSION,false);
281+
settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE,
282+
SessionFactory.INITIATOR_CONNECTION_TYPE);
283+
return settings;
284+
}
200285

201286
private final class SessionConnectorListener implements PropertyChangeListener {
202287
public void propertyChange(PropertyChangeEvent event) {
@@ -224,4 +309,35 @@ public void stop(boolean force) {
224309
public void block() throws ConfigError, RuntimeError {
225310
}
226311
}
312+
private static class AbstractSocketInitiatorUnderTest extends AbstractSocketInitiator {
313+
314+
public AbstractSocketInitiatorUnderTest(SessionSettings settings, SessionFactory sessionFactory) throws ConfigError {
315+
super(settings, sessionFactory);
316+
}
317+
318+
public void start() throws ConfigError, RuntimeError {
319+
}
320+
public void createDynamicSession(SessionID sessionID) throws ConfigError {
321+
super.createDynamicSession(sessionID);
322+
}
323+
public void stop() {
324+
}
325+
public void stopInitiators(){
326+
super.stopInitiators();
327+
}
328+
public void stop(boolean force) {
329+
}
330+
331+
public void block() throws ConfigError, RuntimeError {
332+
}
333+
@Override
334+
protected void createSessionInitiators() throws ConfigError {
335+
super.createSessionInitiators();
336+
}
337+
338+
@Override
339+
protected EventHandlingStrategy getEventHandlingStrategy() {
340+
return null;
341+
}
342+
}
227343
}

0 commit comments

Comments
 (0)