Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions crates/hotfix-store-mongodb/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[package]
name = "hotfix-store-mongodb"
description = "MongoDB message store implementation for the hotfix FIX engine"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
readme = "README.md"
homepage.workspace = true
repository.workspace = true
keywords.workspace = true
categories.workspace = true

[lints]
workspace = true

[dependencies]
hotfix-store = { version = "0.1.0", path = "../hotfix-store" }
async-trait = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
mongodb = { workspace = true }
serde = { workspace = true, features = ["derive"] }

[dev-dependencies]
hotfix-store = { version = "0.1.0", path = "../hotfix-store", features = ["test-utils"] }
testcontainers = { workspace = true }
tokio = { workspace = true, features = ["test-util", "rt-multi-thread", "macros"] }
uuid = { workspace = true, features = ["v4"] }
38 changes: 38 additions & 0 deletions crates/hotfix-store-mongodb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# hotfix-store-mongodb

MongoDB message store implementation for the [HotFIX](https://github.com/Validus-Risk-Management/hotfix) FIX engine.

## Overview

This crate provides `MongoDbMessageStore`, a persistent message store backed by MongoDB. It implements the
`MessageStore` trait from [hotfix-store](https://crates.io/crates/hotfix-store).

## Usage

```rust
use hotfix_store_mongodb::{Client, MongoDbMessageStore};

// Connect to MongoDB
let client = Client::with_uri_str("mongodb://localhost:27017").await?;
let db = client.database("myapp");

// Create the store
let store = MongoDbMessageStore::new(db, Some("fix_messages")).await?;
```

## Features

- Persistent storage of FIX messages and sequence numbers
- Automatic index creation for efficient queries
- Session cleanup with `cleanup_older_than()` method

## Cleanup

The store provides a method to clean up old session data:

```rust
use chrono::Duration;

// Delete sequences older than 30 days
let deleted_count = store.cleanup_older_than(Duration::days(30)).await?;
```
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
//! MongoDB message store implementation for the hotfix FIX engine.
//!
//! This crate provides [`MongoDbMessageStore`], a persistent message store
//! backed by MongoDB.
//!
//! # Example
//!
//! ```ignore
//! use hotfix_store_mongodb::{Client, MongoDbMessageStore};
//!
//! let client = Client::with_uri_str("mongodb://localhost:27017").await?;
//! let db = client.database("myapp");
//! let store = MongoDbMessageStore::new(db, Some("fix_messages")).await?;
//! ```

use async_trait::async_trait;
use chrono::{DateTime, Duration, TimeZone, Utc};
use futures::TryStreamExt;
Expand All @@ -9,9 +24,10 @@ use mongodb::options::{FindOneOptions, IndexOptions, ReplaceOptions};
use mongodb::{Collection, Database, IndexModel};
use serde::{Deserialize, Serialize};

pub use mongodb::Client;
use hotfix_store::MessageStore;
use hotfix_store::error::{Result, StoreError};

use crate::store::{MessageStore, Result, StoreError};
pub use mongodb::Client;

#[derive(Debug, Deserialize, Serialize)]
struct SequenceMeta {
Expand All @@ -30,14 +46,37 @@ struct Message {
data: Binary,
}

/// A MongoDB-backed message store implementation.
///
/// This store persists messages and sequence numbers to MongoDB,
/// allowing session state to survive application restarts.
pub struct MongoDbMessageStore {
meta_collection: Collection<SequenceMeta>,
message_collection: Collection<Message>,
current_sequence: SequenceMeta,
}

impl MongoDbMessageStore {
pub async fn new(db: Database, collection_name: Option<&str>) -> anyhow::Result<Self> {
/// Creates a new MongoDB message store.
///
/// # Arguments
///
/// * `db` - The MongoDB database to use
/// * `collection_name` - Optional collection name (defaults to "messages")
///
/// # Errors
///
/// Returns `StoreError::Initialization` if the store cannot be initialized.
pub async fn new(db: Database, collection_name: Option<&str>) -> Result<Self> {
Self::new_inner(db, collection_name)
.await
.map_err(|e| StoreError::Initialization(e.into()))
}

async fn new_inner(
db: Database,
collection_name: Option<&str>,
) -> mongodb::error::Result<Self> {
let collection_name = collection_name.unwrap_or("messages");
let meta_collection = db.collection(collection_name);
let message_collection = db.collection(collection_name);
Expand All @@ -53,7 +92,9 @@ impl MongoDbMessageStore {
Ok(store)
}

async fn ensure_indexes(meta_collection: &Collection<SequenceMeta>) -> anyhow::Result<()> {
async fn ensure_indexes(
meta_collection: &Collection<SequenceMeta>,
) -> mongodb::error::Result<()> {
let meta_index = IndexModel::builder()
.keys(doc! { "meta": 1, "_id": -1 })
.build();
Expand All @@ -71,7 +112,7 @@ impl MongoDbMessageStore {

async fn get_or_default_sequence(
meta_collection: &Collection<SequenceMeta>,
) -> anyhow::Result<SequenceMeta> {
) -> mongodb::error::Result<SequenceMeta> {
let options = FindOneOptions::builder().sort(doc! { "_id": -1 }).build();
let res = meta_collection
.find_one(doc! { "meta": true })
Expand All @@ -87,7 +128,7 @@ impl MongoDbMessageStore {

async fn new_sequence(
meta_collection: &Collection<SequenceMeta>,
) -> anyhow::Result<SequenceMeta> {
) -> mongodb::error::Result<SequenceMeta> {
let sequence_id = ObjectId::new();
let initial_meta = SequenceMeta {
object_id: sequence_id,
Expand All @@ -103,7 +144,20 @@ impl MongoDbMessageStore {

/// Deletes sequences older than the specified age, along with their associated messages.
///
/// Returns the number of deleted sequences.
/// This method is useful for cleaning up old session data from MongoDB.
/// The current active sequence is never deleted, even if it matches the age criteria.
///
/// # Arguments
///
/// * `age` - The minimum age of sequences to delete
///
/// # Returns
///
/// The number of deleted sequences.
///
/// # Errors
///
/// Returns `StoreError::Cleanup` if the cleanup operation fails.
pub async fn cleanup_older_than(&self, age: Duration) -> Result<u64> {
let cutoff = BsonDateTime::from_millis((Utc::now() - age).timestamp_millis());

Expand Down
48 changes: 48 additions & 0 deletions crates/hotfix-store-mongodb/tests/conformance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! Conformance tests for MongoDbMessageStore using the test harness from hotfix-store.

use hotfix_store::MessageStore;
use hotfix_store::test_utils::TestStoreFactory;
use hotfix_store_mongodb::{Client, MongoDbMessageStore};
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, GenericImage};
use tokio::sync::OnceCell;

static MONGO_CONTAINER: OnceCell<ContainerAsync<GenericImage>> = OnceCell::const_new();
const MONGO_PORT: u16 = 27017;

struct MongodbTestStoreFactory {
client: Client,
collection_name: String,
}

impl MongodbTestStoreFactory {
async fn new() -> Self {
let container = MONGO_CONTAINER.get_or_init(Self::init_container).await;
let host = container.get_host().await.unwrap();
let port = container.get_host_port_ipv4(MONGO_PORT).await.unwrap();
let uri = format!("mongodb://{host}:{port}");
let client = Client::with_uri_str(&uri).await.unwrap();

Self {
client,
collection_name: uuid::Uuid::new_v4().to_string(),
}
}

async fn init_container() -> ContainerAsync<GenericImage> {
GenericImage::new("mongo", "8.0").start().await.unwrap()
}
}

#[async_trait::async_trait]
impl TestStoreFactory for MongodbTestStoreFactory {
async fn create_store(&self) -> Box<dyn MessageStore> {
let db = self.client.database("hotfixConformanceTests");
let store = MongoDbMessageStore::new(db, Some(&self.collection_name))
.await
.unwrap();
Box::new(store)
}
}

hotfix_store::conformance_tests!(mongodb, MongodbTestStoreFactory::new().await);
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#![cfg(feature = "mongodb")]
//! MongoDB-specific tests for MongoDbMessageStore.
//!
//! These tests cover MongoDB-specific functionality such as connection failure handling
//! and the cleanup_older_than method.

use chrono::Duration;
use hotfix::store::mongodb::{Client, MongoDbMessageStore};
use hotfix::store::{MessageStore, StoreError};
use hotfix_store::MessageStore;
use hotfix_store::error::StoreError;
use hotfix_store_mongodb::{Client, MongoDbMessageStore};
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, GenericImage};

Expand Down
30 changes: 30 additions & 0 deletions crates/hotfix-store/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "hotfix-store"
description = "Message store traits and implementations for the HotFIX engine"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
readme = "README.md"
homepage.workspace = true
repository.workspace = true
keywords.workspace = true
categories.workspace = true

[features]
default = ["test-utils"]
test-utils = ["dep:tokio"]

[lints]
workspace = true

[dependencies]
async-trait = { workspace = true }
chrono = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["time"], optional = true }

[dev-dependencies]
tokio = { workspace = true, features = ["test-util", "rt-multi-thread", "macros", "time"] }
uuid = { workspace = true, features = ["v4"] }
tempfile = "3"
50 changes: 50 additions & 0 deletions crates/hotfix-store/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# hotfix-store

Message store traits and implementations for the [HotFIX](https://github.com/Validus-Risk-Management/hotfix) FIX engine.

## Overview

This crate provides the `MessageStore` trait and core implementations for persisting FIX session state, including
messages and sequence numbers.

## Implementations

- **InMemoryMessageStore**: A non-persistent store for testing
- **FileStore**: A file-based store for simple persistence

Additional implementations are available in separate crates:

- [hotfix-store-mongodb](https://crates.io/crates/hotfix-store-mongodb): MongoDB-backed store

## Usage

```rust
use hotfix_store::{MessageStore, InMemoryMessageStore, FileStore};

// In-memory store (for testing)
let store = InMemoryMessageStore::default ();

// File-based store (for persistence)
let store = FileStore::new("/path/to/store", "session_name") ?;
```

## Test Utilities

The `test-utils` feature provides a test harness for verifying custom `MessageStore` implementations:

```rust
use hotfix_store::test_utils::TestStoreFactory;
use hotfix_store::conformance_tests;

struct MyStoreFactory;

#[async_trait::async_trait]
impl TestStoreFactory for MyStoreFactory {
async fn create_store(&self) -> Box<dyn MessageStore> {
Box::new(MyStore::new())
}
}

// Generates all conformance tests for your implementation
conformance_tests!(my_store, MyStoreFactory);
```
Loading