Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions Sources/EventSource/EventParser.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
import Foundation

public protocol EventParser: Sendable {
func parse(_ data: Data) -> [EVEvent]
mutating func parse(_ data: Data) -> [EVEvent]
}

/// ``ServerEventParser`` is used to parse text data into ``ServerEvent``.
public struct ServerEventParser: EventParser {
let mode: EventSource.Mode
struct ServerEventParser: EventParser {
private let mode: EventSource.Mode
private var buffer = Data()

init(mode: EventSource.Mode = .default) {
self.mode = mode
Expand All @@ -23,20 +24,37 @@ public struct ServerEventParser: EventParser {
static let lf: UInt8 = 0x0A
static let colon: UInt8 = 0x3A

public func parse(_ data: Data) -> [EVEvent] {
// Split message with double newline
let rawMessages: [Data]
if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *) {
rawMessages = data.split(separator: [Self.lf, Self.lf])
} else {
rawMessages = data.split(by: [Self.lf, Self.lf])
}
mutating func parse(_ data: Data) -> [EVEvent] {
let (separatedMessages, remainingData) = splitBuffer(for: buffer + data)
buffer = remainingData
return parseBuffer(for: separatedMessages)
}

private func parseBuffer(for rawMessages: [Data]) -> [EVEvent] {
// Parse data to ServerMessage model
let messages: [ServerEvent] = rawMessages.compactMap { ServerEvent.parse(from: $0, mode: mode) }

return messages
}

private func splitBuffer(for data: Data) -> (completeData: [Data], remainingData: Data) {
let separator: [UInt8] = [Self.lf, Self.lf]
var rawMessages = [Data]()

// If event separator is not present do not parse any unfinished messages
guard let lastSeparator = data.lastRange(of: separator) else { return ([], data) }

let bufferRange = data.startIndex..<lastSeparator.upperBound
let remainingRange = lastSeparator.upperBound..<data.endIndex

if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, visionOS 1.0, *) {
rawMessages = data[bufferRange].split(separator: separator)
} else {
rawMessages = data[bufferRange].split(by: separator)
}

return (rawMessages, data[remainingRange])
}
}

fileprivate extension Data {
Expand Down
8 changes: 4 additions & 4 deletions Sources/EventSource/EventSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public extension EventSource {
/// A URLRequest of the events source.
public let urlRequest: URLRequest

private let eventParser: EventParser
private var eventParser: EventParser

private let timeoutInterval: TimeInterval

Expand Down Expand Up @@ -136,7 +136,7 @@ public extension EventSource {

return AsyncStream { continuation in
let sessionDelegate = SessionDelegate()
let sesstionDelegateTask = Task { [weak self] in
let sessionDelegateTask = Task { [weak self] in
for await event in sessionDelegate.eventStream {
guard let self else { return }

Expand All @@ -153,12 +153,12 @@ public extension EventSource {

#if compiler(>=6.0)
continuation.onTermination = { @Sendable [weak self] _ in
sesstionDelegateTask.cancel()
sessionDelegateTask.cancel()
Task { await self?.close() }
}
#else
continuation.onTermination = { @Sendable _ in
sesstionDelegateTask.cancel()
sessionDelegateTask.cancel()
Task { [weak self] in
await self?.close()
}
Expand Down
62 changes: 53 additions & 9 deletions Tests/EventSourceTests/EventParserTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import Testing
@testable import EventSource

struct EventParserTests {
@Test func messagesParsing() throws {
let parser = ServerEventParser()
@Test func messagesParsing() async throws {
var parser = ServerEventParser()

let text = """
data: test 1
Expand All @@ -26,6 +26,8 @@ struct EventParserTests {
id: 5
event: ping
data: test 5


"""
let data = Data(text.utf8)

Expand Down Expand Up @@ -56,9 +58,9 @@ struct EventParserTests {
#expect(messages[4].event! == "ping")
#expect(messages[4].data! == "test 5")
}
@Test func emptyData() {
let parser = ServerEventParser()

@Test func emptyData() async {
var parser = ServerEventParser()

let text = """

Expand All @@ -71,8 +73,8 @@ struct EventParserTests {
#expect(messages.isEmpty)
}

@Test func otherMessageFormats() {
let parser = ServerEventParser()
@Test func otherMessageFormats() async {
var parser = ServerEventParser()

let text = """
data : test 1
Expand All @@ -91,6 +93,8 @@ struct EventParserTests {

message 6
message 6-1


"""
let data = Data(text.utf8)

Expand Down Expand Up @@ -124,15 +128,16 @@ struct EventParserTests {
#expect(messages[5].other!["message 6-1"] == "")
}

@Test func dataOnlyMode() throws {
let parser = ServerEventParser(mode: .dataOnly)
@Test func dataOnlyMode() async throws {
var parser = ServerEventParser(mode: .dataOnly)
let jsonDecoder = JSONDecoder()

let text = """
data: {"id":"abcd-1","type":"message","content":"\\ntest\\n"}

data: {"id":"abcd-2","type":"message","content":"\\n\\n"}


"""
let data = Data(text.utf8)

Expand All @@ -147,6 +152,45 @@ struct EventParserTests {
#expect(message1.content == "\ntest\n")
#expect(message2.content == "\n\n")
}

@Test func parseNotCompleteMessage() async throws {
var parser = ServerEventParser()

let text = """
data: test 1
"""
let data = Data(text.utf8)

let messages = parser.parse(data)

#expect(messages.count == 0)
}

@Test func parseSeparatedMessage() async throws {
var parser = ServerEventParser()

let textPart1 = """
event: add

"""
let dataPart1 = Data(textPart1.utf8)
let textPart2 = """
data: test 1


"""
let dataPart2 = Data(textPart2.utf8)

let _ = parser.parse(dataPart1)
let messages = parser.parse(dataPart2)

#expect(messages.count == 1)

#expect(messages[0].event != nil)
#expect(messages[0].data != nil)
#expect(messages[0].event! == "add")
#expect(messages[0].data! == "test 1")
}
}

fileprivate extension EventParserTests {
Expand Down