Skip to content

Commit

Permalink
Bug: For Logon/Logout/Resend/SequenceReset messages we should wait un…
Browse files Browse the repository at this point in the history
…til sent (#21)

## Issue 
When log on request was queued because cannot be send due to connection
not ready,
Then it goes through the [main session
loop](https://github.com/alpacahq/quickfix/blob/5dcde41f3e6e68e93e1fd381761cd9c7843aec43/session.go#L821-L841)
which leads to
[SendAppMessages](https://github.com/alpacahq/quickfix/blob/5dcde41f3e6e68e93e1fd381761cd9c7843aec43/session_state.go#L107-L111)

But, since session is not logged on yet because the message queued is
the log on message, we drop the `toSend` queued messages so we lose the
log on message itself.


## Proposed solution
For important messages like Logon/Logout/Resend/SequenceReset we block
until sent.
  • Loading branch information
AlexandrosKyriakakis authored Feb 1, 2024
1 parent 3594015 commit 6eaa8ce
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
4 changes: 2 additions & 2 deletions logon_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (s *LogonStateTestSuite) TestFixMsgInLogonSeqNumTooHigh() {
s.Require().Nil(err)
s.MessageType(string(msgTypeLogon), sentMessage)

s.session.sendQueued()
s.session.sendQueued(true)
s.MessageType(string(msgTypeResendRequest), s.MockApp.lastToAdmin)
s.FieldEquals(tagBeginSeqNo, 1, s.MockApp.lastToAdmin.Body)

Expand Down Expand Up @@ -373,7 +373,7 @@ func (s *LogonStateTestSuite) TestFixMsgInLogonSeqNumTooLow() {
s.Require().Nil(err)
s.MessageType(string(msgTypeLogout), sentMessage)

s.session.sendQueued()
s.session.sendQueued(true)
s.MessageType(string(msgTypeLogout), s.MockApp.lastToAdmin)
s.FieldEquals(tagText, "MsgSeqNum too low, expecting 2 but received 1", s.MockApp.lastToAdmin.Body)
}
19 changes: 13 additions & 6 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {
}

s.toSend = append(s.toSend, msgBytes)
s.sendQueued()
s.sendQueued(true)

return nil
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error {

s.dropQueued()
s.toSend = append(s.toSend, msgBytes)
s.sendQueued()
s.sendQueued(true)

return nil
}
Expand Down Expand Up @@ -350,9 +350,9 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {
return s.store.IncrNextSenderMsgSeqNum()
}

func (s *session) sendQueued() {
func (s *session) sendQueued(blockUntilSent bool) {
for i, msgBytes := range s.toSend {
if !s.sendBytes(msgBytes) {
if !s.sendBytes(msgBytes, blockUntilSent) {
s.toSend = s.toSend[i:]
s.notifyMessageOut()
return
Expand All @@ -371,15 +371,22 @@ func (s *session) EnqueueBytesAndSend(msg []byte) {
defer s.sendMutex.Unlock()

s.toSend = append(s.toSend, msg)
s.sendQueued()
s.sendQueued(true)
}

func (s *session) sendBytes(msg []byte) bool {
func (s *session) sendBytes(msg []byte, blockUntilSent bool) bool {
if s.messageOut == nil {
s.log.OnEventf("Failed to send: disconnected")
return false
}

if blockUntilSent {
s.messageOut <- msg
s.log.OnOutgoing(msg)
s.stateTimer.Reset(s.HeartBtInt)
return true
}

select {
case s.messageOut <- msg:
s.log.OnOutgoing(msg)
Expand Down
2 changes: 1 addition & 1 deletion session_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (sm *stateMachine) SendAppMessages(session *session) {
defer session.sendMutex.Unlock()

if session.IsLoggedOn() {
session.sendQueued()
session.sendQueued(false)
} else {
session.dropQueued()
}
Expand Down

0 comments on commit 6eaa8ce

Please sign in to comment.