Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/hotfix/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod test_request;
pub mod verification;

pub use parser::RawFixMessage;
pub use resend_request::ResendRequest;

pub trait FixMessage: Clone + Send + 'static {
fn write(&self, msg: &mut Message);
Expand Down
4 changes: 2 additions & 2 deletions crates/hotfix/src/message/resend_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use hotfix_message::message::Message;
use hotfix_message::{Part, fix44};

#[derive(Clone, Copy)]
pub(crate) struct ResendRequest {
pub struct ResendRequest {
begin_seq_no: u64,
end_seq_no: u64,
}

impl ResendRequest {
pub(crate) fn new(begin: u64, end: u64) -> Self {
pub fn new(begin: u64, end: u64) -> Self {
Self {
begin_seq_no: begin,
end_seq_no: end,
Expand Down
88 changes: 61 additions & 27 deletions crates/hotfix/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ mod info;
mod session_ref;
mod state;

use crate::config::SessionConfig;
use crate::message::FixMessage;
use crate::message::generate_message;
use crate::message::heartbeat::Heartbeat;
use crate::message::logon::{Logon, ResetSeqNumConfig};
use crate::message::parser::RawFixMessage;
use crate::store::MessageStore;
use crate::transport::writer::WriterRef;
use anyhow::{Result, anyhow};
use chrono::Utc;
use hotfix_message::dict::Dictionary;
Expand All @@ -12,16 +20,7 @@ use std::pin::Pin;
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant, Sleep, sleep, sleep_until};
use tracing::{debug, error, info, warn};

use crate::config::SessionConfig;
use crate::message::FixMessage;
use crate::message::generate_message;
use crate::message::heartbeat::Heartbeat;
use crate::message::logon::{Logon, ResetSeqNumConfig};
use crate::message::parser::RawFixMessage;
use crate::store::MessageStore;
use crate::transport::writer::WriterRef;
use tracing::{debug, enabled, error, info, warn};

use crate::error::{CompIdType, MessageVerificationError};
use crate::message::logout::Logout;
Expand Down Expand Up @@ -362,25 +361,46 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
}

async fn on_resend_request(&mut self, message: &Message) -> Result<()> {
// TODO: verify message and send reject as necessary
if !self.state.is_connected() {
warn!("received resend request while disconnected, ignoring");
}

let begin_seq_number: usize = message.get(fix44::BEGIN_SEQ_NO).unwrap_or_else(|_| {
// TODO: send reject if there is no valid begin number
todo!()
});
let begin_seq_number: u64 = match message.get(fix44::BEGIN_SEQ_NO) {
Ok(seq_number) => seq_number,
Err(_) => {
let reject = Reject::new(
message
.header()
.get(fix44::MSG_SEQ_NUM)
.map_err(|_| anyhow!("failed to get seq number"))?,
)
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
.text("missing begin sequence number for resend request");
self.send_message(reject).await;
return Ok(());
}
};

let end_seq_number: usize = match message.get(fix44::END_SEQ_NO) {
let end_seq_number: u64 = match message.get(fix44::END_SEQ_NO) {
Ok(seq_number) => {
let last_seq_number = self.store.next_sender_seq_number() as usize - 1;
let last_seq_number = self.store.next_sender_seq_number() - 1;
if seq_number == 0 {
last_seq_number
} else {
std::cmp::min(seq_number, last_seq_number)
}
}
Err(_) => {
// send reject if there is no valid end number
todo!()
let reject = Reject::new(
message
.header()
.get(fix44::MSG_SEQ_NUM)
.map_err(|_| anyhow!("failed to get seq number"))?,
)
.session_reject_reason(SessionRejectReason::RequiredTagMissing)
.text("missing end sequence number for resend request");
self.send_message(reject).await;
return Ok(());
}
};

Expand Down Expand Up @@ -583,19 +603,21 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
};
}

async fn resend_messages(&mut self, begin: usize, end: usize, _message: &Message) {
debug!(begin, end, "resending messages as requested");
let messages = self.store.get_slice(begin, end).await.unwrap();
async fn resend_messages(&mut self, begin: u64, end: u64, _message: &Message) {
info!(begin, end, "resending messages as requested");
let messages = self
.store
.get_slice(begin as usize, end as usize)
.await
.unwrap();

let no = messages.len();
debug!(no, "number of messages");
debug!(number_of_messages = no, "number of messages");

let mut reset_start: Option<u64> = None;
let mut sequence_number = 0;

for msg in messages {
let m = String::from_utf8(msg.clone()).unwrap();
debug!(m, "resending message");
let mut message = self
.message_builder
.build(msg.as_slice())
Expand All @@ -609,7 +631,6 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
.to_string();

if is_admin(message_type.as_str()) {
debug!("skipping message as it's an admin message");
if reset_start.is_none() {
reset_start = Some(sequence_number);
}
Expand All @@ -618,6 +639,7 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {

if let Some(begin) = reset_start {
let end = sequence_number;
Self::log_skipped_admin_messages(begin, end);
self.send_sequence_reset(begin, end).await;
reset_start = None;
}
Expand All @@ -633,16 +655,28 @@ impl<A: Application<M>, M: FixMessage, S: MessageStore> Session<A, M, S> {
message.encode(&self.message_config).unwrap(),
)
.await;
debug!(sequence_number, "resent message");

if enabled!(tracing::Level::DEBUG) {
let m = String::from_utf8(msg.clone()).unwrap();
debug!(sequence_number, message = m, "resent message");
}
}

if let Some(begin) = reset_start {
// the final reset if needed
let end = sequence_number;
Self::log_skipped_admin_messages(begin, end);
self.send_sequence_reset(begin, end).await;
}
}

fn log_skipped_admin_messages(begin: u64, end: u64) {
info!(
begin,
end, "skipped admin message(s) during resend, requesting reset for these"
);
}

fn reset_heartbeat_timer(&mut self) {
self.state
.reset_heartbeat_timer(self.config.heartbeat_interval);
Expand Down
22 changes: 22 additions & 0 deletions crates/hotfix/tests/common/test_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,25 @@ pub fn replace_field_value(raw_message: &mut Vec<u8>, tag: u32, new_value: &[u8]
}
}
}

/// Builds a resend request message without the required BeginSeqNo field.
pub fn build_invalid_resend_request(
msg_seq_num: u64,
begin_seq_no: Option<u64>,
end_seq_no: Option<u64>,
) -> Vec<u8> {
let mut msg = Message::new("FIX.4.4", "2"); // MsgType 2 = ResendRequest
msg.set(fix44::SENDER_COMP_ID, COUNTERPARTY_COMP_ID);
msg.set(fix44::TARGET_COMP_ID, OUR_COMP_ID);
msg.set(fix44::MSG_SEQ_NUM, msg_seq_num);
msg.set(fix44::SENDING_TIME, Timestamp::utc_now());

if let Some(begin_seq_no) = begin_seq_no {
msg.set(fix44::BEGIN_SEQ_NO, begin_seq_no);
}
if let Some(end_seq_no) = end_seq_no {
msg.set(fix44::END_SEQ_NO, end_seq_no);
}

msg.encode(&Config::default()).unwrap()
}
89 changes: 84 additions & 5 deletions crates/hotfix/tests/session_test_cases/resend_tests.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::common::actions::when;
use crate::common::assertions::{assert_msg_type, then};
use crate::common::setup::given_an_active_session;
use crate::common::setup::{HEARTBEAT_INTERVAL, given_an_active_session};
use crate::common::test_messages::{
TestMessage, build_execution_report_with_incorrect_body_length,
TestMessage, build_execution_report_with_incorrect_body_length, build_invalid_resend_request,
};
use hotfix::message::FixMessage;
use hotfix::message::{FixMessage, ResendRequest};
use hotfix::session::Status;
use hotfix_message::FieldType;
use hotfix_message::fix44::MsgType;
use hotfix_message::fix44::{GAP_FILL_FLAG, MsgType, NEW_SEQ_NO};
use hotfix_message::{FieldType, Part};
use std::time::Duration;

#[tokio::test]
async fn test_message_sequence_number_too_high() {
Expand Down Expand Up @@ -142,3 +143,81 @@ async fn test_resent_message_previously_received_is_ignored() {
when(&session).requests_disconnect().await;
then(&mut counterparty).gets_disconnected().await;
}

/// Tests that when a counterparty sends a resend request without the required field,
/// the session rejects the invalid message.
#[tokio::test]
async fn test_invalid_resend_request_gets_rejected() {
// We run the test twice - once with an invalid BeginSeqNo and once with an invalid EndSeqNo.
for (begin_seq_no, end_seq_no) in [(None, Some(2)), (Some(1), None)] {
let (session, mut counterparty) = given_an_active_session().await;

// build a resend request message missing the required BeginSeqNo (tag 7)
let seq_num = counterparty.next_target_sequence_number();
let invalid_resend_request =
build_invalid_resend_request(seq_num, begin_seq_no, end_seq_no);
when(&mut counterparty)
.sends_raw_message(invalid_resend_request)
.await;

// the session should reject this invalid resend request
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::Reject))
.await;

when(&session).requests_disconnect().await;
then(&mut counterparty).gets_disconnected().await;
}
}

/// Tests that when a counterparty requests a resend of both admin and business messages,
/// the session gap fills admin messages and resends business messages as expected.
#[tokio::test(start_paused = true)]
async fn test_resend_request_with_gap_fill_for_admin_messages() {
let (session, mut counterparty) = given_an_active_session().await;

// wait for a heartbeat to be sent automatically (this will be message sequence number 2)
when(Duration::from_secs(HEARTBEAT_INTERVAL + 1))
.elapses()
.await;
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::Heartbeat))
.await;

// send an execution report from the session (this will be message sequence number 3)
when(&session)
.sends_message(TestMessage::dummy_execution_report())
.await;
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::ExecutionReport))
.await;

// counterparty requests a resend of messages 2 and 3
let resend_request = ResendRequest::new(2, 3);
when(&mut counterparty).sends_message(resend_request).await;

// the session should send a SequenceReset-GapFill for the heartbeat (message 2)
then(&mut counterparty)
.receives(|msg| {
assert_msg_type(msg, MsgType::SequenceReset);
assert_eq!(msg.get::<&str>(GAP_FILL_FLAG).unwrap(), "Y");
// the gap fill's MsgSeqNum indicates the beginning of the gap
assert_eq!(
msg.header()
.get::<u64>(hotfix_message::fix44::MSG_SEQ_NUM)
.unwrap(),
2
);
// NewSeqNo indicates the next sequence number after the gap
assert_eq!(msg.get::<u64>(NEW_SEQ_NO).unwrap(), 3);
})
.await;

// the session should resend the execution report (message 3)
then(&mut counterparty)
.receives(|msg| assert_msg_type(msg, MsgType::ExecutionReport))
.await;

when(&session).requests_disconnect().await;
then(&mut counterparty).gets_disconnected().await;
}