Skip to content

Commit

Permalink
Merge pull request #658 from nporter-adbe/AMSDK-11514-2
Browse files Browse the repository at this point in the history
[AMSDK-11514] Prevent possible crash at shutdown in EventHub
  • Loading branch information
nporter-adbe authored Jun 25, 2021
2 parents e7c0826 + 269b734 commit c0781f1
Showing 1 changed file with 57 additions and 53 deletions.
110 changes: 57 additions & 53 deletions AEPCore/Sources/eventhub/EventHub.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,12 @@ final class EventHub {
registerExtension(EventHubPlaceholderExtension.self, completion: { _ in })

// Setup eventQueue handler for the main OperationOrderer
eventQueue.setHandler { (event) -> Bool in

let processedEvent = self.preprocessors.shallowCopy.reduce(event) { event, preprocessor in
preprocessor(event)
}
eventQueue.setHandler { [weak self] (event) -> Bool in
guard let processedEvent = self?.preprocessors.shallowCopy.reduce(event, {$1($0)}) else { return true }

// Handle response event listeners first
if let responseID = processedEvent.responseID {
_ = self.responseEventListeners.filterRemove { (eventListenerContainer: EventListenerContainer) -> Bool in
_ = self?.responseEventListeners.filterRemove { (eventListenerContainer: EventListenerContainer) -> Bool in
guard eventListenerContainer.triggerEventId == responseID else { return false }
eventListenerContainer.timeoutTask?.cancel()
eventListenerContainer.listener(processedEvent)
Expand All @@ -62,7 +59,7 @@ final class EventHub {
}

// Send event to each ExtensionContainer
self.registeredExtensions.shallowCopy.values.forEach {
self?.registeredExtensions.shallowCopy.values.forEach {
$0.eventOrderer.add(processedEvent)
}

Expand All @@ -72,22 +69,23 @@ final class EventHub {

/// When this API is invoked the `EventHub` will begin processing `Event`s
func start() {
eventHubQueue.async {
self.started = true
self.eventQueue.start()
self.shareEventHubSharedState() // share state of all registered extensions
Log.debug(label: self.LOG_TAG, "Event Hub successfully started")
eventHubQueue.async { [weak self] in
self?.started = true
self?.eventQueue.start()
self?.shareEventHubSharedState() // share state of all registered extensions
Log.debug(label: self?.LOG_TAG ?? "EventHub", "Event Hub successfully started")
}
}

/// Dispatches a new `Event` to the `EventHub`. This `Event` is sent to all listeners who have registered for the `EventType`and `EventSource`
/// - Parameter event: An `Event` to be dispatched to listeners
func dispatch(event: Event) {
eventHubQueue.async {
eventHubQueue.async { [weak self] in
// Set an event number for the event
self.eventNumberMap[event.id] = self.eventNumberCounter.incrementAndGet()
self.eventQueue.add(event)
Log.trace(label: self.LOG_TAG, "Dispatching Event #\(String(describing: self.eventNumberMap[event.id] ?? 0)) - \(event)")
self?.eventNumberMap[event.id] = self?.eventNumberCounter.incrementAndGet()
self?.eventQueue.add(event)
Log.trace(label: self?.LOG_TAG ?? "EventHub",
"Dispatching Event #\(String(describing: self?.eventNumberMap[event.id] ?? 0)) - \(event)")
}
}

Expand All @@ -96,23 +94,23 @@ final class EventHub {
/// - type: The type of extension to register
/// - completion: Invoked when the extension has been registered or failed to register
func registerExtension(_ type: Extension.Type, completion: @escaping (_ error: EventHubError?) -> Void) {
eventHubQueue.async {
eventHubQueue.async { [weak self] in
guard !type.typeName.isEmpty else {
Log.warning(label: self.LOG_TAG, "Extension name must not be empty.")
Log.warning(label: self?.LOG_TAG ?? "EventHub", "Extension name must not be empty.")
completion(.invalidExtensionName)
return
}
guard self.registeredExtensions[type.typeName] == nil else {
Log.warning(label: "\(self.LOG_TAG):\(#function)", "Cannot register an extension multiple times.")
guard self?.registeredExtensions[type.typeName] == nil else {
Log.warning(label: "\(self?.LOG_TAG ?? "EventHub"):\(#function)", "Cannot register an extension multiple times.")
completion(.duplicateExtensionName)
return
}

// Init the extension on a dedicated queue
let extensionQueue = DispatchQueue(label: "com.adobe.eventhub.extension.\(type.typeName)")
let extensionContainer = ExtensionContainer(type, extensionQueue, completion: completion)
self.registeredExtensions[type.typeName] = extensionContainer
Log.debug(label: self.LOG_TAG, "\(type.typeName) successfully registered.")
self?.registeredExtensions[type.typeName] = extensionContainer
Log.debug(label: self?.LOG_TAG ?? "EventHub", "\(type.typeName) successfully registered.")
}
}

Expand All @@ -121,16 +119,16 @@ final class EventHub {
/// - type: The extension to be unregistered
/// - completion: A closure invoked when the extension has been unregistered
func unregisterExtension(_ type: Extension.Type, completion: @escaping (_ error: EventHubError?) -> Void) {
eventHubQueue.async {
guard self.registeredExtensions[type.typeName] != nil else {
Log.error(label: self.LOG_TAG, "Cannot unregister an extension that is not registered.")
eventHubQueue.async { [weak self] in
guard self?.registeredExtensions[type.typeName] != nil else {
Log.error(label: self?.LOG_TAG ?? "EventHub", "Cannot unregister an extension that is not registered.")
completion(.extensionNotRegistered)
return
}

let extensionContainer = self.registeredExtensions.removeValue(forKey: type.typeName) // remove the corresponding extension container
let extensionContainer = self?.registeredExtensions.removeValue(forKey: type.typeName) // remove the corresponding extension container
extensionContainer?.exten?.onUnregistered() // invoke the onUnregistered delegate function
self.shareEventHubSharedState()
self?.shareEventHubSharedState()
completion(nil)
}
}
Expand All @@ -142,10 +140,11 @@ final class EventHub {
/// - listener: An `EventResponseListener` which will be invoked whenever the `EventHub` receives the response `Event` for `triggerEvent`
func registerResponseListener(triggerEvent: Event, timeout: TimeInterval, listener: @escaping EventResponseListener) {
var responseListenerContainer: EventListenerContainer? // initialized here so we can use in timeout block
responseListenerContainer = EventListenerContainer(listener: listener, triggerEventId: triggerEvent.id, timeout: DispatchWorkItem {
responseListenerContainer = EventListenerContainer(listener: listener, triggerEventId: triggerEvent.id, timeout: DispatchWorkItem { [weak self] in
listener(nil)
_ = self.responseEventListeners.filterRemove { $0 == responseListenerContainer }
_ = self?.responseEventListeners.filterRemove { $0 == responseListenerContainer }
})

DispatchQueue.global().asyncAfter(deadline: DispatchTime.now() + timeout, execute: responseListenerContainer!.timeoutTask!)
responseEventListeners.append(responseListenerContainer!)
}
Expand All @@ -156,12 +155,13 @@ final class EventHub {
/// - source: A `String` indicating the event source the current listener is listening for
/// - listener: An `EventResponseListener` which will be invoked whenever the `EventHub` receives a event with matched type and source
func registerEventListener(type: String, source: String, listener: @escaping EventListener) {
eventHubQueue.async {
eventHubQueue.async { [weak self] in
// use the event hub placeholder extension to hold all the listeners registered from the public API
guard let eventHubExtension = self.registeredExtensions.first(where: { $1.sharedStateName.caseInsensitiveCompare(EventHubConstants.NAME) == .orderedSame })?.value else {
Log.warning(label: self.LOG_TAG, "Error registering event listener")
guard let eventHubExtension = self?.registeredExtensions.first(where: { $1.sharedStateName.caseInsensitiveCompare(EventHubConstants.NAME) == .orderedSame })?.value else {
Log.warning(label: self?.LOG_TAG ?? "EventHub", "Error registering event listener")
return
}

eventHubExtension.registerListener(type: type, source: source, listener: listener)
}
}
Expand All @@ -175,15 +175,17 @@ final class EventHub {
/// - data: Data for the `SharedState`
/// - event: `Event` for which the `SharedState` should be versioned
func createSharedState(extensionName: String, data: [String: Any]?, event: Event?, sharedStateType: SharedStateType = .standard) {
eventHubQueue.async {
guard let (sharedState, version) = self.versionSharedState(extensionName: extensionName, event: event, sharedStateType: sharedStateType) else {
Log.warning(label: self.LOG_TAG, "Error creating \(sharedStateType.rawValue) shared state for \(extensionName)")
eventHubQueue.async { [weak self] in
guard let (sharedState, version) = self?.versionSharedState(extensionName: extensionName, event: event, sharedStateType: sharedStateType) else {
Log.warning(label: self?.LOG_TAG ?? "EventHub", "Error creating \(sharedStateType.rawValue) shared state for \(extensionName)")
return
}

sharedState.set(version: version, data: data)
self.dispatch(event: self.createSharedStateEvent(extensionName: extensionName, sharedStatetype: sharedStateType))
Log.debug(label: self.LOG_TAG, "\(sharedStateType.rawValue.capitalized) shared state created for \(extensionName) with version \(version) and data: \n\(PrettyDictionary.prettify(data))")
if let toBeDispatched = self?.createSharedStateEvent(extensionName: extensionName, sharedStatetype: sharedStateType) {
self?.dispatch(event: toBeDispatched)
Log.debug(label: self?.LOG_TAG ?? "EventHub", "\(sharedStateType.rawValue.capitalized) shared state created for \(extensionName) with version \(version) and data: \n\(PrettyDictionary.prettify(data))")
}
}
}

Expand All @@ -197,13 +199,13 @@ final class EventHub {
/// - sharedStateType: The type of shared state to be read from, if not provided defaults to `.standard`
/// - Returns: A `SharedStateResolver` which is invoked to set pending the `SharedState` versioned at `event`
func createPendingSharedState(extensionName: String, event: Event?, sharedStateType: SharedStateType = .standard) -> SharedStateResolver {
return eventHubQueue.sync {
return eventHubQueue.sync { [weak self] in
var pendingVersion: Int?

if let (sharedState, version) = versionSharedState(extensionName: extensionName, event: event, sharedStateType: sharedStateType) {
if let (sharedState, version) = self?.versionSharedState(extensionName: extensionName, event: event, sharedStateType: sharedStateType) {
pendingVersion = version
sharedState.addPending(version: version)
Log.debug(label: LOG_TAG, "Pending \(sharedStateType.rawValue) shared state created for \(extensionName) with version \(version)")
Log.debug(label: self?.LOG_TAG ?? "EventHub", "Pending \(sharedStateType.rawValue) shared state created for \(extensionName) with version \(version)")
}

return { [weak self] data in
Expand All @@ -221,21 +223,21 @@ final class EventHub {
/// - sharedStateType: The type of shared state to be read from, if not provided defaults to `.standard`
/// - Returns: The `SharedState` data and status for the extension with `extensionName`
func getSharedState(extensionName: String, event: Event?, barrier: Bool = true, sharedStateType: SharedStateType = .standard) -> SharedStateResult? {
return eventHubQueue.sync {
guard let container = registeredExtensions.first(where: { $1.sharedStateName.caseInsensitiveCompare(extensionName) == .orderedSame })?.value, let sharedState = container.sharedState(for: sharedStateType) else {
Log.debug(label: LOG_TAG, "Unable to retrieve \(sharedStateType.rawValue) shared state for \(extensionName). No such extension is registered.")
return eventHubQueue.sync { [weak self] in
guard let container = self?.registeredExtensions.first(where: { $1.sharedStateName.caseInsensitiveCompare(extensionName) == .orderedSame })?.value, let sharedState = container.sharedState(for: sharedStateType) else {
Log.warning(label: LOG_TAG, "Unable to retrieve \(sharedStateType.rawValue) shared state for \(extensionName). No such extension is registered.")
return nil
}

var version = Int.max // default to version max if event nil
if let event = event {
// default to latest version if event is non-nil but not yet versioned
version = eventNumberMap[event.id] ?? Int.max
version = self?.eventNumberMap[event.id] ?? Int.max
}

let result = sharedState.resolve(version: version)

let stateProviderLastVersion = eventNumberFor(event: container.lastProcessedEvent)
let stateProviderLastVersion = self?.eventNumberFor(event: container.lastProcessedEvent) ?? 0
// shared state is still considered pending if barrier is used and the state provider has not processed past the previous event
let hasProcessedEvent = event == nil ? true : stateProviderLastVersion > version - 1
if barrier && !hasProcessedEvent && result.status == .set {
Expand All @@ -250,8 +252,8 @@ final class EventHub {
/// - Parameter type: The `Extension` class to find the `ExtensionContainer` for
/// - Returns: The `ExtensionContainer` instance if the `Extension` type was found, nil otherwise
func getExtensionContainer(_ type: Extension.Type) -> ExtensionContainer? {
return eventHubQueue.sync {
return registeredExtensions[type.typeName]
return eventHubQueue.sync { [weak self] in
return self?.registeredExtensions[type.typeName]
}
}

Expand Down Expand Up @@ -298,15 +300,17 @@ final class EventHub {
/// shut down the event hub, wait for the event queue to stop and unregister all the extensions
func shutdown() {
eventQueue.waitToStop()
eventHubQueue.sync {
for ext in registeredExtensions.shallowCopy.values {
ext.unregisterExtension()
ext.shutdown()
eventHubQueue.sync { [weak self] in
if let extensions = self?.registeredExtensions.shallowCopy.values {
for ext in extensions {
ext.unregisterExtension()
ext.shutdown()
}
}
}
eventHubQueue.sync {
eventHubQueue.sync { [weak self] in
// just wait
registeredExtensions = ThreadSafeDictionary<String, ExtensionContainer>(identifier: "com.adobe.eventHub.registeredExtensions.queue")
self?.registeredExtensions = ThreadSafeDictionary<String, ExtensionContainer>(identifier: "com.adobe.eventHub.registeredExtensions.queue")
}
}

Expand Down

0 comments on commit c0781f1

Please sign in to comment.