Skip to content

Commit

Permalink
Connector. Unsubscription fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikasoukhov committed Sep 8, 2015
1 parent eefb9d0 commit 3f83e8c
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 67 deletions.
126 changes: 107 additions & 19 deletions Algo/BasketMessageAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ public int this[IMessageAdapter adapter]
}
}

private enum SubscriptionStates
{
Subscribed,
Subscribing,
Unsubscribing,
}

private readonly SynchronizedDictionary<Tuple<SecurityId, MarketDataTypes>, SubscriptionStates> _subscriptionStates = new SynchronizedDictionary<Tuple<SecurityId, MarketDataTypes>, SubscriptionStates>();

private readonly SynchronizedPairSet<Tuple<SecurityId, MarketDataTypes>, IEnumerator<IMessageAdapter>> _subscriptionQueue = new SynchronizedPairSet<Tuple<SecurityId, MarketDataTypes>, IEnumerator<IMessageAdapter>>();
private readonly SynchronizedDictionary<long, Tuple<SecurityId, MarketDataTypes>> _subscriptionKeys = new SynchronizedDictionary<long, Tuple<SecurityId, MarketDataTypes>>();
private readonly SynchronizedDictionary<Tuple<SecurityId, MarketDataTypes>, IMessageAdapter> _subscriptions = new SynchronizedDictionary<Tuple<SecurityId, MarketDataTypes>, IMessageAdapter>();
Expand Down Expand Up @@ -205,6 +214,8 @@ private void ProcessReset(Message message)
_hearbeatAdapters.Clear();
_subscriptionQueue.Clear();
_subscriptions.Clear();
_subscriptionKeys.Clear();
_subscriptionStates.Clear();
}

/// <summary>
Expand Down Expand Up @@ -290,18 +301,54 @@ protected override void OnSendInMessage(Message message)
{
var key = Tuple.Create(mdMsg.SecurityId, mdMsg.DataType);

var state = _subscriptionStates.TryGetValue2(key);

if (mdMsg.IsSubscribe)
{
if (_subscriptionQueue.ContainsKey(key))
return;
if (state != null)
{
RaiseMarketDataMessage(null, mdMsg.OriginalTransactionId, new InvalidOperationException(state.Value.ToString()), true);
break;
}
else
_subscriptionStates.Add(key, SubscriptionStates.Subscribing);
}
else
{
var canProcess = false;

var enumerator = adapters.Cache.Cast<IMessageAdapter>().GetEnumerator();
switch (state)
{
case SubscriptionStates.Subscribed:
canProcess = true;
_subscriptionStates[key] = SubscriptionStates.Unsubscribing;
break;
case SubscriptionStates.Subscribing:
case SubscriptionStates.Unsubscribing:
RaiseMarketDataMessage(null, mdMsg.OriginalTransactionId, new InvalidOperationException(state.Value.ToString()), false);
break;
case null:
RaiseMarketDataMessage(null, mdMsg.OriginalTransactionId, null, false);
break;
default:
throw new ArgumentOutOfRangeException();
}

_subscriptionQueue.Add(key, enumerator);
if (!canProcess)
break;
}

if (mdMsg.TransactionId != 0)
_subscriptionKeys.Add(mdMsg.TransactionId, key);

if (mdMsg.IsSubscribe)
{
//if (_subscriptionQueue.ContainsKey(key))
// return;

if (mdMsg.TransactionId != 0)
_subscriptionKeys.Add(mdMsg.TransactionId, key);
var enumerator = adapters.Cache.Cast<IMessageAdapter>().GetEnumerator();

_subscriptionQueue.Add(key, enumerator);
ProcessSubscriptionAction(enumerator, mdMsg, mdMsg.TransactionId);
}
else
Expand Down Expand Up @@ -428,7 +475,8 @@ private void ProcessSubscriptionAction(IEnumerator<IMessageAdapter> enumerator,
else
_subscriptionKeys.Remove(originalTransactionId);

RaiseMarketDataMessage(null, originalTransactionId, new ArgumentException(LocalizedStrings.Str629Params.Put(key.Item1 + " " + key.Item2), "message"));
_subscriptionStates.Remove(key);
RaiseMarketDataMessage(null, originalTransactionId, new ArgumentException(LocalizedStrings.Str629Params.Put(key.Item1 + " " + key.Item2), "message"), true);
}
}

Expand All @@ -438,37 +486,77 @@ private void ProcessMarketDataMessage(IMessageAdapter adapter, MarketDataMessage
?? Tuple.Create(message.SecurityId, message.DataType);

var enumerator = _subscriptionQueue.TryGetValue(key);
var state = _subscriptionStates.TryGetValue2(key);
var error = message.Error;
var isOk = !message.IsNotSupported && error == null;

var isSubscribe = message.IsSubscribe;

if (message.Error == null)
switch (state)
{
if (message.IsNotSupported)
{
if (enumerator != null)
ProcessSubscriptionAction(enumerator, message, message.OriginalTransactionId);
else
RaiseMarketDataMessage(adapter, message.OriginalTransactionId, new InvalidOperationException(LocalizedStrings.Str633Params.Put(message.SecurityId, message.DataType)));
case SubscriptionStates.Subscribed:
break;
case SubscriptionStates.Subscribing:
isSubscribe = true;
if (isOk)
{
_subscriptions.Add(key, adapter);
_subscriptionStates[key] = SubscriptionStates.Subscribed;
}
else if (error != null)
{
_subscriptions.Remove(key);
_subscriptionStates.Remove(key);
}
break;
case SubscriptionStates.Unsubscribing:
isSubscribe = false;
_subscriptions.Remove(key);
_subscriptionStates.Remove(key);
break;
case null:
if (isOk)
{
if (message.IsSubscribe)
{
_subscriptions.Add(key, adapter);
_subscriptionStates.Add(key, SubscriptionStates.Subscribed);
break;
}
}

return;
}
_subscriptions.Remove(key);
_subscriptionStates.Remove(key);
break;
default:
throw new ArgumentOutOfRangeException();
}

if (message.IsNotSupported)
{
if (enumerator != null)
ProcessSubscriptionAction(enumerator, message, message.OriginalTransactionId);
else
{
this.AddDebugLog(LocalizedStrings.Str630Params, message.SecurityId, adapter);
if (error == null)
error = new InvalidOperationException(LocalizedStrings.Str633Params.Put(message.SecurityId, message.DataType));
}
}

_subscriptionQueue.Remove(key);
_subscriptionKeys.Remove(message.OriginalTransactionId);

RaiseMarketDataMessage(adapter, message.OriginalTransactionId, message.Error);
RaiseMarketDataMessage(adapter, message.OriginalTransactionId, error, isSubscribe);
}

private void RaiseMarketDataMessage(IMessageAdapter adapter, long originalTransactionId, Exception error)
private void RaiseMarketDataMessage(IMessageAdapter adapter, long originalTransactionId, Exception error, bool isSubscribe)
{
SendOutMessage(new MarketDataMessage
{
OriginalTransactionId = originalTransactionId,
Error = error,
Adapter = adapter,
IsSubscribe = isSubscribe,
});
}

Expand Down
52 changes: 8 additions & 44 deletions Algo/Connector_Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private void OnConnectorMarketDataSubscriptionSucceeded(Security security, Marke
{
var subscribers = GetSubscribers(type);

subscribers.ChangeSubscribers(security, 1);
subscribers.ChangeSubscribers(security, true);

var types = _unsubscribeActions.TryGetValue(security);

Expand Down Expand Up @@ -418,37 +418,18 @@ private void ConnectorOnMarketTimeChanged(TimeSpan diff)

private static bool TrySubscribe<T>(CachedSynchronizedDictionary<T, int> subscribers, T subscriber)
{
return subscribers.ChangeSubscribers(subscriber, 1) == 1;
return subscribers.ChangeSubscribers(subscriber, true) == 1;
}

private void TrySubscribe(Security subscriber, MarketDataTypes type)
{
var subscribers = GetSubscribers(type);

//Если уже выполняется поиск данного инструмента, то нет необходимости в повторном вызове OnRegisterXXX.
//Если на инструмент была подписка ранее, то просто вызываем событие SubscriptionSucceed.
bool? subscribed = false;

lock (subscribers.SyncRoot)
{
var value = subscribers.TryGetValue2(subscriber);

if (value == null)
{
subscribers[subscriber] = 0;
subscribed = null;
}

if (value > 0)
{
subscribers[subscriber] = (int)value + 1;
subscribed = true;
}
}
var subscribersCount = GetSubscribers(type).ChangeSubscribers(subscriber, true);

var securityId = _connector.GetSecurityId(subscriber);

if (subscribed == null)
if (subscribersCount == 1)
{
var lookupMessage = new SecurityLookupMessage
{
Expand All @@ -460,8 +441,7 @@ private void TrySubscribe(Security subscriber, MarketDataTypes type)
_lookupMessages.Add(lookupMessage.TransactionId, Tuple.Create(lookupMessage, subscriber, type));
_connector.LookupSecurities(lookupMessage);
}

if (subscribed == true)
else// if (subscribed == true)
{
_connector.SendOutMessage(new MarketDataMessage
{
Expand All @@ -474,29 +454,13 @@ private void TrySubscribe(Security subscriber, MarketDataTypes type)

private static bool TryUnSubscribe<T>(CachedSynchronizedDictionary<T, int> subscribers, T subscriber)
{
return subscribers.ChangeSubscribers(subscriber, -1) == 0;
return subscribers.ChangeSubscribers(subscriber, false) == 0;
}

private void TryUnSubscribe(Security subscriber, MarketDataTypes type)
{
var subscribers = GetSubscribers(type);
var subscribed = false;

lock (subscribers.SyncRoot)
{
var value = subscribers.TryGetValue2(subscriber);

if (value == 0)
_unsubscribeActions.SafeAdd(subscriber).Add(type);

if (value > 0)
subscribed = true;
}

if (!subscribed || !TryUnSubscribe(subscribers, subscriber))
return;

SendUnSubscribeMessage(subscriber, type);
if (TryUnSubscribe(GetSubscribers(type), subscriber))
SendUnSubscribeMessage(subscriber, type);
}

private void SendUnSubscribeMessage(Security subscriber, MarketDataTypes type)
Expand Down
10 changes: 8 additions & 2 deletions Algo/Helper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static Security CheckPriceStep(this Security security)
return security;
}

public static int ChangeSubscribers<T>(this CachedSynchronizedDictionary<T, int> subscribers, T subscriber, int delta)
public static int ChangeSubscribers<T>(this CachedSynchronizedDictionary<T, int> subscribers, T subscriber, bool isSubscribe)
{
if (subscribers == null)
throw new ArgumentNullException("subscribers");
Expand All @@ -60,7 +60,13 @@ public static int ChangeSubscribers<T>(this CachedSynchronizedDictionary<T, int>
{
var value = subscribers.TryGetValue2(subscriber) ?? 0;

value += delta;
if (isSubscribe)
value++;
else
{
if (value > 0)
value--;
}

if (value > 0)
subscribers[subscriber] = value;
Expand Down
4 changes: 2 additions & 2 deletions Algo/Testing/HistoryEmulationConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ void IExternalCandleSource.SubscribeCandles(CandleSeries series, DateTimeOffset
var securityId = GetSecurityId(series.Security);
var dataType = series.CandleType.ToCandleMessageType().ToCandleMarketDataType();

if (_subscribedCandles.ChangeSubscribers(Tuple.Create(securityId, dataType, series.Arg), 1) != 1)
if (_subscribedCandles.ChangeSubscribers(Tuple.Create(securityId, dataType, series.Arg), true) != 1)
return;

MarketDataAdapter.SendInMessage(new MarketDataMessage
Expand All @@ -630,7 +630,7 @@ void IExternalCandleSource.UnSubscribeCandles(CandleSeries series)
var securityId = GetSecurityId(series.Security);
var dataType = series.CandleType.ToCandleMessageType().ToCandleMarketDataType();

if (_subscribedCandles.ChangeSubscribers(Tuple.Create(securityId, dataType, series.Arg), -1) != 0)
if (_subscribedCandles.ChangeSubscribers(Tuple.Create(securityId, dataType, series.Arg), false) != 0)
return;

MarketDataAdapter.SendInMessage(new MarketDataMessage
Expand Down

0 comments on commit 3f83e8c

Please sign in to comment.