diff --git a/Sources/WalletConnectRelay/RelayClient.swift b/Sources/WalletConnectRelay/RelayClient.swift index 441f40314..24cae47c0 100644 --- a/Sources/WalletConnectRelay/RelayClient.swift +++ b/Sources/WalletConnectRelay/RelayClient.swift @@ -34,6 +34,11 @@ public final class RelayClient { subscriptionResponsePublisherSubject.eraseToAnyPublisher() } + private let requestAcknowledgePublisherSubject = PassthroughSubject() + private var requestAcknowledgePublisher: AnyPublisher { + requestAcknowledgePublisherSubject.eraseToAnyPublisher() + } + private let clientIdStorage: ClientIdStoring private var dispatcher: Dispatching @@ -86,13 +91,35 @@ public final class RelayClient { try dispatcher.disconnect(closeCode: closeCode) } - /// Completes when networking client sends a request, error if it fails on client side + /// Completes with an acknowledgement from the relay network public func publish(topic: String, payload: String, tag: Int, prompt: Bool, ttl: Int) async throws { - let request = Publish(params: .init(topic: topic, message: payload, ttl: ttl, prompt: prompt, tag: tag)) - .asRPCRequest() + let request = Publish(params: .init(topic: topic, message: payload, ttl: ttl, prompt: prompt, tag: tag)).asRPCRequest() let message = try request.asJSONEncodedString() - logger.debug("Publishing payload on topic: \(topic)") + + logger.debug("[Publish] Sending payload on topic: \(topic)") + try await dispatcher.protectedSend(message) + + return try await withUnsafeThrowingContinuation { continuation in + var cancellable: AnyCancellable? + cancellable = requestAcknowledgePublisher + .filter { $0 == request.id } + .setFailureType(to: RelayError.self) + .timeout(.seconds(10), scheduler: concurrentQueue, customError: { .requestTimeout }) + .sink(receiveCompletion: { [unowned self] result in + switch result { + case .failure(let error): + cancellable?.cancel() + logger.debug("[Publish] Relay request timeout for topic: \(topic)") + continuation.resume(throwing: error) + case .finished: break + } + }, receiveValue: { [unowned self] _ in + cancellable?.cancel() + logger.debug("[Publish] Published payload on topic: \(topic)") + continuation.resume(returning: ()) + }) + } } public func subscribe(topic: String) async throws { @@ -204,7 +231,9 @@ public final class RelayClient { } else if let response = tryDecode(RPCResponse.self, from: payload) { switch response.outcome { case .response(let anyCodable): - if let subscriptionId = try? anyCodable.get(String.self) { + if let _ = try? anyCodable.get(Bool.self) { + requestAcknowledgePublisherSubject.send(response.id) + } else if let subscriptionId = try? anyCodable.get(String.self) { subscriptionResponsePublisherSubject.send((response.id, [subscriptionId])) } else if let subscriptionIds = try? anyCodable.get([String].self) { subscriptionResponsePublisherSubject.send((response.id, subscriptionIds)) diff --git a/Sources/WalletConnectRelay/RelayError.swift b/Sources/WalletConnectRelay/RelayError.swift new file mode 100644 index 000000000..39d725d7c --- /dev/null +++ b/Sources/WalletConnectRelay/RelayError.swift @@ -0,0 +1,16 @@ +import Foundation + +enum RelayError: Error, LocalizedError { + case requestTimeout + + var errorDescription: String? { + return localizedDescription + } + + var localizedDescription: String { + switch self { + case .requestTimeout: + return "Relay request timeout" + } + } +}