diff --git a/Sources/SwiftGraphQL/HTTP+WebSockets.swift b/Sources/SwiftGraphQL/HTTP+WebSockets.swift index 7ab0db40..45d32ae5 100644 --- a/Sources/SwiftGraphQL/HTTP+WebSockets.swift +++ b/Sources/SwiftGraphQL/HTTP+WebSockets.swift @@ -22,6 +22,10 @@ public protocol GraphQLEnabledSocket { func receiveMessages(_ handler: @escaping (Result) -> Bool) } +final class SinglePingQueueToken { + var cancelled = false +} + /// https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md public class GraphQLSocket { @@ -45,6 +49,12 @@ public class GraphQLSocket { private var decoder = JSONDecoder() private var encoder = JSONEncoder() + // Every successful ping should be matched by a successful pong + private var pingPongMatches: Int = 0 + + // we should never have more than one ping queue token + var pingQueueToken: SinglePingQueueToken? + public init(_ params: S.InitParamaters, autoConnect: Bool = false, pingInterval: TimeInterval? = nil) { self.initParams = params self.autoConnect = autoConnect @@ -80,7 +90,7 @@ public class GraphQLSocket { state = .started socket = S.create(with: initParams, errorHandler: errorHandler) socket?.send(message: messageData, errorHandler: { [weak self] in - self?.stop() + self?.restart(errorHandler: errorHandler) errorHandler(.startError(.connectionInit(error: $0))) }) socket?.receiveMessages { [weak self] (message) -> Bool in @@ -99,19 +109,32 @@ public class GraphQLSocket { self?.state = .running // If we have a time interval, set up a ping thread if let pingInterval = self?.pingInterval { + // cancel the old token + self?.pingQueueToken?.cancelled = true + let token = SinglePingQueueToken() + self?.pingQueueToken = token self?.detachedPingQueue(interval: pingInterval, errorHandler: { error in errorHandler(.pingFailed(error)) - }) + }, pingHandler: { [weak self] in + guard let self = self else { return } + self.pingPongMatches += 1 + // If we pinged 5 times without a pong, try to restart + if self.pingPongMatches > 5 { + self.pingPongMatches = 0 + self.restart(errorHandler: errorHandler) + } + }, token: token) } case .ka: self?.state = .running - case .next, .error, .complete, .connection_error, .data: + case .next, .error, .complete, .data: guard let id = message.id else { return false } self?.subscriptions[id]?(message) - case .connection_terminate: - self?.stop() + case .connection_terminate, .connection_error: + self?.restart(errorHandler: errorHandler) return true case .pong: + self?.pingPongMatches -= 1 self?.state = .running case .subscribe, .connection_init, .ping: _ = "The server will never send these messages" @@ -123,9 +146,8 @@ public class GraphQLSocket { // Should we send this error to the start errorHandler? // This could happen during the entire lifetime of the socket so // it's not really a start error - - self?.stop() - errorHandler(.startError(.connectionInit(error: failure))) + //errorHandler(.startError(.connectionInit(error: failure))) + self?.restart(errorHandler: errorHandler) return true } return false @@ -137,13 +159,18 @@ public class GraphQLSocket { private func detachedPingQueue( interval: TimeInterval, - errorHandler: @escaping (Error) -> Void + errorHandler: @escaping (Error) -> Void, + pingHandler: @escaping () -> Void, + token: SinglePingQueueToken ) { + // if there is a new token, leave here, so there's always only one ping queue + guard !token.cancelled else { + return + } if state == .notRunning { - // In this case we probably came back from the background, try to connect again - start(connectionParams: lastConnectionParams, errorHandler: errorHandler) - DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + 2.0) { [weak self] in - self?.detachedPingQueue(interval: interval, errorHandler: errorHandler) + // Try again while we're restarting + DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + 3.0) { [weak self] in + self?.detachedPingQueue(interval: interval, errorHandler: errorHandler, pingHandler: pingHandler, token: token) } return } @@ -155,11 +182,17 @@ public class GraphQLSocket { let message = Message.ping() let messageData = try self.encoder.encode(message) self.socket?.send(message: messageData, errorHandler: errorHandler) + pingHandler() + os_log( + "Ping", + log: OSLog.subscription, + type: .debug + ) } catch let error { errorHandler(error) } // Schedule the next - self.detachedPingQueue(interval: interval, errorHandler: errorHandler) + self.detachedPingQueue(interval: interval, errorHandler: errorHandler, pingHandler: pingHandler, token: token) } } @@ -263,6 +296,15 @@ public class GraphQLSocket { socket = nil } + /// try to restart the socket after a brief delay + public func restart(errorHandler: @escaping (SubscribeError) -> Void) { + self.state = .notRunning + let params = lastConnectionParams + DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + 3.0) { [weak self] in + self?.start(connectionParams: params, errorHandler: errorHandler) + } + } + private func complete(id: String) { subscriptions[id] = nil let message = Message.complete(id: id)