Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SessionState.Store (FileStore) is not threadsafe on .Set and .Get #259

Merged
merged 1 commit into from
Apr 23, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion QuickFIXn/SessionState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ public ResendRange GetResendRange()

public void Get(int begSeqNo, int endSeqNo, List<string> messages)
{
MessageStore.Get(begSeqNo, endSeqNo, messages);
lock (sync_)
{
MessageStore.Get(begSeqNo, endSeqNo, messages);
}
}

public void SetResendRange(int begin, int end, int chunkEnd=-1)
Expand Down
97 changes: 96 additions & 1 deletion UnitTests/SessionStateTest.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
using NUnit.Framework;
using System;
using NUnit.Framework;
using QuickFix;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace UnitTests
{
Expand Down Expand Up @@ -125,5 +129,96 @@ public void WithinHeartbeat()
lastReceivedTime = lastReceivedTime.AddMilliseconds(1);
Assert.True(SessionState.WithinHeartbeat(now, heartBtIntMillis, lastSentTime, lastReceivedTime));
}

[Test]
public void ThreadSafeSetAndGet() {
//Set up store
if (System.IO.Directory.Exists("store")) {
System.IO.Directory.Delete("store", true);
}

SessionID sessionId = new SessionID("FIX.4.2", "SENDERCOMP", "TARGETCOMP");

Dictionary config = new Dictionary();
config.SetString(SessionSettings.CONNECTION_TYPE, "initiator");
config.SetString(SessionSettings.FILE_STORE_PATH, "store");

SessionSettings settings = new SessionSettings();
settings.Set(sessionId, config);
FileStoreFactory factory = new FileStoreFactory(settings);

FileStore store = (FileStore)factory.Create(sessionId);

NullLog log = new NullLog();

//Set up sessionstate
SessionState state = new SessionState(log, 1) {MessageStore = store};

Hashtable errorsTable = Hashtable.Synchronized(new Hashtable());//used in more than 1 thread at a time
Hashtable setTable = new Hashtable(1000);//only used in 1 thread at a time
Hashtable getTable = new Hashtable(1000);//only used in 1 thread at a time

//Synchronously populate 1000 messages
for (int i = 1; i < 1000; i++) {
string msg = "msg" + i;
state.Set(i, msg);
setTable[i] = msg;
}

//Simulate background sending of messages that populate into the store
AutoResetEvent setEvent = new AutoResetEvent(false);
ThreadPool.QueueUserWorkItem(delegate(object stateObject) {
AutoResetEvent internalSetEvent = (AutoResetEvent)((object[])stateObject)[0];
SessionState internalState = (SessionState)((object[])stateObject)[1];
for (int i = 1001; i < 2000; i++) {
try {
internalState.Set(i, "msg" + i);
}
catch (System.IO.IOException ex) {
errorsTable[ex.Message] = ex;
}
}

internalSetEvent.Set();
}
, new object[] { setEvent, state });

//Simulate background reading of messages from the store - like is done in a resend request answer
AutoResetEvent getEvent = new AutoResetEvent(false);
ThreadPool.QueueUserWorkItem(delegate(object stateObject){
AutoResetEvent internalGetEvent = (AutoResetEvent)((object[])stateObject)[0];
SessionState internalState = (SessionState)((object[])stateObject)[1];
for (int i = 1; i < 1000; i++) {
try {
List<string> lst = new List<string>(1);
internalState.Get(i, i, lst);
if (lst.Count == 0) {
getTable[i] = "nothing read";
}
else {
getTable[i] = lst[0];
}
}
catch (System.IO.IOException ex) {
errorsTable[ex.Message] = ex;
}
}

internalGetEvent.Set();
}
, new object[]{getEvent, state});

//wait till done and assert results
Assert.True(setEvent.WaitOne(10000), "Get or Set hung/timed out during concurrent usage");
Assert.True(getEvent.WaitOne(10000), "Get or Set hung/timed out during concurrent usage");
Assert.AreEqual(setTable, getTable, "Garbled data read in concurrent set and get (like between resendrequest and send)");
Assert.AreEqual(errorsTable.Count, 0, "IOException occured in concurrent set and get (like between resendrequest and send)");

//Tear down filestore
state.Dispose();
store.Dispose();

}

}
}