From 0ae6f86fc17d9ccc7f16beb9dd2411c40de9f600 Mon Sep 17 00:00:00 2001 From: Sam Smallman Date: Fri, 21 Jan 2022 23:46:45 +0000 Subject: [PATCH] First attempt at making UDPServer thread safe - Created private properties for each of the items available publicy and potentially used from a different thread (.main). These properties are then computed on the servers queue syncronously from the original properties, hopefully forcing thread safety and removing a race condition and probaby a data race... Signed-off-by: Sam Smallman Add thread safety mechanism to UdpServer delegate Signed-off-by: Sam Smallman Tidy Signed-off-by: Sam Smallman --- .../xcshareddata/xcschemes/OSCKit.xcscheme | 92 +++++++++++++++++++ Package.swift | 2 +- Sources/OSCKit/OSCTcp.swift | 4 +- Sources/OSCKit/OSCTcpClient.swift | 4 +- Sources/OSCKit/OSCTcpServer.swift | 2 +- Sources/OSCKit/OSCUdpServer.swift | 88 +++++++++++++----- Tests/OSCKitTests/File.swift | 8 ++ .../OSCTcpStreamFramingTests.swift | 5 - 8 files changed, 172 insertions(+), 33 deletions(-) create mode 100644 .swiftpm/xcode/xcshareddata/xcschemes/OSCKit.xcscheme create mode 100644 Tests/OSCKitTests/File.swift diff --git a/.swiftpm/xcode/xcshareddata/xcschemes/OSCKit.xcscheme b/.swiftpm/xcode/xcshareddata/xcschemes/OSCKit.xcscheme new file mode 100644 index 0000000..1343aaa --- /dev/null +++ b/.swiftpm/xcode/xcshareddata/xcschemes/OSCKit.xcscheme @@ -0,0 +1,92 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/Package.swift b/Package.swift index d8a4481..ade2984 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:5.3 +// swift-tools-version:5.5 import PackageDescription let package = Package( diff --git a/Sources/OSCKit/OSCTcp.swift b/Sources/OSCKit/OSCTcp.swift index 5b829fa..b7557f8 100644 --- a/Sources/OSCKit/OSCTcp.swift +++ b/Sources/OSCKit/OSCTcp.swift @@ -29,10 +29,10 @@ import CocoaAsyncSocket import CoreOSC /// The possible stream framing of TCP data. -/// - .SLIP - [Serial Line IP ](http://www.rfc-editor.org/rfc/rfc1055.txt) +/// - .SLIP - [Serial Line IP](http://www.rfc-editor.org/rfc/rfc1055.txt) /// - .PLH - Packet Length Headers public enum OSCTcpStreamFraming: Int, Codable { - /// [Serial Line IP ](http://www.rfc-editor.org/rfc/rfc1055.txt) TCP stream framing. + /// [Serial Line IP](http://www.rfc-editor.org/rfc/rfc1055.txt) TCP stream framing. case SLIP = 0 /// Packet Length Header TCP stream framing. case PLH diff --git a/Sources/OSCKit/OSCTcpClient.swift b/Sources/OSCKit/OSCTcpClient.swift index 6eb4159..acead24 100644 --- a/Sources/OSCKit/OSCTcpClient.swift +++ b/Sources/OSCKit/OSCTcpClient.swift @@ -206,7 +206,7 @@ public class OSCTcpClient: NSObject { /// - Parameter packet: The packet to be sent, either an `OSCMessage` or `OSCBundle`. /// - Throws: An error if the client is not already connected and connecting causes an error. /// - /// If the client is not already connected to a server `connect()` will be called first. + /// If the client is not already connected to a server `connect()` will be called first. public func send(_ packet: OSCPacket) throws { try connect() guard isConnected else { return } @@ -228,7 +228,7 @@ public class OSCTcpClient: NSObject { /// - Throws: An error if a packet can't be parsed from the data or if the client is not /// already connected and connecting causes an error. /// - /// If the client is not already connected to a server `connect()` will be called first. + /// If the client is not already connected to a server `connect()` will be called first. public func send(_ data: Data) throws { try connect() let packet = try OSCParser.packet(from: data) diff --git a/Sources/OSCKit/OSCTcpServer.swift b/Sources/OSCKit/OSCTcpServer.swift index b03fc28..ef61843 100644 --- a/Sources/OSCKit/OSCTcpServer.swift +++ b/Sources/OSCKit/OSCTcpServer.swift @@ -134,7 +134,7 @@ public class OSCTcpServer: NSObject { configInterface.isEmpty == false { self.interface = configInterface } else { - interface = nil + self.interface = nil } port = configuration.port self.delegate = delegate diff --git a/Sources/OSCKit/OSCUdpServer.swift b/Sources/OSCKit/OSCUdpServer.swift index bc56b8e..314373f 100644 --- a/Sources/OSCKit/OSCUdpServer.swift +++ b/Sources/OSCKit/OSCUdpServer.swift @@ -68,11 +68,23 @@ public class OSCUdpServer: NSObject { /// A `Set` of multicast groups that should be joined automatically when the server starts listening. public var multicastGroups: Set + /// The private real `joinedMulticastGroups` property to force thread safety. + private var _joinedMulticastGroups: Set = [] /// A `Set` of multicast groups that have been joined by the server. - public private(set) var joinedMulticastGroups: Set = [] + public var joinedMulticastGroups: Set { + queue.sync { + _joinedMulticastGroups + } + } + /// The private real `isListening` property to force thread safety. + private var _isListening: Bool = false /// A boolean value that indicates whether the server is listening for OSC packets. - public private(set) var isListening: Bool = false + public var isListening: Bool { + queue.sync { + _isListening + } + } /// The interface may be a name (e.g. "en1" or "lo0") or the corresponding IP address (e.g. "192.168.1.15"). /// If the value of this is nil the server will listen on all interfaces. @@ -96,17 +108,36 @@ public class OSCUdpServer: NSObject { } } + /// The private real `reusePort` property to force thread safety. + private var _reusePort: Bool = false /// A boolean value that indicates whether the servers socket has been enabled /// to allow for multiple processes to simultaneously bind to the same port. - public private(set) var reusePort: Bool = false + public var reusePort: Bool { + queue.sync { + _reusePort + } + } /// The dispatch queue that the server executes all delegate callbacks on. private let queue: DispatchQueue + /// The private real `delegate` property to force thread safety. + public weak var _delegate: OSCUdpServerDelegate? /// The servers delegate. /// /// The delegate must conform to the `OSCUdpServerDelegate` protocol. - public weak var delegate: OSCUdpServerDelegate? + public weak var delegate: OSCUdpServerDelegate? { + get { + queue.sync { + _delegate + } + } + set { + queue.sync { + _delegate = newValue + } + } + } /// An OSC UDP Server. /// - Parameters: @@ -121,11 +152,11 @@ public class OSCUdpServer: NSObject { configInterface.isEmpty == false { self.interface = configInterface } else { - interface = nil + self.interface = nil } port = configuration.port multicastGroups = configuration.multicastGroups - self.delegate = delegate + self._delegate = delegate self.queue = queue super.init() socket.setDelegate(self, delegateQueue: queue) @@ -153,7 +184,6 @@ public class OSCUdpServer: NSObject { } deinit { - joinedMulticastGroups.forEach { try? leave(multicastGroup: $0) } stopListening() socket.synchronouslySetDelegate(nil) } @@ -176,18 +206,24 @@ public class OSCUdpServer: NSObject { guard isListening == false else { return } try socket.bind(toPort: port, interface: interface) try socket.beginReceiving() - isListening = true + queue.async { + self._isListening = true + } for multicastGroup in multicastGroups { do { try join(multicastGroup: multicastGroup) - joinedMulticastGroups.insert(multicastGroup) + queue.async { + self._joinedMulticastGroups.insert(multicastGroup) + } } catch { // The error here is purposefully not thrown. // The purpose of this function is to set the server listening // and the automation of joining multicast groups is a bonus. // joinedMulticastGroups can be used after startListening() has // been called to work out which groups have been joined or not. - joinedMulticastGroups.remove(multicastGroup) + queue.async { + self._joinedMulticastGroups.remove(multicastGroup) + } } } } @@ -199,7 +235,9 @@ public class OSCUdpServer: NSObject { guard isListening else { return } joinedMulticastGroups.forEach { try? leave(multicastGroup: $0) } socket.close() - joinedMulticastGroups.removeAll() + queue.async { + self._joinedMulticastGroups.removeAll() + } } // MARK: Reuse Port @@ -216,7 +254,9 @@ public class OSCUdpServer: NSObject { public func enableReusePort(_ flag: Bool) throws { stopListening() try socket.enableBroadcast(flag) - reusePort = flag + queue.async { + self._reusePort = flag + } } // MARK: Multicasting @@ -230,7 +270,9 @@ public class OSCUdpServer: NSObject { public func join(multicastGroup: String) throws { if isListening { try socket.joinMulticastGroup(multicastGroup, onInterface: interface) - joinedMulticastGroups.insert(multicastGroup) + queue.async { + self._joinedMulticastGroups.insert(multicastGroup) + } } else { throw OSCUdpServerError.serverSocketIsNotBound } @@ -246,7 +288,9 @@ public class OSCUdpServer: NSObject { if isListening { if joinedMulticastGroups.contains(multicastGroup) { try socket.leaveMulticastGroup(multicastGroup, onInterface: interface) - joinedMulticastGroups.remove(multicastGroup) + queue.async { + self._joinedMulticastGroups.remove(multicastGroup) + } } else { throw OSCUdpServerError.multicastGroupNotJoined } @@ -267,27 +311,27 @@ extension OSCUdpServer: GCDAsyncUdpSocketDelegate { guard let host = GCDAsyncUdpSocket.host(fromAddress: address) else { return } do { let packet = try OSCParser.packet(from: data) - delegate?.server(self, + _delegate?.server(self, didReceivePacket: packet, fromHost: host, port: GCDAsyncUdpSocket.port(fromAddress: address)) } catch { - delegate?.server(self, + _delegate?.server(self, didReadData: data, with: error) } - if !isListening { - isListening = true + if !_isListening { + _isListening = true } } public func udpSocketDidClose(_ sock: GCDAsyncUdpSocket, withError error: Error?) { - isListening = false - if joinedMulticastGroups.isEmpty == false { - joinedMulticastGroups.removeAll() + _isListening = false + if _joinedMulticastGroups.isEmpty == false { + _joinedMulticastGroups.removeAll() } - delegate?.server(self, socketDidCloseWithError: error) + _delegate?.server(self, socketDidCloseWithError: error) } } diff --git a/Tests/OSCKitTests/File.swift b/Tests/OSCKitTests/File.swift new file mode 100644 index 0000000..561adbd --- /dev/null +++ b/Tests/OSCKitTests/File.swift @@ -0,0 +1,8 @@ +// +// File.swift +// +// +// Created by Sam Smallman on 21/01/2022. +// + +import Foundation diff --git a/Tests/OSCKitTests/OSCTcpStreamFramingTests.swift b/Tests/OSCKitTests/OSCTcpStreamFramingTests.swift index 37e53fd..31fd50a 100644 --- a/Tests/OSCKitTests/OSCTcpStreamFramingTests.swift +++ b/Tests/OSCKitTests/OSCTcpStreamFramingTests.swift @@ -29,11 +29,6 @@ import XCTest final class OSCTcpStreamFramingTests: XCTestCase { - static var allTests = [ - ("testSLIP", testSLIP), - ("testPLH", testPLH) - ] - func testSLIP() { let slip: Int = 0 let streamFraming = OSCTcpStreamFraming(rawValue: slip)