Skip to content

Commit

Permalink
syncronize Event processing (#436)
Browse files Browse the repository at this point in the history
Co-authored-by: Steve Benedick <sbenedic@adobe.com>
  • Loading branch information
nporter-adbe and sbenedicadb authored Nov 4, 2020
1 parent bcee274 commit d6edec4
Showing 1 changed file with 43 additions and 35 deletions.
78 changes: 43 additions & 35 deletions AEPCore/Sources/eventhub/EventHub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ final class EventHub {
/// Dispatches a new `Event` to the `EventHub`. This `Event` is sent to all listeners who have registered for the `EventType`and `EventSource`
/// - Parameter event: An `Event` to be dispatched to listeners
func dispatch(event: Event) {
// Set an event number for the event
eventNumberMap[event.id] = eventNumberCounter.incrementAndGet()
eventQueue.add(event)
Log.trace(label: LOG_TAG, "Dispatching Event #\(String(describing: eventNumberMap[event.id] ?? 0)) - \(event)")
eventHubQueue.async {
// Set an event number for the event
self.eventNumberMap[event.id] = self.eventNumberCounter.incrementAndGet()
self.eventQueue.add(event)
Log.trace(label: self.LOG_TAG, "Dispatching Event #\(String(describing: self.eventNumberMap[event.id] ?? 0)) - \(event)")
}
}

/// Registers a new `Extension` to the `EventHub`. This `Extension` must implement `Extension`
Expand Down Expand Up @@ -155,14 +157,16 @@ final class EventHub {
/// - data: Data for the `SharedState`
/// - event: `Event` for which the `SharedState` should be versioned
func createSharedState(extensionName: String, data: [String: Any]?, event: Event?) {
guard let (sharedState, version) = versionSharedState(extensionName: extensionName, event: event) else {
Log.warning(label: LOG_TAG, "Error creating shared state for \(extensionName)")
return
}
eventHubQueue.async {
guard let (sharedState, version) = self.versionSharedState(extensionName: extensionName, event: event) else {
Log.warning(label: self.LOG_TAG, "Error creating shared state for \(extensionName)")
return
}

sharedState.set(version: version, data: data)
dispatch(event: createSharedStateEvent(extensionName: extensionName))
Log.debug(label: LOG_TAG, "Shared state created for \(extensionName) with version \(version) and data: \n\(data as AnyObject)")
sharedState.set(version: version, data: data)
self.dispatch(event: self.createSharedStateEvent(extensionName: extensionName))
Log.debug(label: self.LOG_TAG, "Shared state created for \(extensionName) with version \(version) and data: \n\(data as AnyObject)")
}
}

/// Sets the `SharedState` for the extension to pending at `event`'s version and returns a `SharedStateResolver` which is to be invoked with data for the `SharedState` once available.
Expand All @@ -174,17 +178,19 @@ final class EventHub {
/// - event: `Event` for which the `SharedState` should be versioned
/// - Returns: A `SharedStateResolver` which is invoked to set pending the `SharedState` versioned at `event`
func createPendingSharedState(extensionName: String, event: Event?) -> SharedStateResolver {
var pendingVersion: Int?
return eventHubQueue.sync {
var pendingVersion: Int?

if let (sharedState, version) = versionSharedState(extensionName: extensionName, event: event) {
pendingVersion = version
sharedState.addPending(version: version)
Log.debug(label: LOG_TAG, "Pending shared state created for \(extensionName) with version \(version)")
}
if let (sharedState, version) = versionSharedState(extensionName: extensionName, event: event) {
pendingVersion = version
sharedState.addPending(version: version)
Log.debug(label: LOG_TAG, "Pending shared state created for \(extensionName) with version \(version)")
}

return { [weak self] data in
self?.resolvePendingSharedState(extensionName: extensionName, version: pendingVersion, data: data)
Log.debug(label: self?.LOG_TAG ?? "EventHub", "Pending shared state resolved for \(extensionName) with version \(String(describing: pendingVersion)) and data: \n\(data as AnyObject)")
return { [weak self] data in
self?.resolvePendingSharedState(extensionName: extensionName, version: pendingVersion, data: data)
Log.debug(label: self?.LOG_TAG ?? "EventHub", "Pending shared state resolved for \(extensionName) with version \(String(describing: pendingVersion)) and data: \n\(data as AnyObject)")
}
}
}

Expand All @@ -195,25 +201,27 @@ final class EventHub {
/// - barrier: If true, the `EventHub` will only return `.set` if `extensionName` has moved past `event`
/// - Returns: The `SharedState` data and status for the extension with `extensionName`
func getSharedState(extensionName: String, event: Event?, barrier: Bool = true) -> SharedStateResult? {
guard let container = registeredExtensions.first(where: { $1.sharedStateName == extensionName })?.value, let sharedState = container.sharedState else {
Log.warning(label: LOG_TAG, "Unable to retrieve shared state for \(extensionName). No such extension is registered.")
return nil
}
return eventHubQueue.sync {
guard let container = registeredExtensions.first(where: { $1.sharedStateName == extensionName })?.value, let sharedState = container.sharedState else {
Log.warning(label: LOG_TAG, "Unable to retrieve shared state for \(extensionName). No such extension is registered.")
return nil
}

var version = 0 // default to version 0 if event nil
if let event = event {
version = eventNumberMap[event.id] ?? 0
}
var version = 0 // default to version 0 if event nil
if let event = event {
version = eventNumberMap[event.id] ?? 0
}

let result = sharedState.resolve(version: version)
let result = sharedState.resolve(version: version)

let stateProviderLastVersion = eventNumberFor(event: container.lastProcessedEvent)
// shared state is still considered pending if barrier is used and the state provider has not processed past the previous event
if barrier && stateProviderLastVersion < version - 1 && result.status == .set {
return SharedStateResult(status: .pending, value: result.value)
}
let stateProviderLastVersion = eventNumberFor(event: container.lastProcessedEvent)
// shared state is still considered pending if barrier is used and the state provider has not processed past the previous event
if barrier && stateProviderLastVersion < version - 1 && result.status == .set {
return SharedStateResult(status: .pending, value: result.value)
}

return SharedStateResult(status: result.status, value: result.value)
return SharedStateResult(status: result.status, value: result.value)
}
}

/// Retrieves the `ExtensionContainer` wrapper for the given extension type
Expand Down

0 comments on commit d6edec4

Please sign in to comment.