Skip to content

Commit

Permalink
resubscribe after disconnecting socket
Browse files Browse the repository at this point in the history
  • Loading branch information
llbartekll committed Aug 22, 2024
1 parent 83b3bea commit 4df3bb5
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 30 deletions.
15 changes: 14 additions & 1 deletion Sources/WalletConnectRelay/RelayClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public final class RelayClient {
private var requestAcknowledgePublisher: AnyPublisher<RPCID?, Never> {
requestAcknowledgePublisherSubject.eraseToAnyPublisher()
}
private var publishers = [AnyCancellable]()

private let clientIdStorage: ClientIdStoring

Expand Down Expand Up @@ -77,15 +78,27 @@ public final class RelayClient {
self.clientIdStorage = clientIdStorage
self.subscriptionsTracker = subscriptionsTracker
setUpBindings()
setupConnectionSubscriptions()
}


private func setUpBindings() {
dispatcher.onMessage = { [weak self] payload in
self?.handlePayloadMessage(payload)
}
}

private func setupConnectionSubscriptions() {
socketConnectionStatusPublisher
.sink { [unowned self] status in
guard status == .connected else { return }
let topics = subscriptionsTracker.getTopics()
Task(priority: .high) {
try await batchSubscribe(topics: topics)
}
}
.store(in: &publishers)
}

public func setLogging(level: LoggingLevel) {
logger.setLogging(level: level)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ class AutomaticSocketConnectionHandler {
}
}


func reconnectIfNeeded() {

// Check if client has active subscriptions and only then subscribe
if !socket.isConnected && subscriptionsTracker.isSubscribed() {
connect()
Expand Down
27 changes: 20 additions & 7 deletions Sources/WalletConnectRelay/SubscriptionsTracker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,48 @@ protocol SubscriptionsTracking {
func getSubscription(for topic: String) -> String?
func removeSubscription(for topic: String)
func isSubscribed() -> Bool
func getTopics() -> [String]
}

public final class SubscriptionsTracker: SubscriptionsTracking {
private var subscriptions: [String: String] = [:]
private let concurrentQueue = DispatchQueue(label: "com.walletconnect.sdk.subscriptions_tracker", attributes: .concurrent)

func setSubscription(for topic: String, id: String) {
concurrentQueue.async(flags: .barrier) {
concurrentQueue.async(flags: .barrier) { [unowned self] in
self.subscriptions[topic] = id
}
}

func getSubscription(for topic: String) -> String? {
var result: String?
concurrentQueue.sync {
result = self.subscriptions[topic]
concurrentQueue.sync { [unowned self] in
result = subscriptions[topic]
}
return result
}

func removeSubscription(for topic: String) {
concurrentQueue.async(flags: .barrier) {
self.subscriptions[topic] = nil
concurrentQueue.async(flags: .barrier) { [unowned self] in
subscriptions[topic] = nil
}
}

func isSubscribed() -> Bool {
var result = false
concurrentQueue.sync {
result = !self.subscriptions.isEmpty
concurrentQueue.sync { [unowned self] in
result = !subscriptions.isEmpty
}
return result
}

func getTopics() -> [String] {
var topics: [String] = []
concurrentQueue.sync { [unowned self] in
topics = Array(subscriptions.keys)
}
return topics
}
}

#if DEBUG
Expand Down Expand Up @@ -65,5 +74,9 @@ final class SubscriptionsTrackerMock: SubscriptionsTracking {
subscriptions.removeAll()
isSubscribedReturnValue = false
}

func getTopics() -> [String] {
return Array(subscriptions.keys)
}
}
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,14 @@ class AuthResponseTopicResubscriptionService {
self.logger = logger
self.authResponseTopicRecordsStore = authResponseTopicRecordsStore
cleanExpiredRecordsIfNeeded()
setupConnectionSubscriptions()
subscribeResponsTopics()
}

func setupConnectionSubscriptions() {
networkingInteractor.socketConnectionStatusPublisher
.sink { [unowned self] status in
guard status == .connected else { return }
let topics = authResponseTopicRecordsStore.getAll().map{$0.topic}
Task(priority: .high) {
try await networkingInteractor.batchSubscribe(topics: topics)
}
}
.store(in: &publishers)
func subscribeResponsTopics() {
let topics = authResponseTopicRecordsStore.getAll().map{$0.topic}
Task(priority: .background) {
try await networkingInteractor.batchSubscribe(topics: topics)
}
}

func cleanExpiredRecordsIfNeeded() {
Expand Down
17 changes: 6 additions & 11 deletions Sources/WalletConnectSign/Engine/Common/SessionEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ final class SessionEngine {
self.sessionRequestsProvider = sessionRequestsProvider
self.invalidRequestsSanitiser = invalidRequestsSanitiser

setupConnectionSubscriptions()
subscribeActiveSessions()
setupRequestSubscriptions()
setupResponseSubscriptions()
setupUpdateSubscriptions()
Expand Down Expand Up @@ -88,16 +88,11 @@ final class SessionEngine {

private extension SessionEngine {

func setupConnectionSubscriptions() {
networkingInteractor.socketConnectionStatusPublisher
.sink { [unowned self] status in
guard status == .connected else { return }
let topics = sessionStore.getAll().map{$0.topic}
Task(priority: .high) {
try await networkingInteractor.batchSubscribe(topics: topics)
}
}
.store(in: &publishers)
func subscribeActiveSessions() {
let topics = sessionStore.getAll().map{$0.topic}
Task(priority: .background) {
try await networkingInteractor.batchSubscribe(topics: topics)
}
}

func setupRequestSubscriptions() {
Expand Down

0 comments on commit 4df3bb5

Please sign in to comment.