Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Commit e543485

Browse files
authored
Merge pull request #78 from cesmec/feature/realtime-gui-updates
Added realtime updates
2 parents 702aef5 + fa77b69 commit e543485

File tree

7 files changed

+175
-1
lines changed

7 files changed

+175
-1
lines changed

src/main/scala/ScalatraBootstrap.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import javax.servlet.ServletContext
22
import org.codeoverflow.chatoverflow.ui.web.rest.config.ConfigController
33
import org.codeoverflow.chatoverflow.ui.web.rest.connector.ConnectorController
4+
import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventsController, EventsDispatcher}
45
import org.codeoverflow.chatoverflow.ui.web.rest.plugin.PluginInstanceController
56
import org.codeoverflow.chatoverflow.ui.web.rest.types.TypeController
67
import org.codeoverflow.chatoverflow.ui.web.{CodeOverflowSwagger, OpenAPIServlet}
@@ -21,6 +22,9 @@ class ScalatraBootstrap extends LifeCycle {
2122
context.initParameters("org.scalatra.cors.allowedMethods") = "*"
2223

2324
// Add all servlets and controller
25+
val eventsController = new EventsController()
26+
EventsDispatcher.init(eventsController)
27+
context.mount(eventsController, "/events/*", "events")
2428
context.mount(new TypeController(), "/types/*", "types")
2529
context.mount(new ConfigController(), "/config/*", "config")
2630
context.mount(new PluginInstanceController(), "/instances/*", "instances")

src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import java.util
44

55
import org.codeoverflow.chatoverflow.WithLogger
66
import org.codeoverflow.chatoverflow.api.plugin.{PluginLogMessage, PluginManager}
7+
import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventMessage, EventsDispatcher}
78

89
import scala.collection.JavaConverters._
910
import scala.collection.mutable.ListBuffer
@@ -23,11 +24,18 @@ class PluginManagerImpl(pluginInstanceName: String, logOutputOnConsole: Boolean)
2324
* @param message the message to show
2425
*/
2526
override def log(message: String): Unit = {
26-
logMessages += new PluginLogMessage(message)
27+
val logMessage = new PluginLogMessage(message)
28+
logMessages += logMessage
2729

2830
if (logOutputOnConsole) {
2931
logger info s"[$pluginInstanceName] $message"
3032
}
33+
34+
EventsDispatcher.broadcast("instance", EventMessage("log", Map(
35+
("name", pluginInstanceName),
36+
("message", message),
37+
("timestamp", logMessage.getTimestamp.toString)
38+
)))
3139
}
3240

3341
/**

src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import org.codeoverflow.chatoverflow.api.plugin.{Plugin, PluginManager}
88
import org.codeoverflow.chatoverflow.framework.PluginCompatibilityState.PluginCompatibilityState
99
import org.codeoverflow.chatoverflow.framework.manager.{PluginManagerImpl, PluginManagerStub}
1010
import org.codeoverflow.chatoverflow.framework.{PluginCompatibilityState, PluginType}
11+
import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventMessage, EventsDispatcher}
1112

1213
/**
1314
* A plugin instance holds all the general information of the plugin type and specific information of
@@ -149,6 +150,8 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W
149150
logger info s"Starting plugin '$instanceName' in new thread!"
150151
try {
151152
instanceThread = new Thread(() => {
153+
EventsDispatcher.broadcast("instance", EventMessage("start", Map(("name", instanceName))))
154+
152155
try {
153156

154157
// Execute plugin setup
@@ -191,6 +194,7 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W
191194
requirement.asInstanceOf[Requirement[Output]].get().shutdown()
192195
})
193196

197+
EventsDispatcher.broadcast("instance", EventMessage("stop", Map(("name", instanceName))))
194198
}
195199
})
196200
instanceThread.start()
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package org.codeoverflow.chatoverflow.ui.web.rest.events
2+
3+
case class EventMessage[T](action: String, data: T)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package org.codeoverflow.chatoverflow.ui.web.rest.events
2+
3+
import java.io.PrintWriter
4+
import java.util.concurrent.ConcurrentHashMap
5+
6+
import javax.servlet.AsyncContext
7+
import javax.servlet.http.HttpServletRequest
8+
import org.codeoverflow.chatoverflow.ui.web.JsonServlet
9+
import org.scalatra.servlet.ScalatraAsyncSupport
10+
import org.scalatra.{BadRequest, Unauthorized}
11+
import org.scalatra.swagger.Swagger
12+
13+
class EventsController(implicit val swagger: Swagger) extends JsonServlet with ScalatraAsyncSupport with EventsControllerDefinition {
14+
private val connectionWriters = new ConcurrentHashMap[AsyncContext, PrintWriter]()
15+
16+
def broadcast(messageType: String, message: String = null): Unit = {
17+
connectionWriters.forEach((_, writer) => {
18+
try {
19+
sendMessage(writer, messageType, message)
20+
} catch {
21+
//probably lost or closed connection, remove from the list of connected clients
22+
case _: Throwable => connectionWriters.remove(writer)
23+
}
24+
})
25+
}
26+
27+
def closeConnections(): Unit = {
28+
connectionWriters.forEach((_, writer) => {
29+
try {
30+
sendMessage(writer, "close", null)
31+
writer.close()
32+
} finally {
33+
connectionWriters.remove(writer)
34+
}
35+
})
36+
}
37+
38+
private def sendMessage(writer: PrintWriter, messageType: String, message: String): Unit = {
39+
/*
40+
Every message has the following format and ends with two line feeds (\n):
41+
event: [name of event]
42+
data: [first line]
43+
data: [second line]
44+
...
45+
46+
See also: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Examples
47+
*/
48+
49+
var msg = "event: " + messageType.replace("\n", "") + "\n"
50+
if (message != null)
51+
msg += "data: " + message.replace("\n", "\ndata: ") + "\n\n"
52+
writer.write(msg)
53+
writer.flush()
54+
}
55+
56+
get("/", operation(getEvents)) {
57+
val accept = request.getHeader("Accept")
58+
if (accept == null || !accept.replace(" ", "").split(",").contains("text/event-stream")) {
59+
status = 406
60+
} else {
61+
authParamRequired {
62+
contentType = "text/event-stream"
63+
64+
val asyncContext = request.startAsync()
65+
asyncContext.setTimeout(0)
66+
67+
val writer = asyncContext.getResponse.getWriter
68+
connectionWriters.put(asyncContext, writer)
69+
}
70+
}
71+
}
72+
73+
private def authParamRequired(func: => Any)(implicit request: HttpServletRequest): Any = {
74+
val authKeyKey = "authKey"
75+
76+
if (!request.parameters.contains(authKeyKey) || request.getParameter(authKeyKey).isEmpty) {
77+
BadRequest()
78+
} else if (request.getParameter(authKeyKey) != chatOverflow.credentialsService.generateAuthKey()) {
79+
Unauthorized()
80+
} else {
81+
func
82+
}
83+
}
84+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.codeoverflow.chatoverflow.ui.web.rest.events
2+
3+
import org.codeoverflow.chatoverflow.ui.web.rest.{AuthSupport, TagSupport}
4+
import org.scalatra.swagger.{SwaggerSupport, SwaggerSupportSyntax}
5+
import org.scalatra.swagger.SwaggerSupportSyntax.OperationBuilder
6+
7+
trait EventsControllerDefinition extends SwaggerSupport with TagSupport with AuthSupport {
8+
val getEvents: OperationBuilder =
9+
(apiOperation[Object]("getEvents")
10+
summary "Get events"
11+
description "Get events from chatoverflow using the EventSource API. Requires an Accept-header with the value text/event-stream."
12+
parameter authQuery
13+
tags controllerTag)
14+
15+
protected def authQuery: SwaggerSupportSyntax.ParameterBuilder[String] =
16+
queryParam[String]("authKey").description("connection auth key required")
17+
18+
override def controllerTag: String = "events"
19+
20+
override protected def applicationDescription: String = "Handles chatoverflow events."
21+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package org.codeoverflow.chatoverflow.ui.web.rest.events
2+
3+
import org.json4s.{DefaultFormats, Formats}
4+
import org.json4s.jackson.Serialization
5+
6+
/**
7+
* The EventsDispatcher is the central point for realtime communication to the clients
8+
*/
9+
object EventsDispatcher {
10+
private var controller: EventsController = _
11+
implicit val formats: Formats = DefaultFormats
12+
13+
/**
14+
* Initializes the EventsDispatcher with the registered controller
15+
* Only to be used from the bootstrap
16+
* @param eventsController registered controller that accepts the incoming connections
17+
*/
18+
def init(eventsController: EventsController): Unit = {
19+
if (controller == null)
20+
controller = eventsController
21+
}
22+
23+
/**
24+
* Sends the message to all connected clients
25+
* @param messageType type of the message / event
26+
* @param message the message to send
27+
* @tparam T type of the message data
28+
*/
29+
def broadcast[T](messageType: String, message: EventMessage[T]): Unit = {
30+
broadcast(messageType, Serialization.write(message))
31+
}
32+
33+
/**
34+
* Sends the message to all connected clients
35+
* @param messageType type of the message / event
36+
* @param message the message to send
37+
*/
38+
def broadcast(messageType: String, message: String = null): Unit = {
39+
if (controller != null)
40+
controller.broadcast(messageType, message)
41+
}
42+
43+
/**
44+
* Sends a close message to all connected clients and closes the connections
45+
*/
46+
def close(): Unit = {
47+
if (controller != null)
48+
controller.closeConnections()
49+
}
50+
}

0 commit comments

Comments
 (0)