Skip to content

Minor SignalClient improvements #662

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .nanpa/minor-signalclient-improvements.kdl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="changed" "Minor SignalClient improvements"
115 changes: 77 additions & 38 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,9 @@

// MARK: - Public

public private(set) var connectionState: ConnectionState = .disconnected {
didSet {
guard connectionState != oldValue else { return }
// connectionState Updated...
log("\(oldValue) -> \(connectionState)")
public var connectionState: ConnectionState { _state.connectionState }

_delegate.notifyDetached { await $0.signalClient(self, didUpdateConnectionState: self.connectionState, oldState: oldValue, disconnectError: self.disconnectError) }
}
}

public private(set) var disconnectError: LiveKitError?
public var disconnectError: LiveKitError? { _state.disconnectError }

// MARK: - Private

Expand Down Expand Up @@ -90,18 +82,32 @@
await self._process(signalResponse: response)
})

private var _webSocket: WebSocket?
private var _messageLoopTask: Task<Void, Never>?
private var _lastJoinResponse: Livekit_JoinResponse?

private let _connectResponseCompleter = AsyncCompleter<ConnectResponse>(label: "Join response", defaultTimeout: .defaultJoinResponse)
private let _addTrackCompleters = CompleterMapActor<Livekit_TrackInfo>(label: "Completers for add track", defaultTimeout: .defaultPublish)
private let _pingIntervalTimer = AsyncTimer(interval: 1)
private let _pingTimeoutTimer = AsyncTimer(interval: 1)

struct State {
var connectionState: ConnectionState = .disconnected
var disconnectError: LiveKitError?
var socket: WebSocket?
var messageLoopTask: Task<Void, Never>?
var lastJoinResponse: Livekit_JoinResponse?
var rtt: Int64 = 0
}

private var _pingIntervalTimer = AsyncTimer(interval: 1)
private var _pingTimeoutTimer = AsyncTimer(interval: 1)
let _state = StateSync(State())

init() {
log()
_state.onDidMutate = { [weak self] newState, oldState in
guard let self else { return }
// ConnectionState
if oldState.connectionState != newState.connectionState {
self.log("\(oldState.connectionState) -> \(newState.connectionState)")
self._delegate.notifyDetached { await $0.signalClient(self, didUpdateConnectionState: newState.connectionState, oldState: oldState.connectionState, disconnectError: self.disconnectError) }
}
}
}

deinit {
Expand Down Expand Up @@ -135,12 +141,12 @@
log("Connecting with url: \(url)")
}

connectionState = (reconnectMode != nil ? .reconnecting : .connecting)
_state.mutate { $0.connectionState = (reconnectMode != nil ? .reconnecting : .connecting) }

do {
let socket = try await WebSocket(url: url, connectOptions: connectOptions)

_messageLoopTask = Task.detached {
let task = Task.detached {
self.log("Did enter WebSocket message loop...")
do {
for try await message in socket {
Expand All @@ -150,14 +156,17 @@
await self.cleanUp(withError: error)
}
}
_state.mutate { $0.messageLoopTask = task }

let connectResponse = try await _connectResponseCompleter.wait()
// Check cancellation after received join response
try Task.checkCancellation()

// Successfully connected
_webSocket = socket
connectionState = .connected
_state.mutate {
$0.socket = socket
$0.connectionState = .connected
}

return connectResponse
} catch {
Expand Down Expand Up @@ -194,24 +203,28 @@
func cleanUp(withError disconnectError: Error? = nil) async {
log("withError: \(String(describing: disconnectError))")

// Cancel ping/pong timers immediately to prevent stale timers from affecting future connections
_pingIntervalTimer.cancel()
_pingTimeoutTimer.cancel()

_messageLoopTask?.cancel()
_messageLoopTask = nil

_webSocket?.close()
_webSocket = nil
_state.mutate {
$0.messageLoopTask?.cancel()
$0.messageLoopTask = nil
$0.socket?.close()
$0.socket = nil
$0.lastJoinResponse = nil
}

_connectResponseCompleter.reset()
_lastJoinResponse = nil

await _addTrackCompleters.reset()
await _requestQueue.clear()
await _responseQueue.clear()

self.disconnectError = LiveKitError.from(error: disconnectError)
connectionState = .disconnected
_state.mutate {
$0.disconnectError = LiveKitError.from(error: disconnectError)
$0.connectionState = .disconnected
}
}
}

Expand All @@ -231,7 +244,7 @@
func _onWebSocketMessage(message: URLSessionWebSocketTask.Message) async {
let response: Livekit_SignalResponse? = {
switch message {
case let .data(data): return try? Livekit_SignalResponse(serializedData: data)

Check warning on line 247 in Sources/LiveKit/Core/SignalClient.swift

View workflow job for this annotation

GitHub Actions / Build & Test (macos-15, 16.2, macOS, true)

'init(serializedData:extensions:partial:options:)' is deprecated: replaced by 'init(serializedBytes:extensions:partial:options:)'

Check warning on line 247 in Sources/LiveKit/Core/SignalClient.swift

View workflow job for this annotation

GitHub Actions / Build & Test (macos-14, 15.4, macOS)

'init(serializedData:extensions:partial:options:)' is deprecated: replaced by 'init(serializedBytes:extensions:partial:options:)'

Check warning on line 247 in Sources/LiveKit/Core/SignalClient.swift

View workflow job for this annotation

GitHub Actions / Build & Test (macos-14, 15.4, macOS,variant=Mac Catalyst)

'init(serializedData:extensions:partial:options:)' is deprecated: replaced by 'init(serializedBytes:extensions:partial:options:)'

Check warning on line 247 in Sources/LiveKit/Core/SignalClient.swift

View workflow job for this annotation

GitHub Actions / Build & Test (macos-14, 15.4, tvOS Simulator,name=Apple TV)

'init(serializedData:extensions:partial:options:)' is deprecated: replaced by 'init(serializedBytes:extensions:partial:options:)'

Check warning on line 247 in Sources/LiveKit/Core/SignalClient.swift

View workflow job for this annotation

GitHub Actions / Build & Test (macos-14, 15.4, visionOS Simulator,name=Apple Vision Pro)

'init(serializedData:extensions:partial:options:)' is deprecated: replaced by 'init(serializedBytes:extensions:partial:options:)'

Check warning on line 247 in Sources/LiveKit/Core/SignalClient.swift

View workflow job for this annotation

GitHub Actions / Build & Test (macos-14, 15.4, iOS Simulator,OS=17.5,name=iPhone 15 Pro)

'init(serializedData:extensions:partial:options:)' is deprecated: replaced by 'init(serializedBytes:extensions:partial:options:)'
case let .string(string): return try? Livekit_SignalResponse(jsonString: string)
default: return nil
}
Expand Down Expand Up @@ -267,7 +280,7 @@

switch message {
case let .join(joinResponse):
_lastJoinResponse = joinResponse
_state.mutate { $0.lastJoinResponse = joinResponse }
_delegate.notifyDetached { await $0.signalClient(self, didReceiveConnectResponse: .join(joinResponse)) }
_connectResponseCompleter.resume(returning: .join(joinResponse))
await _restartPingTimer()
Expand Down Expand Up @@ -333,8 +346,8 @@
case let .pong(r):
await _onReceivedPong(r)

case .pongResp:
log("Received pongResp message")
case let .pongResp(pongResp):
await _onReceivedPongResp(pongResp)

case .subscriptionResponse:
log("Received subscriptionResponse message")
Expand Down Expand Up @@ -583,19 +596,36 @@
}

private func _sendPing() async throws {
let r = Livekit_SignalRequest.with {
$0.ping = Int64(Date().timeIntervalSince1970)
let timestamp = Int64(Date().timeIntervalSince1970 * 1000) // Convert to milliseconds
let rtt = _state.read { $0.rtt }

// Send both ping and pingReq for compatibility with older and newer servers
let pingRequest = Livekit_SignalRequest.with {
$0.ping = timestamp
}

try await _sendRequest(r)
// Include the current RTT value in pingReq to report back to server
let pingReqRequest = Livekit_SignalRequest.with {
$0.pingReq = Livekit_Ping.with {
$0.timestamp = timestamp
$0.rtt = rtt // Send current RTT back to server
}
}

// Log timestamp and RTT for debugging
log("Sending ping with timestamp: \(timestamp)ms, reporting RTT: \(rtt)ms", .trace)

// Send both requests
try await _sendRequest(pingRequest)
try await _sendRequest(pingReqRequest)
}
}

// MARK: - Server ping/pong logic

private extension SignalClient {
func _onPingIntervalTimer() async throws {
guard let jr = _lastJoinResponse else { return }
guard let jr = _state.lastJoinResponse else { return }
log("ping/pong sending ping...", .trace)
try await _sendPing()

Expand All @@ -606,7 +636,7 @@
await self.cleanUp(withError: LiveKitError(.serverPingTimedOut))
}

_pingTimeoutTimer.startIfStopped()
_pingTimeoutTimer.restart()
}

func _onReceivedPong(_: Int64) async {
Expand All @@ -615,13 +645,22 @@
_pingTimeoutTimer.cancel()
}

func _onReceivedPongResp(_ pongResp: Livekit_Pong) async {
let currentTimeMs = Int64(Date().timeIntervalSince1970 * 1000)
let rtt = currentTimeMs - pongResp.lastPingTimestamp
_state.mutate { $0.rtt = rtt }
log("ping/pong received pongResp from server with RTT: \(rtt)ms", .trace)
// Clear timeout timer
_pingTimeoutTimer.cancel()
}

func _restartPingTimer() async {
// Always cancel first...
_pingIntervalTimer.cancel()
_pingTimeoutTimer.cancel()

// Check previously received joinResponse
guard let jr = _lastJoinResponse,
guard let jr = _state.lastJoinResponse,
// Check if server supports ping/pong
jr.pingTimeout > 0,
jr.pingInterval > 0 else { return }
Expand Down Expand Up @@ -649,7 +688,7 @@

private extension SignalClient {
func requireWebSocket() async throws -> WebSocket {
guard let result = _webSocket else {
guard let result = _state.socket else {
log("WebSocket is nil", .error)
throw LiveKitError(.invalidState, message: "WebSocket is nil")
}
Expand Down
5 changes: 0 additions & 5 deletions Sources/LiveKit/Support/AsyncTimer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,4 @@ final class AsyncTimer: Sendable, Loggable {

Task { await scheduleNextInvocation() }
}

func startIfStopped() {
if _state.isStarted { return }
restart()
}
}
Loading