From 6eaa8ce004539ced3f1919ad7c22620d566814db Mon Sep 17 00:00:00 2001 From: Alexandros Kyriakakis Date: Thu, 1 Feb 2024 14:58:37 +0200 Subject: [PATCH] Bug: For Logon/Logout/Resend/SequenceReset messages we should wait until sent (#21) ## Issue When log on request was queued because cannot be send due to connection not ready, Then it goes through the [main session loop](https://github.com/alpacahq/quickfix/blob/5dcde41f3e6e68e93e1fd381761cd9c7843aec43/session.go#L821-L841) which leads to [SendAppMessages](https://github.com/alpacahq/quickfix/blob/5dcde41f3e6e68e93e1fd381761cd9c7843aec43/session_state.go#L107-L111) But, since session is not logged on yet because the message queued is the log on message, we drop the `toSend` queued messages so we lose the log on message itself. ## Proposed solution For important messages like Logon/Logout/Resend/SequenceReset we block until sent. --- logon_state_test.go | 4 ++-- session.go | 19 +++++++++++++------ session_state.go | 2 +- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/logon_state_test.go b/logon_state_test.go index 96afc2e90..ee47a2be6 100644 --- a/logon_state_test.go +++ b/logon_state_test.go @@ -333,7 +333,7 @@ func (s *LogonStateTestSuite) TestFixMsgInLogonSeqNumTooHigh() { s.Require().Nil(err) s.MessageType(string(msgTypeLogon), sentMessage) - s.session.sendQueued() + s.session.sendQueued(true) s.MessageType(string(msgTypeResendRequest), s.MockApp.lastToAdmin) s.FieldEquals(tagBeginSeqNo, 1, s.MockApp.lastToAdmin.Body) @@ -373,7 +373,7 @@ func (s *LogonStateTestSuite) TestFixMsgInLogonSeqNumTooLow() { s.Require().Nil(err) s.MessageType(string(msgTypeLogout), sentMessage) - s.session.sendQueued() + s.session.sendQueued(true) s.MessageType(string(msgTypeLogout), s.MockApp.lastToAdmin) s.FieldEquals(tagText, "MsgSeqNum too low, expecting 2 but received 1", s.MockApp.lastToAdmin.Body) } diff --git a/session.go b/session.go index db5198e04..1bf60b122 100644 --- a/session.go +++ b/session.go @@ -265,7 +265,7 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error { } s.toSend = append(s.toSend, msgBytes) - s.sendQueued() + s.sendQueued(true) return nil } @@ -294,7 +294,7 @@ func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error { s.dropQueued() s.toSend = append(s.toSend, msgBytes) - s.sendQueued() + s.sendQueued(true) return nil } @@ -350,9 +350,9 @@ func (s *session) persist(seqNum int, msgBytes []byte) error { return s.store.IncrNextSenderMsgSeqNum() } -func (s *session) sendQueued() { +func (s *session) sendQueued(blockUntilSent bool) { for i, msgBytes := range s.toSend { - if !s.sendBytes(msgBytes) { + if !s.sendBytes(msgBytes, blockUntilSent) { s.toSend = s.toSend[i:] s.notifyMessageOut() return @@ -371,15 +371,22 @@ func (s *session) EnqueueBytesAndSend(msg []byte) { defer s.sendMutex.Unlock() s.toSend = append(s.toSend, msg) - s.sendQueued() + s.sendQueued(true) } -func (s *session) sendBytes(msg []byte) bool { +func (s *session) sendBytes(msg []byte, blockUntilSent bool) bool { if s.messageOut == nil { s.log.OnEventf("Failed to send: disconnected") return false } + if blockUntilSent { + s.messageOut <- msg + s.log.OnOutgoing(msg) + s.stateTimer.Reset(s.HeartBtInt) + return true + } + select { case s.messageOut <- msg: s.log.OnOutgoing(msg) diff --git a/session_state.go b/session_state.go index 230ac8613..527556209 100644 --- a/session_state.go +++ b/session_state.go @@ -105,7 +105,7 @@ func (sm *stateMachine) SendAppMessages(session *session) { defer session.sendMutex.Unlock() if session.IsLoggedOn() { - session.sendQueued() + session.sendQueued(false) } else { session.dropQueued() }