Skip to content

Commit

Permalink
First attempt at making UDPServer thread safe
Browse files Browse the repository at this point in the history
- Created private properties for each of the items available publicy and potentially used from a different thread (.main). These properties are then computed on the servers queue syncronously from the original properties, hopefully forcing thread safety and removing a race condition and probaby a data race...

Signed-off-by: Sam Smallman <srsmallman@mac.com>

Add thread safety mechanism to UdpServer delegate

Signed-off-by: Sam Smallman <srsmallman@mac.com>

Tidy

Signed-off-by: Sam Smallman <srsmallman@mac.com>
  • Loading branch information
sammysmallman committed Feb 3, 2022
1 parent 8657543 commit 0ae6f86
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 33 deletions.
92 changes: 92 additions & 0 deletions .swiftpm/xcode/xcshareddata/xcschemes/OSCKit.xcscheme
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<Scheme
LastUpgradeVersion = "1320"
version = "1.3">
<BuildAction
parallelizeBuildables = "YES"
buildImplicitDependencies = "YES">
<BuildActionEntries>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "YES"
buildForArchiving = "YES"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "OSCKit"
BuildableName = "OSCKit"
BlueprintName = "OSCKit"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
<BuildActionEntry
buildForTesting = "YES"
buildForRunning = "YES"
buildForProfiling = "NO"
buildForArchiving = "NO"
buildForAnalyzing = "YES">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "OSCKitTests"
BuildableName = "OSCKitTests"
BlueprintName = "OSCKitTests"
ReferencedContainer = "container:">
</BuildableReference>
</BuildActionEntry>
</BuildActionEntries>
</BuildAction>
<TestAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES">
<Testables>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "OSCKitTests"
BuildableName = "OSCKitTests"
BlueprintName = "OSCKitTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
enableThreadSanitizer = "YES"
launchStyle = "0"
useCustomWorkingDirectory = "NO"
ignoresPersistentStateOnLaunch = "NO"
debugDocumentVersioning = "YES"
debugServiceExtension = "internal"
allowLocationSimulation = "YES">
</LaunchAction>
<ProfileAction
buildConfiguration = "Release"
shouldUseLaunchSchemeArgsEnv = "YES"
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "OSCKit"
BuildableName = "OSCKit"
BlueprintName = "OSCKit"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
</AnalyzeAction>
<ArchiveAction
buildConfiguration = "Release"
revealArchiveInOrganizer = "YES">
</ArchiveAction>
</Scheme>
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.3
// swift-tools-version:5.5
import PackageDescription

let package = Package(
Expand Down
4 changes: 2 additions & 2 deletions Sources/OSCKit/OSCTcp.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import CocoaAsyncSocket
import CoreOSC

/// The possible stream framing of TCP data.
/// - .SLIP - [Serial Line IP ](http://www.rfc-editor.org/rfc/rfc1055.txt)
/// - .SLIP - [Serial Line IP](http://www.rfc-editor.org/rfc/rfc1055.txt)
/// - .PLH - Packet Length Headers
public enum OSCTcpStreamFraming: Int, Codable {
/// [Serial Line IP ](http://www.rfc-editor.org/rfc/rfc1055.txt) TCP stream framing.
/// [Serial Line IP](http://www.rfc-editor.org/rfc/rfc1055.txt) TCP stream framing.
case SLIP = 0
/// Packet Length Header TCP stream framing.
case PLH
Expand Down
4 changes: 2 additions & 2 deletions Sources/OSCKit/OSCTcpClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public class OSCTcpClient: NSObject {
/// - Parameter packet: The packet to be sent, either an `OSCMessage` or `OSCBundle`.
/// - Throws: An error if the client is not already connected and connecting causes an error.
///
/// If the client is not already connected to a server `connect()` will be called first.
/// If the client is not already connected to a server `connect()` will be called first.
public func send(_ packet: OSCPacket) throws {
try connect()
guard isConnected else { return }
Expand All @@ -228,7 +228,7 @@ public class OSCTcpClient: NSObject {
/// - Throws: An error if a packet can't be parsed from the data or if the client is not
/// already connected and connecting causes an error.
///
/// If the client is not already connected to a server `connect()` will be called first.
/// If the client is not already connected to a server `connect()` will be called first.
public func send(_ data: Data) throws {
try connect()
let packet = try OSCParser.packet(from: data)
Expand Down
2 changes: 1 addition & 1 deletion Sources/OSCKit/OSCTcpServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public class OSCTcpServer: NSObject {
configInterface.isEmpty == false {
self.interface = configInterface
} else {
interface = nil
self.interface = nil
}
port = configuration.port
self.delegate = delegate
Expand Down
88 changes: 66 additions & 22 deletions Sources/OSCKit/OSCUdpServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,23 @@ public class OSCUdpServer: NSObject {
/// A `Set` of multicast groups that should be joined automatically when the server starts listening.
public var multicastGroups: Set<String>

/// The private real `joinedMulticastGroups` property to force thread safety.
private var _joinedMulticastGroups: Set<String> = []
/// A `Set` of multicast groups that have been joined by the server.
public private(set) var joinedMulticastGroups: Set<String> = []
public var joinedMulticastGroups: Set<String> {
queue.sync {
_joinedMulticastGroups
}
}

/// The private real `isListening` property to force thread safety.
private var _isListening: Bool = false
/// A boolean value that indicates whether the server is listening for OSC packets.
public private(set) var isListening: Bool = false
public var isListening: Bool {
queue.sync {
_isListening
}
}

/// The interface may be a name (e.g. "en1" or "lo0") or the corresponding IP address (e.g. "192.168.1.15").
/// If the value of this is nil the server will listen on all interfaces.
Expand All @@ -96,17 +108,36 @@ public class OSCUdpServer: NSObject {
}
}

/// The private real `reusePort` property to force thread safety.
private var _reusePort: Bool = false
/// A boolean value that indicates whether the servers socket has been enabled
/// to allow for multiple processes to simultaneously bind to the same port.
public private(set) var reusePort: Bool = false
public var reusePort: Bool {
queue.sync {
_reusePort
}
}

/// The dispatch queue that the server executes all delegate callbacks on.
private let queue: DispatchQueue

/// The private real `delegate` property to force thread safety.
public weak var _delegate: OSCUdpServerDelegate?
/// The servers delegate.
///
/// The delegate must conform to the `OSCUdpServerDelegate` protocol.
public weak var delegate: OSCUdpServerDelegate?
public weak var delegate: OSCUdpServerDelegate? {
get {
queue.sync {
_delegate
}
}
set {
queue.sync {
_delegate = newValue
}
}
}

/// An OSC UDP Server.
/// - Parameters:
Expand All @@ -121,11 +152,11 @@ public class OSCUdpServer: NSObject {
configInterface.isEmpty == false {
self.interface = configInterface
} else {
interface = nil
self.interface = nil
}
port = configuration.port
multicastGroups = configuration.multicastGroups
self.delegate = delegate
self._delegate = delegate
self.queue = queue
super.init()
socket.setDelegate(self, delegateQueue: queue)
Expand Down Expand Up @@ -153,7 +184,6 @@ public class OSCUdpServer: NSObject {
}

deinit {
joinedMulticastGroups.forEach { try? leave(multicastGroup: $0) }
stopListening()
socket.synchronouslySetDelegate(nil)
}
Expand All @@ -176,18 +206,24 @@ public class OSCUdpServer: NSObject {
guard isListening == false else { return }
try socket.bind(toPort: port, interface: interface)
try socket.beginReceiving()
isListening = true
queue.async {
self._isListening = true
}
for multicastGroup in multicastGroups {
do {
try join(multicastGroup: multicastGroup)
joinedMulticastGroups.insert(multicastGroup)
queue.async {
self._joinedMulticastGroups.insert(multicastGroup)
}
} catch {
// The error here is purposefully not thrown.
// The purpose of this function is to set the server listening
// and the automation of joining multicast groups is a bonus.
// joinedMulticastGroups can be used after startListening() has
// been called to work out which groups have been joined or not.
joinedMulticastGroups.remove(multicastGroup)
queue.async {
self._joinedMulticastGroups.remove(multicastGroup)
}
}
}
}
Expand All @@ -199,7 +235,9 @@ public class OSCUdpServer: NSObject {
guard isListening else { return }
joinedMulticastGroups.forEach { try? leave(multicastGroup: $0) }
socket.close()
joinedMulticastGroups.removeAll()
queue.async {
self._joinedMulticastGroups.removeAll()
}
}

// MARK: Reuse Port
Expand All @@ -216,7 +254,9 @@ public class OSCUdpServer: NSObject {
public func enableReusePort(_ flag: Bool) throws {
stopListening()
try socket.enableBroadcast(flag)
reusePort = flag
queue.async {
self._reusePort = flag
}
}

// MARK: Multicasting
Expand All @@ -230,7 +270,9 @@ public class OSCUdpServer: NSObject {
public func join(multicastGroup: String) throws {
if isListening {
try socket.joinMulticastGroup(multicastGroup, onInterface: interface)
joinedMulticastGroups.insert(multicastGroup)
queue.async {
self._joinedMulticastGroups.insert(multicastGroup)
}
} else {
throw OSCUdpServerError.serverSocketIsNotBound
}
Expand All @@ -246,7 +288,9 @@ public class OSCUdpServer: NSObject {
if isListening {
if joinedMulticastGroups.contains(multicastGroup) {
try socket.leaveMulticastGroup(multicastGroup, onInterface: interface)
joinedMulticastGroups.remove(multicastGroup)
queue.async {
self._joinedMulticastGroups.remove(multicastGroup)
}
} else {
throw OSCUdpServerError.multicastGroupNotJoined
}
Expand All @@ -267,27 +311,27 @@ extension OSCUdpServer: GCDAsyncUdpSocketDelegate {
guard let host = GCDAsyncUdpSocket.host(fromAddress: address) else { return }
do {
let packet = try OSCParser.packet(from: data)
delegate?.server(self,
_delegate?.server(self,
didReceivePacket: packet,
fromHost: host,
port: GCDAsyncUdpSocket.port(fromAddress: address))
} catch {
delegate?.server(self,
_delegate?.server(self,
didReadData: data,
with: error)
}
if !isListening {
isListening = true
if !_isListening {
_isListening = true
}
}

public func udpSocketDidClose(_ sock: GCDAsyncUdpSocket,
withError error: Error?) {
isListening = false
if joinedMulticastGroups.isEmpty == false {
joinedMulticastGroups.removeAll()
_isListening = false
if _joinedMulticastGroups.isEmpty == false {
_joinedMulticastGroups.removeAll()
}
delegate?.server(self, socketDidCloseWithError: error)
_delegate?.server(self, socketDidCloseWithError: error)
}

}
8 changes: 8 additions & 0 deletions Tests/OSCKitTests/File.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//
// File.swift
//
//
// Created by Sam Smallman on 21/01/2022.
//

import Foundation
5 changes: 0 additions & 5 deletions Tests/OSCKitTests/OSCTcpStreamFramingTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@ import XCTest

final class OSCTcpStreamFramingTests: XCTestCase {

static var allTests = [
("testSLIP", testSLIP),
("testPLH", testPLH)
]

func testSLIP() {
let slip: Int = 0
let streamFraming = OSCTcpStreamFraming(rawValue: slip)
Expand Down

0 comments on commit 0ae6f86

Please sign in to comment.