Skip to content

Commit

Permalink
publish with ack
Browse files Browse the repository at this point in the history
  • Loading branch information
flypaper0 committed Jan 17, 2024
1 parent 42ab7ce commit 3e5add4
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 5 deletions.
39 changes: 34 additions & 5 deletions Sources/WalletConnectRelay/RelayClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public final class RelayClient {
subscriptionResponsePublisherSubject.eraseToAnyPublisher()
}

private let requestAcknowledgePublisherSubject = PassthroughSubject<RPCID?, Never>()
private var requestAcknowledgePublisher: AnyPublisher<RPCID?, Never> {
requestAcknowledgePublisherSubject.eraseToAnyPublisher()
}

private let clientIdStorage: ClientIdStoring

private var dispatcher: Dispatching
Expand Down Expand Up @@ -86,13 +91,35 @@ public final class RelayClient {
try dispatcher.disconnect(closeCode: closeCode)
}

/// Completes when networking client sends a request, error if it fails on client side
/// Completes with an acknowledgement from the relay network
public func publish(topic: String, payload: String, tag: Int, prompt: Bool, ttl: Int) async throws {
let request = Publish(params: .init(topic: topic, message: payload, ttl: ttl, prompt: prompt, tag: tag))
.asRPCRequest()
let request = Publish(params: .init(topic: topic, message: payload, ttl: ttl, prompt: prompt, tag: tag)).asRPCRequest()
let message = try request.asJSONEncodedString()
logger.debug("Publishing payload on topic: \(topic)")

logger.debug("[Publish] Sending payload on topic: \(topic)")

try await dispatcher.protectedSend(message)

return try await withUnsafeThrowingContinuation { continuation in
var cancellable: AnyCancellable?
cancellable = requestAcknowledgePublisher
.filter { $0 == request.id }
.setFailureType(to: RelayError.self)
.timeout(.seconds(10), scheduler: concurrentQueue, customError: { .requestTimeout })
.sink(receiveCompletion: { [unowned self] result in
switch result {
case .failure(let error):
cancellable?.cancel()
logger.debug("[Publish] Relay request timeout for topic: \(topic)")
continuation.resume(throwing: error)
case .finished: break
}
}, receiveValue: { [unowned self] _ in
cancellable?.cancel()
logger.debug("[Publish] Published payload on topic: \(topic)")
continuation.resume(returning: ())
})
}
}

public func subscribe(topic: String) async throws {
Expand Down Expand Up @@ -204,7 +231,9 @@ public final class RelayClient {
} else if let response = tryDecode(RPCResponse.self, from: payload) {
switch response.outcome {
case .response(let anyCodable):
if let subscriptionId = try? anyCodable.get(String.self) {
if let _ = try? anyCodable.get(Bool.self) {
requestAcknowledgePublisherSubject.send(response.id)
} else if let subscriptionId = try? anyCodable.get(String.self) {
subscriptionResponsePublisherSubject.send((response.id, [subscriptionId]))
} else if let subscriptionIds = try? anyCodable.get([String].self) {
subscriptionResponsePublisherSubject.send((response.id, subscriptionIds))
Expand Down
16 changes: 16 additions & 0 deletions Sources/WalletConnectRelay/RelayError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import Foundation

enum RelayError: Error, LocalizedError {
case requestTimeout

var errorDescription: String? {
return localizedDescription
}

var localizedDescription: String {
switch self {
case .requestTimeout:
return "Relay request timeout"
}
}
}

0 comments on commit 3e5add4

Please sign in to comment.