Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Added realtime updates #78

Merged
merged 3 commits into from
Jul 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/scala/ScalatraBootstrap.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import javax.servlet.ServletContext
import org.codeoverflow.chatoverflow.ui.web.rest.config.ConfigController
import org.codeoverflow.chatoverflow.ui.web.rest.connector.ConnectorController
import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventsController, EventsDispatcher}
import org.codeoverflow.chatoverflow.ui.web.rest.plugin.PluginInstanceController
import org.codeoverflow.chatoverflow.ui.web.rest.types.TypeController
import org.codeoverflow.chatoverflow.ui.web.{CodeOverflowSwagger, OpenAPIServlet}
Expand All @@ -21,6 +22,9 @@ class ScalatraBootstrap extends LifeCycle {
context.initParameters("org.scalatra.cors.allowedMethods") = "*"

// Add all servlets and controller
val eventsController = new EventsController()
EventsDispatcher.init(eventsController)
context.mount(eventsController, "/events/*", "events")
context.mount(new TypeController(), "/types/*", "types")
context.mount(new ConfigController(), "/config/*", "config")
context.mount(new PluginInstanceController(), "/instances/*", "instances")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util

import org.codeoverflow.chatoverflow.WithLogger
import org.codeoverflow.chatoverflow.api.plugin.{PluginLogMessage, PluginManager}
import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventMessage, EventsDispatcher}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
Expand All @@ -23,11 +24,18 @@ class PluginManagerImpl(pluginInstanceName: String, logOutputOnConsole: Boolean)
* @param message the message to show
*/
override def log(message: String): Unit = {
logMessages += new PluginLogMessage(message)
val logMessage = new PluginLogMessage(message)
logMessages += logMessage

if (logOutputOnConsole) {
logger info s"[$pluginInstanceName] $message"
}

EventsDispatcher.broadcast("instance", EventMessage("log", Map(
("name", pluginInstanceName),
("message", message),
("timestamp", logMessage.getTimestamp.toString)
)))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.codeoverflow.chatoverflow.api.plugin.{Plugin, PluginManager}
import org.codeoverflow.chatoverflow.framework.PluginCompatibilityState.PluginCompatibilityState
import org.codeoverflow.chatoverflow.framework.manager.{PluginManagerImpl, PluginManagerStub}
import org.codeoverflow.chatoverflow.framework.{PluginCompatibilityState, PluginType}
import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventMessage, EventsDispatcher}

/**
* A plugin instance holds all the general information of the plugin type and specific information of
Expand Down Expand Up @@ -149,6 +150,8 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W
logger info s"Starting plugin '$instanceName' in new thread!"
try {
instanceThread = new Thread(() => {
EventsDispatcher.broadcast("instance", EventMessage("start", Map(("name", instanceName))))

try {

// Execute plugin setup
Expand Down Expand Up @@ -191,6 +194,7 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W
requirement.asInstanceOf[Requirement[Output]].get().shutdown()
})

EventsDispatcher.broadcast("instance", EventMessage("stop", Map(("name", instanceName))))
}
})
instanceThread.start()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.codeoverflow.chatoverflow.ui.web.rest.events

case class EventMessage[T](action: String, data: T)
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.codeoverflow.chatoverflow.ui.web.rest.events

import java.io.PrintWriter
import java.util.concurrent.ConcurrentHashMap

import javax.servlet.AsyncContext
import javax.servlet.http.HttpServletRequest
import org.codeoverflow.chatoverflow.ui.web.JsonServlet
import org.scalatra.servlet.ScalatraAsyncSupport
import org.scalatra.{BadRequest, Unauthorized}
import org.scalatra.swagger.Swagger

class EventsController(implicit val swagger: Swagger) extends JsonServlet with ScalatraAsyncSupport with EventsControllerDefinition {
private val connectionWriters = new ConcurrentHashMap[AsyncContext, PrintWriter]()

def broadcast(messageType: String, message: String = null): Unit = {
connectionWriters.forEach((_, writer) => {
try {
sendMessage(writer, messageType, message)
} catch {
//probably lost or closed connection, remove from the list of connected clients
case _: Throwable => connectionWriters.remove(writer)
}
})
}

def closeConnections(): Unit = {
connectionWriters.forEach((_, writer) => {
try {
sendMessage(writer, "close", null)
writer.close()
} finally {
connectionWriters.remove(writer)
}
})
}

private def sendMessage(writer: PrintWriter, messageType: String, message: String): Unit = {
/*
Every message has the following format and ends with two line feeds (\n):
event: [name of event]
data: [first line]
data: [second line]
...

See also: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Examples
*/

var msg = "event: " + messageType.replace("\n", "") + "\n"
if (message != null)
msg += "data: " + message.replace("\n", "\ndata: ") + "\n\n"
writer.write(msg)
writer.flush()
}

get("/", operation(getEvents)) {
val accept = request.getHeader("Accept")
if (accept == null || !accept.replace(" ", "").split(",").contains("text/event-stream")) {
status = 406
} else {
authParamRequired {
contentType = "text/event-stream"

val asyncContext = request.startAsync()
asyncContext.setTimeout(0)

val writer = asyncContext.getResponse.getWriter
connectionWriters.put(asyncContext, writer)
}
}
}

private def authParamRequired(func: => Any)(implicit request: HttpServletRequest): Any = {
val authKeyKey = "authKey"

if (!request.parameters.contains(authKeyKey) || request.getParameter(authKeyKey).isEmpty) {
BadRequest()
} else if (request.getParameter(authKeyKey) != chatOverflow.credentialsService.generateAuthKey()) {
Unauthorized()
} else {
func
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.codeoverflow.chatoverflow.ui.web.rest.events

import org.codeoverflow.chatoverflow.ui.web.rest.{AuthSupport, TagSupport}
import org.scalatra.swagger.{SwaggerSupport, SwaggerSupportSyntax}
import org.scalatra.swagger.SwaggerSupportSyntax.OperationBuilder

trait EventsControllerDefinition extends SwaggerSupport with TagSupport with AuthSupport {
val getEvents: OperationBuilder =
(apiOperation[Object]("getEvents")
summary "Get events"
description "Get events from chatoverflow using the EventSource API. Requires an Accept-header with the value text/event-stream."
parameter authQuery
tags controllerTag)

protected def authQuery: SwaggerSupportSyntax.ParameterBuilder[String] =
queryParam[String]("authKey").description("connection auth key required")

override def controllerTag: String = "events"

override protected def applicationDescription: String = "Handles chatoverflow events."
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.codeoverflow.chatoverflow.ui.web.rest.events

import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.Serialization

/**
* The EventsDispatcher is the central point for realtime communication to the clients
*/
object EventsDispatcher {
private var controller: EventsController = _
implicit val formats: Formats = DefaultFormats

/**
* Initializes the EventsDispatcher with the registered controller
* Only to be used from the bootstrap
* @param eventsController registered controller that accepts the incoming connections
*/
def init(eventsController: EventsController): Unit = {
if (controller == null)
controller = eventsController
}

/**
* Sends the message to all connected clients
* @param messageType type of the message / event
* @param message the message to send
* @tparam T type of the message data
*/
def broadcast[T](messageType: String, message: EventMessage[T]): Unit = {
broadcast(messageType, Serialization.write(message))
}

/**
* Sends the message to all connected clients
* @param messageType type of the message / event
* @param message the message to send
*/
def broadcast(messageType: String, message: String = null): Unit = {
if (controller != null)
controller.broadcast(messageType, message)
}

/**
* Sends a close message to all connected clients and closes the connections
*/
def close(): Unit = {
if (controller != null)
controller.closeConnections()
}
}