Skip to content

Commit 7d001a9

Browse files
authored
Merge pull request #377 from reddec/master
Dynamic sessions
2 parents 819c58d + 743209e commit 7d001a9

File tree

2 files changed

+84
-12
lines changed

2 files changed

+84
-12
lines changed

acceptor.go

Lines changed: 83 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,17 @@ import (
1515

1616
//Acceptor accepts connections from FIX clients and manages the associated sessions.
1717
type Acceptor struct {
18-
app Application
19-
settings *Settings
20-
logFactory LogFactory
21-
storeFactory MessageStoreFactory
22-
globalLog Log
23-
sessions map[SessionID]*session
24-
sessionGroup sync.WaitGroup
25-
listener net.Listener
26-
listenerShutdown sync.WaitGroup
18+
app Application
19+
settings *Settings
20+
logFactory LogFactory
21+
storeFactory MessageStoreFactory
22+
globalLog Log
23+
sessions map[SessionID]*session
24+
sessionGroup sync.WaitGroup
25+
listener net.Listener
26+
listenerShutdown sync.WaitGroup
27+
dynamicSessions bool
28+
dynamicSessionChan chan *session
2729
sessionFactory
2830
}
2931

@@ -66,7 +68,14 @@ func (a *Acceptor) Start() error {
6668
a.sessionGroup.Done()
6769
}()
6870
}
69-
71+
if a.dynamicSessions {
72+
a.dynamicSessionChan = make(chan *session)
73+
a.sessionGroup.Add(1)
74+
go func() {
75+
a.dynamicSessionsLoop()
76+
a.sessionGroup.Done()
77+
}()
78+
}
7079
a.listenerShutdown.Add(1)
7180
go a.listenForConnections()
7281
return nil
@@ -80,6 +89,9 @@ func (a *Acceptor) Stop() {
8089

8190
a.listener.Close()
8291
a.listenerShutdown.Wait()
92+
if a.dynamicSessions {
93+
close(a.dynamicSessionChan)
94+
}
8395
for _, session := range a.sessions {
8496
session.stop()
8597
}
@@ -95,6 +107,11 @@ func NewAcceptor(app Application, storeFactory MessageStoreFactory, settings *Se
95107
logFactory: logFactory,
96108
sessions: make(map[SessionID]*session),
97109
}
110+
if a.settings.GlobalSettings().HasSetting(config.DynamicSessions) {
111+
if a.dynamicSessions, err = settings.globalSettings.BoolSetting(config.DynamicSessions); err != nil {
112+
return
113+
}
114+
}
98115

99116
if a.globalLog, err = logFactory.Create(); err != nil {
100117
return
@@ -222,8 +239,18 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
222239
}
223240
session, ok := a.sessions[sessID]
224241
if !ok {
225-
a.globalLog.OnEventf("Session %v not found for incoming message: %s", sessID, msgBytes)
226-
return
242+
if !a.dynamicSessions {
243+
a.globalLog.OnEventf("Session %v not found for incoming message: %s", sessID, msgBytes)
244+
return
245+
}
246+
dynamicSession, err := a.sessionFactory.createSession(sessID, a.storeFactory, a.settings.globalSettings.clone(), a.logFactory, a.app)
247+
if err != nil {
248+
a.globalLog.OnEventf("Dynamic session %v failed to create: %v", sessID, err)
249+
return
250+
}
251+
a.dynamicSessionChan <- dynamicSession
252+
session = dynamicSession
253+
defer session.stop()
227254
}
228255

229256
msgIn := make(chan fixIn)
@@ -241,3 +268,47 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
241268

242269
writeLoop(netConn, msgOut, a.globalLog)
243270
}
271+
272+
func (a *Acceptor) dynamicSessionsLoop() {
273+
var id int
274+
var sessions = map[int]*session{}
275+
var complete = make(chan int)
276+
defer close(complete)
277+
LOOP:
278+
for {
279+
select {
280+
case session, ok := <-a.dynamicSessionChan:
281+
if !ok {
282+
for _, oldSession := range sessions {
283+
oldSession.stop()
284+
}
285+
break LOOP
286+
}
287+
id++
288+
sessionID := id
289+
sessions[sessionID] = session
290+
go func() {
291+
session.run()
292+
err := UnregisterSession(session.sessionID)
293+
if err != nil {
294+
a.globalLog.OnEventf("Unregister dynamic session %v failed: %v", session.sessionID, err)
295+
return
296+
}
297+
complete <- sessionID
298+
}()
299+
case id := <-complete:
300+
delete(sessions, id)
301+
}
302+
}
303+
304+
if len(sessions) == 0 {
305+
return
306+
}
307+
308+
for id := range complete {
309+
delete(sessions, id)
310+
if len(sessions) == 0 {
311+
return
312+
}
313+
}
314+
}

config/configuration.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,5 @@ const (
5858
MaxLatency string = "MaxLatency"
5959
PersistMessages string = "PersistMessages"
6060
RejectInvalidMessage string = "RejectInvalidMessage"
61+
DynamicSessions string = "DynamicSessions"
6162
)

0 commit comments

Comments
 (0)