Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace common
{
public static class Configuration
{
public static MongoQueue<T> GetQueue<T>() where T : class
public static MongoQueue<T>GetQueue<T>() where T : class
{
var connectionString = ConfigurationManager.ConnectionStrings["mongo-queue"].ConnectionString;
var queueSize = long.Parse(ConfigurationManager.AppSettings["mongo-queue.size"]);
Expand Down
5 changes: 3 additions & 2 deletions src/common/example-common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="Common.Logging">
<HintPath>..\packages\Common.Logging.2.1.1\lib\net40\Common.Logging.dll</HintPath>
<Reference Include="Common.Logging, Version=2.1.2.0, Culture=neutral, PublicKeyToken=af08829b84f0328e, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Common.Logging.2.1.2\lib\net40\Common.Logging.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Configuration" />
Expand Down
3 changes: 1 addition & 2 deletions src/common/packages.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>

<packages>
<package id="Common.Logging" version="2.1.1" targetFramework="net40" />
<package id="Common.Logging" version="2.1.2" targetFramework="net40" />
</packages>
22 changes: 22 additions & 0 deletions src/mongo-queue.sln
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,19 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".nuget", ".nuget", "{12462A
.nuget\NuGet.targets = .nuget\NuGet.targets
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "test-mongo-queue", "test-mongo-queue\test-mongo-queue.csproj", "{2DA4B957-C4B8-41FF-9570-214F0BA76EB6}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{886BB0A1-73E8-4239-A584-60EF2E37DD3F}"
ProjectSection(SolutionItems) = preProject
Local.testsettings = Local.testsettings
mongo-queue.vsmdi = mongo-queue.vsmdi
TraceAndTestImpact.testsettings = TraceAndTestImpact.testsettings
EndProjectSection
EndProject
Global
GlobalSection(TestCaseManagementSettings) = postSolution
CategoryFile = mongo-queue.vsmdi
EndGlobalSection
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Debug|Mixed Platforms = Debug|Mixed Platforms
Expand Down Expand Up @@ -70,6 +82,16 @@ Global
{B2F1C92B-2302-4EBC-9222-D802F6CCC47B}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{B2F1C92B-2302-4EBC-9222-D802F6CCC47B}.Release|x86.ActiveCfg = Release|x86
{B2F1C92B-2302-4EBC-9222-D802F6CCC47B}.Release|x86.Build.0 = Release|x86
{2DA4B957-C4B8-41FF-9570-214F0BA76EB6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2DA4B957-C4B8-41FF-9570-214F0BA76EB6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2DA4B957-C4B8-41FF-9570-214F0BA76EB6}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{2DA4B957-C4B8-41FF-9570-214F0BA76EB6}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{2DA4B957-C4B8-41FF-9570-214F0BA76EB6}.Debug|x86.ActiveCfg = Debug|Any CPU
{2DA4B957-C4B8-41FF-9570-214F0BA76EB6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2DA4B957-C4B8-41FF-9570-214F0BA76EB6}.Release|Any CPU.Build.0 = Release|Any CPU
{2DA4B957-C4B8-41FF-9570-214F0BA76EB6}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{2DA4B957-C4B8-41FF-9570-214F0BA76EB6}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{2DA4B957-C4B8-41FF-9570-214F0BA76EB6}.Release|x86.ActiveCfg = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
3 changes: 3 additions & 0 deletions src/mongo-queue/MongoMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ public MongoMessage(T message)
public DateTime Enqueued { get; private set; }
public T Message { get; private set; }
}

// A dummy mesage used to 'start' a capped collection. (since tailable cursors cannot operate on empty ones)
public class NOOPMessage { };
}
25 changes: 15 additions & 10 deletions src/mongo-queue/MongoQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ public class MongoQueue<T> : IPublish<T>, ISubscribe<T> where T : class
private static ILog Log = LogManager.GetCurrentClassLogger();

private readonly MongoDatabase _database;
private readonly MongoCollection<BsonDocument> _position; // used to record the current position
private readonly IMongoQuery _positionQuery;
//private readonly MongoCollection<BsonDocument> _position; // used to record the current position
//private readonly IMongoQuery _positionQuery;
private readonly MongoCollection<MongoMessage<T>> _queue; // the collection for the messages
private readonly string _queueName = typeof(T).Name; // name of collection (based on type name)

private MongoCursorEnumerator<MongoMessage<T>> _enumerator; // our cursor enumerator
private ObjectId _lastId = ObjectId.Empty; // the last _id read from the queue
private bool _startedReading; // initial query on an empty collection is a special case

private readonly DateTime _started = DateTime.UtcNow;

public MongoQueue(string connectionString, long queueSize)
{
// our queue name will be the same as the message class
Expand All @@ -38,6 +40,7 @@ public MongoQueue(string connectionString, long queueSize)
.SetMaxSize(queueSize); // limit the size of the collection and pre-allocated the space to this number of bytes

_database.CreateCollection(_queueName, options);
_database.GetCollection(_queueName).Save(new NOOPMessage()); // 'start' the collection
}
catch
{
Expand All @@ -49,12 +52,12 @@ public MongoQueue(string connectionString, long queueSize)
_queue = _database.GetCollection<MongoMessage<T>>(_queueName);

// check if we already have a 'last read' position to start from
_position = _database.GetCollection("_queueIndex");
var last = _position.FindOneById(_queueName);
if (last != null)
_lastId = last["last"].AsObjectId;
//_position = _database.GetCollection("_queueIndex");
//var last = _position.FindOneById(_queueName);
//if (last != null)
// _lastId = last["last"].AsObjectId;

_positionQuery = Query.EQ("_id", _queueName);
//_positionQuery = Query.EQ("_id", _queueName);
}

#region IPublish<T> Members
Expand Down Expand Up @@ -92,7 +95,7 @@ public T Receive()
// yes - record the current position and return it to the client
_startedReading = true;
_lastId = _enumerator.Current.Id;
_position.Update(_positionQuery, Update.Set("last", _lastId), UpdateFlags.Upsert, SafeMode.False);
//_position.Update(_positionQuery, Update.Set("last", _lastId), UpdateFlags.Upsert, SafeMode.False);

var message = _enumerator.Current.Message;
var delay = DateTime.UtcNow - _enumerator.Current.Enqueued;
Expand Down Expand Up @@ -133,9 +136,11 @@ public T Receive()
private MongoCursorEnumerator<MongoMessage<T>> InitializeCursor()
{
Log.Debug(m => m("Initializing Cursor from {0}", _lastId));

IMongoQuery findFirstMessageQuery = (_lastId == ObjectId.Empty || _lastId==null) ?
Query.GT("Enqueued", _started) : // First run
Query.GT("_id", _lastId); // Rest of iterations
var cursor = _queue
.Find(Query.GT("_id", _lastId))
.Find(findFirstMessageQuery)
.SetFlags(
QueryFlags.AwaitData |
QueryFlags.NoCursorTimeout |
Expand Down
15 changes: 9 additions & 6 deletions src/mongo-queue/mongo-queue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="Common.Logging">
<HintPath>..\packages\Common.Logging.2.1.1\lib\net40\Common.Logging.dll</HintPath>
<Reference Include="Common.Logging, Version=2.1.2.0, Culture=neutral, PublicKeyToken=af08829b84f0328e, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Common.Logging.2.1.2\lib\net40\Common.Logging.dll</HintPath>
</Reference>
<Reference Include="MongoDB.Bson">
<HintPath>..\packages\mongocsharpdriver.1.5\lib\net35\MongoDB.Bson.dll</HintPath>
<Reference Include="MongoDB.Bson, Version=1.7.1.4791, Culture=neutral, PublicKeyToken=f686731cfb9cc103, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\mongocsharpdriver.1.7.1\lib\net35\MongoDB.Bson.dll</HintPath>
</Reference>
<Reference Include="MongoDB.Driver">
<HintPath>..\packages\mongocsharpdriver.1.5\lib\net35\MongoDB.Driver.dll</HintPath>
<Reference Include="MongoDB.Driver, Version=1.7.1.4791, Culture=neutral, PublicKeyToken=f686731cfb9cc103, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\mongocsharpdriver.1.7.1\lib\net35\MongoDB.Driver.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
Expand Down
5 changes: 2 additions & 3 deletions src/mongo-queue/packages.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>

<packages>
<package id="Common.Logging" version="2.1.1" targetFramework="net40" />
<package id="mongocsharpdriver" version="1.5" targetFramework="net40" />
<package id="Common.Logging" version="2.1.2" targetFramework="net40" />
<package id="mongocsharpdriver" version="1.7.1" targetFramework="net40" />
</packages>
2 changes: 0 additions & 2 deletions src/publisher/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ private static void Main(string[] args)
bool running = true;
while (running)
{
if (!Console.KeyAvailable) continue;

ConsoleKeyInfo keypress = Console.ReadKey(true);
switch (keypress.Key)
{
Expand Down
5 changes: 3 additions & 2 deletions src/publisher/example-publisher.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@
<CodeAnalysisIgnoreBuiltInRules>true</CodeAnalysisIgnoreBuiltInRules>
</PropertyGroup>
<ItemGroup>
<Reference Include="Common.Logging">
<HintPath>..\packages\Common.Logging.2.1.1\lib\net40\Common.Logging.dll</HintPath>
<Reference Include="Common.Logging, Version=2.1.2.0, Culture=neutral, PublicKeyToken=af08829b84f0328e, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Common.Logging.2.1.2\lib\net40\Common.Logging.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
Expand Down
3 changes: 1 addition & 2 deletions src/publisher/packages.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>

<packages>
<package id="Common.Logging" version="2.1.1" targetFramework="net40" />
<package id="Common.Logging" version="2.1.2" targetFramework="net40" />
</packages>
2 changes: 0 additions & 2 deletions src/subscriber/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ private static void Main(string[] args)
bool running = true;
while (running)
{
if (!Console.KeyAvailable) continue;

ConsoleKeyInfo keypress = Console.ReadKey(true);
switch (keypress.Key)
{
Expand Down
5 changes: 3 additions & 2 deletions src/subscriber/example-subscriber.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@
<CodeAnalysisIgnoreBuiltInRules>true</CodeAnalysisIgnoreBuiltInRules>
</PropertyGroup>
<ItemGroup>
<Reference Include="Common.Logging">
<HintPath>..\packages\Common.Logging.2.1.1\lib\net40\Common.Logging.dll</HintPath>
<Reference Include="Common.Logging, Version=2.1.2.0, Culture=neutral, PublicKeyToken=af08829b84f0328e, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\Common.Logging.2.1.2\lib\net40\Common.Logging.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
Expand Down
3 changes: 1 addition & 2 deletions src/subscriber/packages.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>

<packages>
<package id="Common.Logging" version="2.1.1" targetFramework="net40" />
<package id="Common.Logging" version="2.1.2" targetFramework="net40" />
</packages>
11 changes: 11 additions & 0 deletions src/test-mongo-queue/App.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<!-- 10Mb -->
<add key="mongo-queue.size" value="10485760" />
</appSettings>

<connectionStrings>
<add name="mongo-queue" connectionString="mongodb://localhost/mongo-queue-test" />
</connectionStrings>
</configuration>
Loading