Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint-pr-title.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ on:
jobs:
main:
name: Validate PR title
runs-on: ubuntu-slim
runs-on: ubuntu-latest
permissions:
pull-requests: read
steps:
Expand Down
4 changes: 4 additions & 0 deletions crates/hotfix/src/store/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ pub enum StoreError {
/// Failed to reset the store.
#[error("failed to reset store")]
Reset(#[source] BoxError),

/// Failed to cleanup old sequences.
#[error("failed to cleanup old sequences")]
Cleanup(#[source] BoxError),
}

/// A specialized Result type for store operations.
Expand Down
60 changes: 55 additions & 5 deletions crates/hotfix/src/store/mongodb.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use chrono::{DateTime, Duration, TimeZone, Utc};
use futures::TryStreamExt;
use mongodb::bson::Binary;
use mongodb::bson::doc;
use mongodb::bson::oid::ObjectId;
use mongodb::bson::spec::BinarySubtype;
use mongodb::bson::{Binary, DateTime as BsonDateTime};
use mongodb::options::{FindOneOptions, IndexOptions, ReplaceOptions};
use mongodb::{Collection, Database, IndexModel};
use serde::{Deserialize, Serialize};
Expand All @@ -18,7 +18,7 @@ struct SequenceMeta {
#[serde(rename = "_id")]
object_id: ObjectId,
meta: bool,
creation_time: DateTime<Utc>,
creation_time: BsonDateTime,
sender_seq_number: u64,
target_seq_number: u64,
}
Expand Down Expand Up @@ -92,14 +92,61 @@ impl MongoDbMessageStore {
let initial_meta = SequenceMeta {
object_id: sequence_id,
meta: true,
creation_time: Utc::now(),
creation_time: BsonDateTime::now(),
sender_seq_number: 0,
target_seq_number: 0,
};
meta_collection.insert_one(&initial_meta).await?;

Ok(initial_meta)
}

/// Deletes sequences older than the specified age, along with their associated messages.
///
/// Returns the number of deleted sequences.
pub async fn cleanup_older_than(&self, age: Duration) -> Result<u64> {
let cutoff = BsonDateTime::from_millis((Utc::now() - age).timestamp_millis());

// Find old sequence IDs (excluding current sequence)
let filter = doc! {
"meta": true,
"creation_time": { "$lt": cutoff },
"_id": { "$ne": self.current_sequence.object_id }
};
let mut cursor = self
.meta_collection
.find(filter)
.await
.map_err(|e| StoreError::Cleanup(e.into()))?;

let mut old_sequence_ids = Vec::new();
while let Some(meta) = cursor
.try_next()
.await
.map_err(|e| StoreError::Cleanup(e.into()))?
{
old_sequence_ids.push(meta.object_id);
}

if old_sequence_ids.is_empty() {
return Ok(0);
}

// Delete messages first to avoid orphaned meta documents
self.message_collection
.delete_many(doc! { "sequence_id": { "$in": &old_sequence_ids } })
.await
.map_err(|e| StoreError::Cleanup(e.into()))?;

// Delete sequence metas
let result = self
.meta_collection
.delete_many(doc! { "_id": { "$in": &old_sequence_ids } })
.await
.map_err(|e| StoreError::Cleanup(e.into()))?;

Ok(result.deleted_count)
}
}

#[async_trait]
Expand Down Expand Up @@ -215,6 +262,9 @@ impl MessageStore for MongoDbMessageStore {
}

fn creation_time(&self) -> DateTime<Utc> {
self.current_sequence.creation_time
#[allow(clippy::expect_used)]
Utc.timestamp_millis_opt(self.current_sequence.creation_time.timestamp_millis())
.single()
.expect("BsonDateTime is guaranteed to store valid timestamp")
}
}
5 changes: 3 additions & 2 deletions crates/hotfix/tests/common_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,13 @@ async fn test_creation_time_gets_reset_correctly() {
for factory in create_test_store_factories().await {
let mut store = factory.create_store().await;

tokio::time::sleep(std::time::Duration::from_millis(10)).await;
tokio::time::sleep(std::time::Duration::from_millis(2)).await;
let after_sleep = Utc::now();
tokio::time::sleep(std::time::Duration::from_millis(2)).await;

store.reset().await.expect("failed to reset store");
let reset_creation_time = store.creation_time();
assert!(reset_creation_time >= after_sleep);
assert!(reset_creation_time > after_sleep);

if !factory.is_persistent() {
continue;
Expand Down
75 changes: 75 additions & 0 deletions crates/hotfix/tests/mongodb_store_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![cfg(feature = "mongodb")]

use chrono::Duration;
use hotfix::store::mongodb::{Client, MongoDbMessageStore};
use hotfix::store::{MessageStore, StoreError};
use testcontainers::runners::AsyncRunner;
Expand Down Expand Up @@ -114,3 +115,77 @@ async fn test_state_preserved_after_failed_set_target() {
// State should be unchanged
assert_eq!(store.next_target_seq_number(), initial_target_seq);
}

#[tokio::test]
async fn test_cleanup_removes_old_sequences() {
let (container, mut store) = create_dedicated_container_and_store().await;

// Add a message to the initial sequence
store.add(1, b"message in sequence 1").await.unwrap();

// Reset creates a new sequence, making the first one "old"
store.reset().await.unwrap();
store.add(1, b"message in sequence 2").await.unwrap();

// Reset again to have two old sequences
store.reset().await.unwrap();
store.add(1, b"message in sequence 3").await.unwrap();

// Small delay to ensure old sequences have earlier timestamps than the cutoff
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

// Cleanup with zero duration should delete all old sequences
let deleted = store.cleanup_older_than(Duration::zero()).await.unwrap();

assert_eq!(deleted, 2);

drop(container);
}

#[tokio::test]
async fn test_cleanup_preserves_current_sequence() {
let (container, mut store) = create_dedicated_container_and_store().await;

// Add messages to current sequence
store.add(1, b"message 1").await.unwrap();
store.add(2, b"message 2").await.unwrap();

// Cleanup with zero duration - current sequence should be preserved
let deleted = store.cleanup_older_than(Duration::zero()).await.unwrap();

assert_eq!(deleted, 0);

// Verify messages are still accessible
let messages = store.get_slice(1, 2).await.unwrap();
assert_eq!(messages.len(), 2);

drop(container);
}

#[tokio::test]
async fn test_cleanup_respects_age_threshold() {
let (container, mut store) = create_dedicated_container_and_store().await;

// Create an old sequence
store.reset().await.unwrap();

// Cleanup with a large duration should not delete anything
let deleted = store.cleanup_older_than(Duration::days(365)).await.unwrap();

assert_eq!(deleted, 0);

drop(container);
}

#[tokio::test]
async fn test_cleanup_after_connection_drop() {
let (container, store) = create_dedicated_container_and_store().await;

// Stop the container
container.stop().await.unwrap();

// Attempt cleanup - should fail
let result = store.cleanup_older_than(Duration::zero()).await;

assert!(matches!(result, Err(StoreError::Cleanup(_))));
}