From 3f83e8c7cf58a472c5765998a97585445f86cbe3 Mon Sep 17 00:00:00 2001 From: mikasoukhov Date: Tue, 8 Sep 2015 23:43:13 +0300 Subject: [PATCH] Connector. Unsubscription fixes. --- Algo/BasketMessageAdapter.cs | 126 ++++++++++++++++++---- Algo/Connector_Subscription.cs | 52 ++------- Algo/Helper.cs | 10 +- Algo/Testing/HistoryEmulationConnector.cs | 4 +- 4 files changed, 125 insertions(+), 67 deletions(-) diff --git a/Algo/BasketMessageAdapter.cs b/Algo/BasketMessageAdapter.cs index fe9899fa2e..7b42b9aa44 100644 --- a/Algo/BasketMessageAdapter.cs +++ b/Algo/BasketMessageAdapter.cs @@ -94,6 +94,15 @@ public int this[IMessageAdapter adapter] } } + private enum SubscriptionStates + { + Subscribed, + Subscribing, + Unsubscribing, + } + + private readonly SynchronizedDictionary, SubscriptionStates> _subscriptionStates = new SynchronizedDictionary, SubscriptionStates>(); + private readonly SynchronizedPairSet, IEnumerator> _subscriptionQueue = new SynchronizedPairSet, IEnumerator>(); private readonly SynchronizedDictionary> _subscriptionKeys = new SynchronizedDictionary>(); private readonly SynchronizedDictionary, IMessageAdapter> _subscriptions = new SynchronizedDictionary, IMessageAdapter>(); @@ -205,6 +214,8 @@ private void ProcessReset(Message message) _hearbeatAdapters.Clear(); _subscriptionQueue.Clear(); _subscriptions.Clear(); + _subscriptionKeys.Clear(); + _subscriptionStates.Clear(); } /// @@ -290,18 +301,54 @@ protected override void OnSendInMessage(Message message) { var key = Tuple.Create(mdMsg.SecurityId, mdMsg.DataType); + var state = _subscriptionStates.TryGetValue2(key); + if (mdMsg.IsSubscribe) { - if (_subscriptionQueue.ContainsKey(key)) - return; + if (state != null) + { + RaiseMarketDataMessage(null, mdMsg.OriginalTransactionId, new InvalidOperationException(state.Value.ToString()), true); + break; + } + else + _subscriptionStates.Add(key, SubscriptionStates.Subscribing); + } + else + { + var canProcess = false; - var enumerator = adapters.Cache.Cast().GetEnumerator(); + switch (state) + { + case SubscriptionStates.Subscribed: + canProcess = true; + _subscriptionStates[key] = SubscriptionStates.Unsubscribing; + break; + case SubscriptionStates.Subscribing: + case SubscriptionStates.Unsubscribing: + RaiseMarketDataMessage(null, mdMsg.OriginalTransactionId, new InvalidOperationException(state.Value.ToString()), false); + break; + case null: + RaiseMarketDataMessage(null, mdMsg.OriginalTransactionId, null, false); + break; + default: + throw new ArgumentOutOfRangeException(); + } - _subscriptionQueue.Add(key, enumerator); + if (!canProcess) + break; + } + + if (mdMsg.TransactionId != 0) + _subscriptionKeys.Add(mdMsg.TransactionId, key); + + if (mdMsg.IsSubscribe) + { + //if (_subscriptionQueue.ContainsKey(key)) + // return; - if (mdMsg.TransactionId != 0) - _subscriptionKeys.Add(mdMsg.TransactionId, key); + var enumerator = adapters.Cache.Cast().GetEnumerator(); + _subscriptionQueue.Add(key, enumerator); ProcessSubscriptionAction(enumerator, mdMsg, mdMsg.TransactionId); } else @@ -428,7 +475,8 @@ private void ProcessSubscriptionAction(IEnumerator enumerator, else _subscriptionKeys.Remove(originalTransactionId); - RaiseMarketDataMessage(null, originalTransactionId, new ArgumentException(LocalizedStrings.Str629Params.Put(key.Item1 + " " + key.Item2), "message")); + _subscriptionStates.Remove(key); + RaiseMarketDataMessage(null, originalTransactionId, new ArgumentException(LocalizedStrings.Str629Params.Put(key.Item1 + " " + key.Item2), "message"), true); } } @@ -438,37 +486,77 @@ private void ProcessMarketDataMessage(IMessageAdapter adapter, MarketDataMessage ?? Tuple.Create(message.SecurityId, message.DataType); var enumerator = _subscriptionQueue.TryGetValue(key); + var state = _subscriptionStates.TryGetValue2(key); + var error = message.Error; + var isOk = !message.IsNotSupported && error == null; + + var isSubscribe = message.IsSubscribe; - if (message.Error == null) + switch (state) { - if (message.IsNotSupported) - { - if (enumerator != null) - ProcessSubscriptionAction(enumerator, message, message.OriginalTransactionId); - else - RaiseMarketDataMessage(adapter, message.OriginalTransactionId, new InvalidOperationException(LocalizedStrings.Str633Params.Put(message.SecurityId, message.DataType))); + case SubscriptionStates.Subscribed: + break; + case SubscriptionStates.Subscribing: + isSubscribe = true; + if (isOk) + { + _subscriptions.Add(key, adapter); + _subscriptionStates[key] = SubscriptionStates.Subscribed; + } + else if (error != null) + { + _subscriptions.Remove(key); + _subscriptionStates.Remove(key); + } + break; + case SubscriptionStates.Unsubscribing: + isSubscribe = false; + _subscriptions.Remove(key); + _subscriptionStates.Remove(key); + break; + case null: + if (isOk) + { + if (message.IsSubscribe) + { + _subscriptions.Add(key, adapter); + _subscriptionStates.Add(key, SubscriptionStates.Subscribed); + break; + } + } - return; - } + _subscriptions.Remove(key); + _subscriptionStates.Remove(key); + break; + default: + throw new ArgumentOutOfRangeException(); + } + + if (message.IsNotSupported) + { + if (enumerator != null) + ProcessSubscriptionAction(enumerator, message, message.OriginalTransactionId); else { - this.AddDebugLog(LocalizedStrings.Str630Params, message.SecurityId, adapter); + if (error == null) + error = new InvalidOperationException(LocalizedStrings.Str633Params.Put(message.SecurityId, message.DataType)); } } _subscriptionQueue.Remove(key); _subscriptionKeys.Remove(message.OriginalTransactionId); - RaiseMarketDataMessage(adapter, message.OriginalTransactionId, message.Error); + RaiseMarketDataMessage(adapter, message.OriginalTransactionId, error, isSubscribe); } - private void RaiseMarketDataMessage(IMessageAdapter adapter, long originalTransactionId, Exception error) + private void RaiseMarketDataMessage(IMessageAdapter adapter, long originalTransactionId, Exception error, bool isSubscribe) { SendOutMessage(new MarketDataMessage { OriginalTransactionId = originalTransactionId, Error = error, Adapter = adapter, + IsSubscribe = isSubscribe, }); } diff --git a/Algo/Connector_Subscription.cs b/Algo/Connector_Subscription.cs index 3943003ecb..1d38b75eb7 100644 --- a/Algo/Connector_Subscription.cs +++ b/Algo/Connector_Subscription.cs @@ -136,7 +136,7 @@ private void OnConnectorMarketDataSubscriptionSucceeded(Security security, Marke { var subscribers = GetSubscribers(type); - subscribers.ChangeSubscribers(security, 1); + subscribers.ChangeSubscribers(security, true); var types = _unsubscribeActions.TryGetValue(security); @@ -418,37 +418,18 @@ private void ConnectorOnMarketTimeChanged(TimeSpan diff) private static bool TrySubscribe(CachedSynchronizedDictionary subscribers, T subscriber) { - return subscribers.ChangeSubscribers(subscriber, 1) == 1; + return subscribers.ChangeSubscribers(subscriber, true) == 1; } private void TrySubscribe(Security subscriber, MarketDataTypes type) { - var subscribers = GetSubscribers(type); - //Если уже выполняется поиск данного инструмента, то нет необходимости в повторном вызове OnRegisterXXX. //Если на инструмент была подписка ранее, то просто вызываем событие SubscriptionSucceed. - bool? subscribed = false; - - lock (subscribers.SyncRoot) - { - var value = subscribers.TryGetValue2(subscriber); - - if (value == null) - { - subscribers[subscriber] = 0; - subscribed = null; - } - - if (value > 0) - { - subscribers[subscriber] = (int)value + 1; - subscribed = true; - } - } + var subscribersCount = GetSubscribers(type).ChangeSubscribers(subscriber, true); var securityId = _connector.GetSecurityId(subscriber); - if (subscribed == null) + if (subscribersCount == 1) { var lookupMessage = new SecurityLookupMessage { @@ -460,8 +441,7 @@ private void TrySubscribe(Security subscriber, MarketDataTypes type) _lookupMessages.Add(lookupMessage.TransactionId, Tuple.Create(lookupMessage, subscriber, type)); _connector.LookupSecurities(lookupMessage); } - - if (subscribed == true) + else// if (subscribed == true) { _connector.SendOutMessage(new MarketDataMessage { @@ -474,29 +454,13 @@ private void TrySubscribe(Security subscriber, MarketDataTypes type) private static bool TryUnSubscribe(CachedSynchronizedDictionary subscribers, T subscriber) { - return subscribers.ChangeSubscribers(subscriber, -1) == 0; + return subscribers.ChangeSubscribers(subscriber, false) == 0; } private void TryUnSubscribe(Security subscriber, MarketDataTypes type) { - var subscribers = GetSubscribers(type); - var subscribed = false; - - lock (subscribers.SyncRoot) - { - var value = subscribers.TryGetValue2(subscriber); - - if (value == 0) - _unsubscribeActions.SafeAdd(subscriber).Add(type); - - if (value > 0) - subscribed = true; - } - - if (!subscribed || !TryUnSubscribe(subscribers, subscriber)) - return; - - SendUnSubscribeMessage(subscriber, type); + if (TryUnSubscribe(GetSubscribers(type), subscriber)) + SendUnSubscribeMessage(subscriber, type); } private void SendUnSubscribeMessage(Security subscriber, MarketDataTypes type) diff --git a/Algo/Helper.cs b/Algo/Helper.cs index def8e4190e..ae699a1221 100644 --- a/Algo/Helper.cs +++ b/Algo/Helper.cs @@ -51,7 +51,7 @@ public static Security CheckPriceStep(this Security security) return security; } - public static int ChangeSubscribers(this CachedSynchronizedDictionary subscribers, T subscriber, int delta) + public static int ChangeSubscribers(this CachedSynchronizedDictionary subscribers, T subscriber, bool isSubscribe) { if (subscribers == null) throw new ArgumentNullException("subscribers"); @@ -60,7 +60,13 @@ public static int ChangeSubscribers(this CachedSynchronizedDictionary { var value = subscribers.TryGetValue2(subscriber) ?? 0; - value += delta; + if (isSubscribe) + value++; + else + { + if (value > 0) + value--; + } if (value > 0) subscribers[subscriber] = value; diff --git a/Algo/Testing/HistoryEmulationConnector.cs b/Algo/Testing/HistoryEmulationConnector.cs index 852354590c..4e6deca052 100644 --- a/Algo/Testing/HistoryEmulationConnector.cs +++ b/Algo/Testing/HistoryEmulationConnector.cs @@ -611,7 +611,7 @@ void IExternalCandleSource.SubscribeCandles(CandleSeries series, DateTimeOffset var securityId = GetSecurityId(series.Security); var dataType = series.CandleType.ToCandleMessageType().ToCandleMarketDataType(); - if (_subscribedCandles.ChangeSubscribers(Tuple.Create(securityId, dataType, series.Arg), 1) != 1) + if (_subscribedCandles.ChangeSubscribers(Tuple.Create(securityId, dataType, series.Arg), true) != 1) return; MarketDataAdapter.SendInMessage(new MarketDataMessage @@ -630,7 +630,7 @@ void IExternalCandleSource.UnSubscribeCandles(CandleSeries series) var securityId = GetSecurityId(series.Security); var dataType = series.CandleType.ToCandleMessageType().ToCandleMarketDataType(); - if (_subscribedCandles.ChangeSubscribers(Tuple.Create(securityId, dataType, series.Arg), -1) != 0) + if (_subscribedCandles.ChangeSubscribers(Tuple.Create(securityId, dataType, series.Arg), false) != 0) return; MarketDataAdapter.SendInMessage(new MarketDataMessage