Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement implement IPersistTimeoutsV2 interface #131

Merged
merged 7 commits into from
Oct 9, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
implement IPersistTimeoutsV2
  • Loading branch information
Tim Bussmann committed Oct 8, 2015
commit 60c0221b3b195a1617b3c7312c99bbae16c26d81
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace NServiceBus.TimeoutPersisters.NHibernate.Tests
class When_fetching_timeouts_from_storage : InMemoryDBFixture
{
[Test]
public void Should_return_the_complete_list_of_timeouts()
public void GetNextChunk_should_return_the_complete_list_of_timeouts()
{
const int numberOfTimeoutsToAdd = 10;

Expand All @@ -32,7 +32,7 @@ public void Should_return_the_complete_list_of_timeouts()
}

[Test]
public void Should_return_the_next_time_of_retrieval()
public void GetNextChunk_should_return_the_next_time_of_retrieval()
{
var nextTime = DateTime.UtcNow.AddHours(1);

Expand All @@ -43,7 +43,7 @@ public void Should_return_the_next_time_of_retrieval()
SagaId = Guid.NewGuid(),
State = new byte[] { 0, 0, 133 },
Headers = new Dictionary<string, string> { { "Bar", "34234" }, { "Foo", "aString1" }, { "Super", "aString2" } },
OwningTimeoutManager = "MyTestEndpoint",
OwningTimeoutManager = "MyTestEndpoint"
});


Expand All @@ -53,5 +53,48 @@ public void Should_return_the_next_time_of_retrieval()

Assert.IsTrue((nextTime - nextTimeToRunQuery).TotalSeconds < 1);
}

[Test]
public void Peek_should_return_timeout_with_id()
{
var timeoutData = new TimeoutData
{
Time = DateTime.UtcNow.AddHours(-1),
Destination = new Address("timeouts", RuntimeEnvironment.MachineName),
SagaId = Guid.NewGuid(),
State = new byte[] { 0, 0, 133 },
Headers = new Dictionary<string, string> { { "Bar", "34234" }, { "Foo", "aString1" }, { "Super", "aString2" } },
OwningTimeoutManager = "MyTestEndpoint"
};
persister.Add(timeoutData);

var result = persister.Peek(timeoutData.Id);

Assert.AreEqual(timeoutData.Id, result.Id);
Assert.AreEqual(timeoutData.Time.ToString("G"), result.Time.ToString("G"));
Assert.AreEqual(timeoutData.Destination, result.Destination);
Assert.AreEqual(timeoutData.SagaId, result.SagaId);
Assert.AreEqual(timeoutData.State, result.State);
Assert.AreEqual(timeoutData.Headers, result.Headers);
}

[Test]
public void Peek_should_return_null_when_no_existing_timeout_with_id()
{
var timeoutData = new TimeoutData
{
Time = DateTime.UtcNow.AddHours(-1),
Destination = new Address("timeouts", RuntimeEnvironment.MachineName),
SagaId = Guid.NewGuid(),
State = new byte[] { 0, 0, 133 },
Headers = new Dictionary<string, string> { { "Bar", "34234" }, { "Foo", "aString1" }, { "Super", "aString2" } },
OwningTimeoutManager = "MyTestEndpoint"
};
persister.Add(timeoutData);

var result = persister.Peek(Guid.NewGuid().ToString());

Assert.IsNull(result);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ namespace NServiceBus.TimeoutPersisters.NHibernate.Tests
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using NUnit.Framework;
using Support;
using Timeout.Core;
Expand All @@ -10,7 +13,7 @@ namespace NServiceBus.TimeoutPersisters.NHibernate.Tests
class When_removing_timeouts_from_the_storage : InMemoryDBFixture
{
[Test]
public void Should_return_the_correct_headers()
public void TryRemove_should_return_the_correct_headers()
{
var headers = new Dictionary<string, string>
{
Expand All @@ -20,14 +23,14 @@ public void Should_return_the_correct_headers()
};

var timeout = new TimeoutData
{
Time = DateTime.UtcNow.AddHours(-1),
Destination = new Address("timeouts", RuntimeEnvironment.MachineName),
SagaId = Guid.NewGuid(),
State = new byte[] {1, 1, 133, 200},
Headers = headers,
OwningTimeoutManager = "MyTestEndpoint",
};
{
Time = DateTime.UtcNow.AddHours(-1),
Destination = new Address("timeouts", RuntimeEnvironment.MachineName),
SagaId = Guid.NewGuid(),
State = new byte[] { 1, 1, 133, 200 },
Headers = headers,
OwningTimeoutManager = "MyTestEndpoint",
};
persister.Add(timeout);

TimeoutData timeoutData;
Expand All @@ -37,26 +40,67 @@ public void Should_return_the_correct_headers()
}

[Test]
public void Should_remove_timeouts_by_id()
public void TryRemove_should_remove_timeouts_by_id()
{
var t1 = new TimeoutData
{
Time = DateTime.Now.AddYears(-1),
OwningTimeoutManager = "MyTestEndpoint",
Headers = new Dictionary<string, string>
{
Time = DateTime.Now.AddYears(-1),
OwningTimeoutManager = "MyTestEndpoint",
Headers = new Dictionary<string, string>
{
{"Header1", "Value1"}
}
};
};
var t2 = new TimeoutData
{
Time = DateTime.Now.AddYears(-1),
OwningTimeoutManager = "MyTestEndpoint",
Headers = new Dictionary<string, string>
{
Time = DateTime.Now.AddYears(-1),
OwningTimeoutManager = "MyTestEndpoint",
Headers = new Dictionary<string, string>
{
{"Header1", "Value1"}
}
};

persister.Add(t1);
persister.Add(t2);

DateTime nextTimeToRunQuery;
var timeouts = persister.GetNextChunk(DateTime.UtcNow.AddYears(-3), out nextTimeToRunQuery);

foreach (var timeout in timeouts)
{
var timeoutRemoved = persister.TryRemove(timeout.Item1);
Assert.IsTrue(timeoutRemoved);
}

using (var session = sessionFactory.OpenSession())
{
Assert.Null(session.Get<TimeoutEntity>(new Guid(t1.Id)));
Assert.Null(session.Get<TimeoutEntity>(new Guid(t2.Id)));
}
}

[Test]
public void TryRemove_should_remove_timeouts_by_id_and_return_removed_timeout()
{
var t1 = new TimeoutData
{
Time = DateTime.Now.AddYears(-1),
OwningTimeoutManager = "MyTestEndpoint",
Headers = new Dictionary<string, string>
{
{"Header1", "Value1"}
}
};
var t2 = new TimeoutData
{
Time = DateTime.Now.AddYears(-1),
OwningTimeoutManager = "MyTestEndpoint",
Headers = new Dictionary<string, string>
{
{"Header1", "Value1"}
}
};
};

persister.Add(t1);
persister.Add(t2);
Expand All @@ -67,7 +111,9 @@ public void Should_remove_timeouts_by_id()
foreach (var timeout in timeouts)
{
TimeoutData timeoutData;
persister.TryRemove(timeout.Item1, out timeoutData);
var timeoutRemoved = persister.TryRemove(timeout.Item1, out timeoutData);
Assert.IsTrue(timeoutRemoved);
Assert.IsNotNull(timeoutData);
}

using (var session = sessionFactory.OpenSession())
Expand All @@ -78,35 +124,110 @@ public void Should_remove_timeouts_by_id()
}

[Test]
public void Should_remove_timeouts_by_sagaid()
public void TryRemove_should_return_false_when_timeout_already_deleted()
{
var timeout = new TimeoutData();

persister.Add(timeout);

Assert.IsTrue(persister.TryRemove(timeout.Id));
Assert.IsFalse(persister.TryRemove(timeout.Id));
}

[Test]
public void TryRemove_should_return_false_and_no_timeout_when_timeout_already_deleted()
{
var timeout = new TimeoutData();

persister.Add(timeout);

Assert.IsTrue(persister.TryRemove(timeout.Id));

TimeoutData timeoutData;
var timeoutRemoved = persister.TryRemove(timeout.Id, out timeoutData);
Assert.IsFalse(timeoutRemoved);
Assert.IsNull(timeoutData);
}

[Test]
public void TryRemove_should_work_with_concurrent_transactions()
{
var timeout = new TimeoutData
{
Time = DateTime.Now
};

persister.Add(timeout);

var t1EnteredTx = new AutoResetEvent(false);
var t2EnteredTx = new AutoResetEvent(false);

var task1 = Task.Run(() =>
{
using (var tx = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions
{
IsolationLevel = IsolationLevel.ReadCommitted
}))
{
t1EnteredTx.Set();
t2EnteredTx.WaitOne();
var result = persister.TryRemove(timeout.Id);
tx.Complete();
return result;
}
});

var task2 = Task.Run(() =>
{
using (var tx = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions
{
IsolationLevel = IsolationLevel.ReadCommitted
}))
{
t2EnteredTx.Set();
t1EnteredTx.WaitOne();
var result = persister.TryRemove(timeout.Id);
tx.Complete();
return result;
}
});

Assert.IsTrue(Task.WaitAll(new[] { task1, task2 }, TimeSpan.FromSeconds(30)));

// one delete should succeed, the other one shouldn't
Assert.IsTrue(task1.Result || task2.Result);
Assert.IsFalse(task1.Result && task2.Result);
}

[Test]
public void RemoveTimeoutBy_should_remove_timeouts_by_sagaid()
{
var sagaId1 = Guid.NewGuid();
var sagaId2 = Guid.NewGuid();
var t1 = new TimeoutData
{
SagaId = sagaId1,
Time = DateTime.Now.AddYears(1),
OwningTimeoutManager = "MyTestEndpoint",
Headers = new Dictionary<string, string>
{
SagaId = sagaId1,
Time = DateTime.Now.AddYears(1),
OwningTimeoutManager = "MyTestEndpoint",
Headers = new Dictionary<string, string>
{
{"Header1", "Value1"}
}
};
};
var t2 = new TimeoutData
{
SagaId = sagaId2,
Time = DateTime.Now.AddYears(1),
OwningTimeoutManager = "MyTestEndpoint",
Headers = new Dictionary<string, string>
{
SagaId = sagaId2,
Time = DateTime.Now.AddYears(1),
OwningTimeoutManager = "MyTestEndpoint",
Headers = new Dictionary<string, string>
{
{"Header1", "Value1"}
}
};
};

persister.Add(t1);
persister.Add(t2);


persister.RemoveTimeoutBy(sagaId1);
persister.RemoveTimeoutBy(sagaId2);

Expand Down
Loading