From 4df3bb54c7d9e7cdf30dbeecc128dac9c5f7bbdb Mon Sep 17 00:00:00 2001 From: Bartosz Rozwarski Date: Thu, 22 Aug 2024 12:50:54 +0200 Subject: [PATCH] resubscribe after disconnecting socket --- Sources/WalletConnectRelay/RelayClient.swift | 15 ++++++++++- .../AutomaticSocketConnectionHandler.swift | 2 ++ .../SubscriptionsTracker.swift | 27 ++++++++++++++----- ...thResponseTopicResubscriptionService.swift | 17 +++++------- .../Engine/Common/SessionEngine.swift | 17 +++++------- 5 files changed, 48 insertions(+), 30 deletions(-) diff --git a/Sources/WalletConnectRelay/RelayClient.swift b/Sources/WalletConnectRelay/RelayClient.swift index 5212b8ccb..a3088bca9 100644 --- a/Sources/WalletConnectRelay/RelayClient.swift +++ b/Sources/WalletConnectRelay/RelayClient.swift @@ -47,6 +47,7 @@ public final class RelayClient { private var requestAcknowledgePublisher: AnyPublisher { requestAcknowledgePublisherSubject.eraseToAnyPublisher() } + private var publishers = [AnyCancellable]() private let clientIdStorage: ClientIdStoring @@ -77,15 +78,27 @@ public final class RelayClient { self.clientIdStorage = clientIdStorage self.subscriptionsTracker = subscriptionsTracker setUpBindings() + setupConnectionSubscriptions() } - private func setUpBindings() { dispatcher.onMessage = { [weak self] payload in self?.handlePayloadMessage(payload) } } + private func setupConnectionSubscriptions() { + socketConnectionStatusPublisher + .sink { [unowned self] status in + guard status == .connected else { return } + let topics = subscriptionsTracker.getTopics() + Task(priority: .high) { + try await batchSubscribe(topics: topics) + } + } + .store(in: &publishers) + } + public func setLogging(level: LoggingLevel) { logger.setLogging(level: level) } diff --git a/Sources/WalletConnectRelay/SocketConnectionHandler/AutomaticSocketConnectionHandler.swift b/Sources/WalletConnectRelay/SocketConnectionHandler/AutomaticSocketConnectionHandler.swift index 0e7b982bf..6727c373b 100644 --- a/Sources/WalletConnectRelay/SocketConnectionHandler/AutomaticSocketConnectionHandler.swift +++ b/Sources/WalletConnectRelay/SocketConnectionHandler/AutomaticSocketConnectionHandler.swift @@ -96,7 +96,9 @@ class AutomaticSocketConnectionHandler { } } + func reconnectIfNeeded() { + // Check if client has active subscriptions and only then subscribe if !socket.isConnected && subscriptionsTracker.isSubscribed() { connect() diff --git a/Sources/WalletConnectRelay/SubscriptionsTracker.swift b/Sources/WalletConnectRelay/SubscriptionsTracker.swift index 71aaeebf5..2684de202 100644 --- a/Sources/WalletConnectRelay/SubscriptionsTracker.swift +++ b/Sources/WalletConnectRelay/SubscriptionsTracker.swift @@ -5,6 +5,7 @@ protocol SubscriptionsTracking { func getSubscription(for topic: String) -> String? func removeSubscription(for topic: String) func isSubscribed() -> Bool + func getTopics() -> [String] } public final class SubscriptionsTracker: SubscriptionsTracking { @@ -12,32 +13,40 @@ public final class SubscriptionsTracker: SubscriptionsTracking { private let concurrentQueue = DispatchQueue(label: "com.walletconnect.sdk.subscriptions_tracker", attributes: .concurrent) func setSubscription(for topic: String, id: String) { - concurrentQueue.async(flags: .barrier) { + concurrentQueue.async(flags: .barrier) { [unowned self] in self.subscriptions[topic] = id } } func getSubscription(for topic: String) -> String? { var result: String? - concurrentQueue.sync { - result = self.subscriptions[topic] + concurrentQueue.sync { [unowned self] in + result = subscriptions[topic] } return result } func removeSubscription(for topic: String) { - concurrentQueue.async(flags: .barrier) { - self.subscriptions[topic] = nil + concurrentQueue.async(flags: .barrier) { [unowned self] in + subscriptions[topic] = nil } } func isSubscribed() -> Bool { var result = false - concurrentQueue.sync { - result = !self.subscriptions.isEmpty + concurrentQueue.sync { [unowned self] in + result = !subscriptions.isEmpty } return result } + + func getTopics() -> [String] { + var topics: [String] = [] + concurrentQueue.sync { [unowned self] in + topics = Array(subscriptions.keys) + } + return topics + } } #if DEBUG @@ -65,5 +74,9 @@ final class SubscriptionsTrackerMock: SubscriptionsTracking { subscriptions.removeAll() isSubscribedReturnValue = false } + + func getTopics() -> [String] { + return Array(subscriptions.keys) + } } #endif diff --git a/Sources/WalletConnectSign/Auth/Services/AuthResponseTopicResubscriptionService.swift b/Sources/WalletConnectSign/Auth/Services/AuthResponseTopicResubscriptionService.swift index 4d8af4005..e17ef6426 100644 --- a/Sources/WalletConnectSign/Auth/Services/AuthResponseTopicResubscriptionService.swift +++ b/Sources/WalletConnectSign/Auth/Services/AuthResponseTopicResubscriptionService.swift @@ -30,19 +30,14 @@ class AuthResponseTopicResubscriptionService { self.logger = logger self.authResponseTopicRecordsStore = authResponseTopicRecordsStore cleanExpiredRecordsIfNeeded() - setupConnectionSubscriptions() + subscribeResponsTopics() } - func setupConnectionSubscriptions() { - networkingInteractor.socketConnectionStatusPublisher - .sink { [unowned self] status in - guard status == .connected else { return } - let topics = authResponseTopicRecordsStore.getAll().map{$0.topic} - Task(priority: .high) { - try await networkingInteractor.batchSubscribe(topics: topics) - } - } - .store(in: &publishers) + func subscribeResponsTopics() { + let topics = authResponseTopicRecordsStore.getAll().map{$0.topic} + Task(priority: .background) { + try await networkingInteractor.batchSubscribe(topics: topics) + } } func cleanExpiredRecordsIfNeeded() { diff --git a/Sources/WalletConnectSign/Engine/Common/SessionEngine.swift b/Sources/WalletConnectSign/Engine/Common/SessionEngine.swift index a18c922c8..7bba80c19 100644 --- a/Sources/WalletConnectSign/Engine/Common/SessionEngine.swift +++ b/Sources/WalletConnectSign/Engine/Common/SessionEngine.swift @@ -45,7 +45,7 @@ final class SessionEngine { self.sessionRequestsProvider = sessionRequestsProvider self.invalidRequestsSanitiser = invalidRequestsSanitiser - setupConnectionSubscriptions() + subscribeActiveSessions() setupRequestSubscriptions() setupResponseSubscriptions() setupUpdateSubscriptions() @@ -88,16 +88,11 @@ final class SessionEngine { private extension SessionEngine { - func setupConnectionSubscriptions() { - networkingInteractor.socketConnectionStatusPublisher - .sink { [unowned self] status in - guard status == .connected else { return } - let topics = sessionStore.getAll().map{$0.topic} - Task(priority: .high) { - try await networkingInteractor.batchSubscribe(topics: topics) - } - } - .store(in: &publishers) + func subscribeActiveSessions() { + let topics = sessionStore.getAll().map{$0.topic} + Task(priority: .background) { + try await networkingInteractor.batchSubscribe(topics: topics) + } } func setupRequestSubscriptions() {