Skip to content

Commit

Permalink
Merge pull request #4 from unstoppablefi/bugfix/unstoppable_socket
Browse files Browse the repository at this point in the history
multiple changes to make sure a connection_error or a connection_term…
  • Loading branch information
terhechte authored Jul 25, 2022
2 parents 566549b + fbe9a81 commit 5f688ec
Showing 1 changed file with 56 additions and 14 deletions.
70 changes: 56 additions & 14 deletions Sources/SwiftGraphQL/HTTP+WebSockets.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public protocol GraphQLEnabledSocket {
func receiveMessages(_ handler: @escaping (Result<Data, Error>) -> Bool)
}

final class SinglePingQueueToken {
var cancelled = false
}

/// https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md
public class GraphQLSocket<S: GraphQLEnabledSocket> {

Expand All @@ -45,6 +49,12 @@ public class GraphQLSocket<S: GraphQLEnabledSocket> {
private var decoder = JSONDecoder()
private var encoder = JSONEncoder()

// Every successful ping should be matched by a successful pong
private var pingPongMatches: Int = 0

// we should never have more than one ping queue token
var pingQueueToken: SinglePingQueueToken?

public init(_ params: S.InitParamaters, autoConnect: Bool = false, pingInterval: TimeInterval? = nil) {
self.initParams = params
self.autoConnect = autoConnect
Expand Down Expand Up @@ -80,7 +90,7 @@ public class GraphQLSocket<S: GraphQLEnabledSocket> {
state = .started
socket = S.create(with: initParams, errorHandler: errorHandler)
socket?.send(message: messageData, errorHandler: { [weak self] in
self?.stop()
self?.restart(errorHandler: errorHandler)
errorHandler(.startError(.connectionInit(error: $0)))
})
socket?.receiveMessages { [weak self] (message) -> Bool in
Expand All @@ -99,19 +109,32 @@ public class GraphQLSocket<S: GraphQLEnabledSocket> {
self?.state = .running
// If we have a time interval, set up a ping thread
if let pingInterval = self?.pingInterval {
// cancel the old token
self?.pingQueueToken?.cancelled = true
let token = SinglePingQueueToken()
self?.pingQueueToken = token
self?.detachedPingQueue(interval: pingInterval, errorHandler: { error in
errorHandler(.pingFailed(error))
})
}, pingHandler: { [weak self] in
guard let self = self else { return }
self.pingPongMatches += 1
// If we pinged 5 times without a pong, try to restart
if self.pingPongMatches > 5 {
self.pingPongMatches = 0
self.restart(errorHandler: errorHandler)
}
}, token: token)
}
case .ka:
self?.state = .running
case .next, .error, .complete, .connection_error, .data:
case .next, .error, .complete, .data:
guard let id = message.id else { return false }
self?.subscriptions[id]?(message)
case .connection_terminate:
self?.stop()
case .connection_terminate, .connection_error:
self?.restart(errorHandler: errorHandler)
return true
case .pong:
self?.pingPongMatches -= 1
self?.state = .running
case .subscribe, .connection_init, .ping:
_ = "The server will never send these messages"
Expand All @@ -123,9 +146,8 @@ public class GraphQLSocket<S: GraphQLEnabledSocket> {
// Should we send this error to the start errorHandler?
// This could happen during the entire lifetime of the socket so
// it's not really a start error

self?.stop()
errorHandler(.startError(.connectionInit(error: failure)))
//errorHandler(.startError(.connectionInit(error: failure)))
self?.restart(errorHandler: errorHandler)
return true
}
return false
Expand All @@ -137,13 +159,18 @@ public class GraphQLSocket<S: GraphQLEnabledSocket> {

private func detachedPingQueue(
interval: TimeInterval,
errorHandler: @escaping (Error) -> Void
errorHandler: @escaping (Error) -> Void,
pingHandler: @escaping () -> Void,
token: SinglePingQueueToken
) {
// if there is a new token, leave here, so there's always only one ping queue
guard !token.cancelled else {
return
}
if state == .notRunning {
// In this case we probably came back from the background, try to connect again
start(connectionParams: lastConnectionParams, errorHandler: errorHandler)
DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + 2.0) { [weak self] in
self?.detachedPingQueue(interval: interval, errorHandler: errorHandler)
// Try again while we're restarting
DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + 3.0) { [weak self] in
self?.detachedPingQueue(interval: interval, errorHandler: errorHandler, pingHandler: pingHandler, token: token)
}
return
}
Expand All @@ -155,11 +182,17 @@ public class GraphQLSocket<S: GraphQLEnabledSocket> {
let message = Message.ping()
let messageData = try self.encoder.encode(message)
self.socket?.send(message: messageData, errorHandler: errorHandler)
pingHandler()
os_log(
"Ping",
log: OSLog.subscription,
type: .debug
)
} catch let error {
errorHandler(error)
}
// Schedule the next
self.detachedPingQueue(interval: interval, errorHandler: errorHandler)
self.detachedPingQueue(interval: interval, errorHandler: errorHandler, pingHandler: pingHandler, token: token)
}
}

Expand Down Expand Up @@ -263,6 +296,15 @@ public class GraphQLSocket<S: GraphQLEnabledSocket> {
socket = nil
}

/// try to restart the socket after a brief delay
public func restart(errorHandler: @escaping (SubscribeError) -> Void) {
self.state = .notRunning
let params = lastConnectionParams
DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + 3.0) { [weak self] in
self?.start(connectionParams: params, errorHandler: errorHandler)
}
}

private func complete(id: String) {
subscriptions[id] = nil
let message = Message.complete(id: id)
Expand Down

0 comments on commit 5f688ec

Please sign in to comment.