-
Notifications
You must be signed in to change notification settings - Fork 648
dynamic initiator session (QFJ-247) #157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
48dd318
4a3ad00
cac34e2
e6c9228
e2f728f
4379e7a
aa88bc2
723e729
316dd24
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,6 +59,9 @@ public abstract class AbstractSocketInitiator extends SessionConnector implement | |
|
||
protected final Logger log = LoggerFactory.getLogger(getClass()); | ||
private final Set<IoSessionInitiator> initiators = new HashSet<>(); | ||
|
||
private static final String SETTING_INACTIVE_SESSION = "Inactive"; | ||
|
||
|
||
protected AbstractSocketInitiator(Application application, | ||
MessageStoreFactory messageStoreFactory, SessionSettings settings, | ||
|
@@ -81,75 +84,81 @@ protected void createSessionInitiators() | |
// more and more initiators which are not equal because the local port differs | ||
initiators.clear(); | ||
createSessions(); | ||
SessionSettings settings = getSettings(); | ||
for (final Session session : getSessionMap().values()) { | ||
final SessionID sessionID = session.getSessionID(); | ||
final int[] reconnectingIntervals = getReconnectIntervalInSeconds(sessionID); | ||
|
||
final SocketAddress[] socketAddresses = getSocketAddresses(sessionID); | ||
if (socketAddresses.length == 0) { | ||
throw new ConfigError("Must specify at least one socket address"); | ||
} | ||
|
||
SocketAddress localAddress = getLocalAddress(settings, sessionID); | ||
createInitiator(session); | ||
} | ||
} catch (final FieldConvertError e) { | ||
throw new ConfigError(e); | ||
} | ||
} | ||
|
||
final NetworkingOptions networkingOptions = new NetworkingOptions(getSettings() | ||
.getSessionProperties(sessionID, true)); | ||
private void createInitiator(final Session session) throws ConfigError, FieldConvertError { | ||
|
||
SessionSettings settings = getSettings(); | ||
final SessionID sessionID = session.getSessionID(); | ||
final int[] reconnectingIntervals = getReconnectIntervalInSeconds(sessionID); | ||
|
||
boolean sslEnabled = false; | ||
SSLConfig sslConfig = null; | ||
if (getSettings().isSetting(sessionID, SSLSupport.SETTING_USE_SSL) | ||
&& BooleanConverter.convert(getSettings().getString(sessionID, SSLSupport.SETTING_USE_SSL))) { | ||
sslEnabled = true; | ||
sslConfig = SSLSupport.getSslConfig(getSettings(), sessionID); | ||
} | ||
final SocketAddress[] socketAddresses = getSocketAddresses(sessionID); | ||
if (socketAddresses.length == 0) { | ||
throw new ConfigError("Must specify at least one socket address"); | ||
} | ||
|
||
String proxyUser = null; | ||
String proxyPassword = null; | ||
String proxyHost = null; | ||
SocketAddress localAddress = getLocalAddress(settings, sessionID); | ||
|
||
String proxyType = null; | ||
String proxyVersion = null; | ||
final NetworkingOptions networkingOptions = new NetworkingOptions(getSettings() | ||
.getSessionProperties(sessionID, true)); | ||
|
||
String proxyWorkstation = null; | ||
String proxyDomain = null; | ||
boolean sslEnabled = false; | ||
SSLConfig sslConfig = null; | ||
if (getSettings().isSetting(sessionID, SSLSupport.SETTING_USE_SSL) | ||
&& BooleanConverter.convert(getSettings().getString(sessionID, SSLSupport.SETTING_USE_SSL))) { | ||
sslEnabled = true; | ||
sslConfig = SSLSupport.getSslConfig(getSettings(), sessionID); | ||
} | ||
|
||
int proxyPort = -1; | ||
String proxyUser = null; | ||
String proxyPassword = null; | ||
String proxyHost = null; | ||
|
||
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) { | ||
proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE); | ||
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_VERSION)) { | ||
proxyVersion = settings.getString(sessionID, | ||
Initiator.SETTING_PROXY_VERSION); | ||
} | ||
String proxyType = null; | ||
String proxyVersion = null; | ||
|
||
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_USER)) { | ||
proxyUser = settings.getString(sessionID, Initiator.SETTING_PROXY_USER); | ||
proxyPassword = settings.getString(sessionID, | ||
Initiator.SETTING_PROXY_PASSWORD); | ||
} | ||
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_WORKSTATION) | ||
&& getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) { | ||
proxyWorkstation = settings.getString(sessionID, | ||
Initiator.SETTING_PROXY_WORKSTATION); | ||
proxyDomain = settings.getString(sessionID, Initiator.SETTING_PROXY_DOMAIN); | ||
} | ||
String proxyWorkstation = null; | ||
String proxyDomain = null; | ||
|
||
proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST); | ||
proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT); | ||
} | ||
int proxyPort = -1; | ||
|
||
final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session, | ||
socketAddresses, localAddress, reconnectingIntervals, | ||
getScheduledExecutorService(), networkingOptions, | ||
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig, | ||
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation); | ||
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) { | ||
proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE); | ||
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_VERSION)) { | ||
proxyVersion = settings.getString(sessionID, | ||
Initiator.SETTING_PROXY_VERSION); | ||
} | ||
|
||
initiators.add(ioSessionInitiator); | ||
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_USER)) { | ||
proxyUser = settings.getString(sessionID, Initiator.SETTING_PROXY_USER); | ||
proxyPassword = settings.getString(sessionID, | ||
Initiator.SETTING_PROXY_PASSWORD); | ||
} | ||
} catch (final FieldConvertError e) { | ||
throw new ConfigError(e); | ||
if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_WORKSTATION) | ||
&& getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) { | ||
proxyWorkstation = settings.getString(sessionID, | ||
Initiator.SETTING_PROXY_WORKSTATION); | ||
proxyDomain = settings.getString(sessionID, Initiator.SETTING_PROXY_DOMAIN); | ||
} | ||
|
||
proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST); | ||
proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT); | ||
} | ||
|
||
final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session, | ||
socketAddresses, localAddress, reconnectingIntervals, | ||
getScheduledExecutorService(), networkingOptions, | ||
getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, sslConfig, | ||
proxyType, proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, proxyWorkstation); | ||
|
||
initiators.add(ioSessionInitiator); | ||
|
||
} | ||
|
||
// QFJ-482 | ||
|
@@ -184,8 +193,10 @@ private void createSessions() throws ConfigError, FieldConvertError { | |
final SessionID sessionID = i.next(); | ||
if (isInitiatorSession(sessionID)) { | ||
try { | ||
final Session quickfixSession = createSession(sessionID); | ||
initiatorSessions.put(sessionID, quickfixSession); | ||
if (!settings.isSetting(sessionID, SETTING_INACTIVE_SESSION) || !settings.getBool(sessionID, SETTING_INACTIVE_SESSION)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some tests on this would be nice. |
||
final Session quickfixSession = createSession(sessionID); | ||
initiatorSessions.put(sessionID, quickfixSession); | ||
} | ||
} catch (final Throwable e) { | ||
if (continueInitOnError) { | ||
log.error("error during session initialization, continuing...", e); | ||
|
@@ -196,11 +207,20 @@ private void createSessions() throws ConfigError, FieldConvertError { | |
} | ||
} | ||
} | ||
if (initiatorSessions.isEmpty()) { | ||
throw new ConfigError("no initiators in settings"); | ||
} | ||
setSessions(initiatorSessions); | ||
} | ||
|
||
public void createDynamicSession(SessionID sessionID) throws ConfigError { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Again for a new feature it would be nice to see tests. For a considerable feature like a dynamic session some of the end-to-end / regression tests seem appropriate. |
||
|
||
try { | ||
Session session = createSession(sessionID); | ||
super.addDynamicSession(session); | ||
createInitiator(session); | ||
startInitiators(); | ||
} catch (final FieldConvertError e) { | ||
throw new ConfigError(e); | ||
} | ||
} | ||
|
||
private int[] getReconnectIntervalInSeconds(SessionID sessionID) throws ConfigError { | ||
final SessionSettings settings = getSettings(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Belongs in Initiator class.