Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Commit 0d87f09

Browse files
committed
Merge branch 'develop' of https://github.com/codeoverflow-org/chatoverflow into develop
2 parents c1507d1 + e543485 commit 0d87f09

File tree

14 files changed

+240
-54
lines changed

14 files changed

+240
-54
lines changed

src/main/scala/ScalatraBootstrap.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import javax.servlet.ServletContext
22
import org.codeoverflow.chatoverflow.ui.web.rest.config.ConfigController
33
import org.codeoverflow.chatoverflow.ui.web.rest.connector.ConnectorController
4+
import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventsController, EventsDispatcher}
45
import org.codeoverflow.chatoverflow.ui.web.rest.plugin.PluginInstanceController
56
import org.codeoverflow.chatoverflow.ui.web.rest.types.TypeController
67
import org.codeoverflow.chatoverflow.ui.web.{CodeOverflowSwagger, OpenAPIServlet}
@@ -21,6 +22,9 @@ class ScalatraBootstrap extends LifeCycle {
2122
context.initParameters("org.scalatra.cors.allowedMethods") = "*"
2223

2324
// Add all servlets and controller
25+
val eventsController = new EventsController()
26+
EventsDispatcher.init(eventsController)
27+
context.mount(eventsController, "/events/*", "events")
2428
context.mount(new TypeController(), "/types/*", "types")
2529
context.mount(new ConfigController(), "/config/*", "config")
2630
context.mount(new PluginInstanceController(), "/instances/*", "instances")

src/main/scala/org/codeoverflow/chatoverflow/connector/actor/JsonActor.scala

Lines changed: 0 additions & 35 deletions
This file was deleted.

src/main/scala/org/codeoverflow/chatoverflow/framework/manager/PluginManagerImpl.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import java.util
44

55
import org.codeoverflow.chatoverflow.WithLogger
66
import org.codeoverflow.chatoverflow.api.plugin.{PluginLogMessage, PluginManager}
7+
import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventMessage, EventsDispatcher}
78

89
import scala.collection.JavaConverters._
910
import scala.collection.mutable.ListBuffer
@@ -23,11 +24,18 @@ class PluginManagerImpl(pluginInstanceName: String, logOutputOnConsole: Boolean)
2324
* @param message the message to show
2425
*/
2526
override def log(message: String): Unit = {
26-
logMessages += new PluginLogMessage(message)
27+
val logMessage = new PluginLogMessage(message)
28+
logMessages += logMessage
2729

2830
if (logOutputOnConsole) {
2931
logger info s"[$pluginInstanceName] $message"
3032
}
33+
34+
EventsDispatcher.broadcast("instance", EventMessage("log", Map(
35+
("name", pluginInstanceName),
36+
("message", message),
37+
("timestamp", logMessage.getTimestamp.toString)
38+
)))
3139
}
3240

3341
/**

src/main/scala/org/codeoverflow/chatoverflow/instance/PluginInstance.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import org.codeoverflow.chatoverflow.api.plugin.{Plugin, PluginManager}
88
import org.codeoverflow.chatoverflow.framework.PluginCompatibilityState.PluginCompatibilityState
99
import org.codeoverflow.chatoverflow.framework.manager.{PluginManagerImpl, PluginManagerStub}
1010
import org.codeoverflow.chatoverflow.framework.{PluginCompatibilityState, PluginType}
11+
import org.codeoverflow.chatoverflow.ui.web.rest.events.{EventMessage, EventsDispatcher}
1112

1213
/**
1314
* A plugin instance holds all the general information of the plugin type and specific information of
@@ -149,6 +150,8 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W
149150
logger info s"Starting plugin '$instanceName' in new thread!"
150151
try {
151152
instanceThread = new Thread(() => {
153+
EventsDispatcher.broadcast("instance", EventMessage("start", Map(("name", instanceName))))
154+
152155
try {
153156

154157
// Execute plugin setup
@@ -191,6 +194,7 @@ class PluginInstance(val instanceName: String, pluginType: PluginType) extends W
191194
requirement.asInstanceOf[Requirement[Output]].get().shutdown()
192195
})
193196

197+
EventsDispatcher.broadcast("instance", EventMessage("stop", Map(("name", instanceName))))
194198
}
195199
})
196200
instanceThread.start()

src/main/scala/org/codeoverflow/chatoverflow/registry/TypeRegistry.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import org.codeoverflow.chatoverflow.WithLogger
44
import org.codeoverflow.chatoverflow.connector.Connector
55
import org.reflections.Reflections
66
import org.reflections.scanners.{SubTypesScanner, TypeAnnotationsScanner}
7-
import org.reflections.util.{ClasspathHelper, ConfigurationBuilder}
7+
import org.reflections.util.{ClasspathHelper, ConfigurationBuilder, FilterBuilder}
88

99
import scala.collection.mutable
1010
import scala.collection.mutable.ListBuffer
@@ -55,6 +55,8 @@ class TypeRegistry(requirementPackage: String) extends WithLogger {
5555
// Use reflection magic to get all impl-annotated classes
5656
val reflections: Reflections = new Reflections(new ConfigurationBuilder()
5757
.setUrls(ClasspathHelper.forPackage(requirementPackage))
58+
.filterInputsBy(new FilterBuilder()
59+
.includePackage(requirementPackage))
5860
.setScanners(new SubTypesScanner(), new TypeAnnotationsScanner()))
5961
val classes = reflections.getTypesAnnotatedWith(classOf[Impl])
6062

src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/TipeeestreamConnector.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ class TipeeestreamConnector(override val sourceIdentifier: String) extends Conne
3535
val thread = Thread.currentThread
3636
socket = Some(IO.socket(SOCKET_URL).connect())
3737
socket.get.on(Socket.EVENT_CONNECT, (_: Any) => {
38-
logger info "Connected to TipeeStream Socket.io"
38+
logger info "Connected to TipeeeStream Socket.io"
3939
socket.get.emit("join-room", AUTH_OBJECT)
40-
logger info "emitted credentials to TipeeSetream Socket.io api"
40+
logger info "emitted credentials to TipeeeStream Socket.io api"
4141
socket.get.on("new-event", (objects: Array[AnyRef]) => {
4242
tipeeeStreamListener.onSocketEvent(objects)
4343
})
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ import java.util.Currency
77
import org.codeoverflow.chatoverflow.WithLogger
88
import org.codeoverflow.chatoverflow.api.io.dto.User
99
import org.codeoverflow.chatoverflow.api.io.dto.stat.stream.tipeeestream.{TipeeestreamDonation, TipeeestreamFollow, TipeeestreamProvider, TipeeestreamSubscription}
10-
import org.codeoverflow.chatoverflow.api.io.event.stream.tipeeestream.{TipeeestramFollowEvent, TipeeestreamDonationEvent, TipeeestreamEvent, TipeeestreamSubscriptionEvent}
10+
import org.codeoverflow.chatoverflow.api.io.event.stream.tipeeestream.{TipeeestreamFollowEvent, TipeeestreamDonationEvent, TipeeestreamEvent, TipeeestreamSubscriptionEvent}
1111
import org.codeoverflow.chatoverflow.api.io.input.event.TipeeestreamEventInput
1212
import org.codeoverflow.chatoverflow.registry.Impl
1313
import org.codeoverflow.chatoverflow.requirement.impl.EventInputImpl
1414
import org.codeoverflow.chatoverflow.requirement.service.tipeeestream.TipeeestreamConnector
1515
import org.json.{JSONException, JSONObject}
1616

1717
@Impl(impl = classOf[TipeeestreamEventInput], connector = classOf[TipeeestreamConnector])
18-
class TipeestreamEventInputImpl extends EventInputImpl[TipeeestreamEvent, TipeeestreamConnector] with TipeeestreamEventInput with WithLogger {
18+
class TipeeestreamEventInputImpl extends EventInputImpl[TipeeestreamEvent, TipeeestreamConnector] with TipeeestreamEventInput with WithLogger {
1919
private val DATE_FORMATTER = new DateTimeFormatterBuilder()
2020
.parseCaseInsensitive().append(DateTimeFormatter.ISO_LOCAL_DATE_TIME).appendOffset("+HHMM", "Z").toFormatter
2121

@@ -72,7 +72,7 @@ class TipeestreamEventInputImpl extends EventInputImpl[TipeeestreamEvent, Tipeee
7272
val time = OffsetDateTime.parse(event.getString("created_at"), DATE_FORMATTER)
7373
val provider = TipeeestreamProvider.parse(event.getString("origin"))
7474
val follow = new TipeeestreamFollow(user, time, provider)
75-
call(new TipeeestramFollowEvent(follow))
75+
call(new TipeeestreamFollowEvent(follow))
7676
} catch {
7777
case e: JSONException =>
7878
logger warn "Error while parsing follow json:"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package org.codeoverflow.chatoverflow.requirement.service.twitch.chat
2+
3+
import org.pircbotx.hooks.events.{ConnectAttemptFailedEvent, ConnectEvent, NoticeEvent}
4+
import org.pircbotx.hooks.{Event, ListenerAdapter}
5+
6+
/**
7+
* Handles connection events for the TwitchChatConnector.
8+
* Calls the callback function once the bot connected and reports connection errors.
9+
* @param fn the callback which will be called once suitable event has been received.
10+
* The first param informs whether the connection could be established successfully
11+
* and the second param includes a error description if something has gone wrong.
12+
*/
13+
class TwitchChatConnectListener(fn: (Boolean, String) => Unit) extends ListenerAdapter {
14+
override def onEvent(event: Event): Unit = {
15+
event match {
16+
case _: ConnectEvent => fn(true, "")
17+
case e: ConnectAttemptFailedEvent => fn(false, "couldn't connect to irc chat server")
18+
case e: NoticeEvent =>
19+
if (e.getNotice.contains("authentication failed")) {
20+
fn(false, "authentication failed")
21+
}
22+
case _ =>
23+
}
24+
}
25+
}

src/main/scala/org/codeoverflow/chatoverflow/requirement/service/twitch/chat/TwitchChatConnector.scala

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ import scala.collection.mutable.ListBuffer
1515
*/
1616
class TwitchChatConnector(override val sourceIdentifier: String) extends Connector(sourceIdentifier) with WithLogger {
1717
private val twitchChatListener = new TwitchChatListener
18+
private val connectionListener = new TwitchChatConnectListener(onConnect)
1819
private val oauthKey = "oauth"
1920
override protected var requiredCredentialKeys: List[String] = List(oauthKey)
2021
override protected var optionalCredentialKeys: List[String] = List()
2122
private var bot: PircBotX = _
23+
private var status: Option[(Boolean, String)] = None
2224
private val channels = ListBuffer[String]()
2325

2426
def addMessageEventListener(listener: MessageEvent => Unit): Unit = {
@@ -63,6 +65,7 @@ class TwitchChatConnector(override val sourceIdentifier: String) extends Connect
6365
.setName(credentials.get.credentialsIdentifier)
6466
.setServerPassword(password.getOrElse(""))
6567
.addListener(twitchChatListener)
68+
.addListener(connectionListener)
6669
.buildConfiguration()
6770
} else {
6871
logger error "No credentials set!"
@@ -71,33 +74,47 @@ class TwitchChatConnector(override val sourceIdentifier: String) extends Connect
7174

7275
}
7376

77+
/**
78+
* Gets called by the TwitchChatConnectListener when the bot has connected.
79+
* Saves the passed information into the status variable.
80+
*/
81+
private def onConnect(success: Boolean, msg: String): Unit = {
82+
status.synchronized {
83+
// tell the thread which starts the connector that the status has been reported
84+
status.notify()
85+
status = Some((success, msg))
86+
}
87+
}
88+
7489
/**
7590
* Starts the connector, e.g. creates a connection with its platform.
7691
*/
7792
override def start(): Boolean = {
7893
bot = new PircBotX(getConfig)
7994
startBot()
80-
true
8195
}
8296

83-
private def startBot(): Unit = {
84-
85-
var errorCount = 0
86-
97+
private def startBot(): Boolean = {
8798
new Thread(() => {
8899
bot.startBot()
89100
}).start()
90101

91-
while (bot.getState != PircBotX.State.CONNECTED && errorCount < 30) {
92-
logger info "Waiting while the bot is connecting..."
93-
Thread.sleep(100)
94-
errorCount += 1
102+
logger info "Waiting while the bot is connecting and logging in..."
103+
status.synchronized {
104+
status.wait(10000)
105+
}
106+
107+
if (status.isEmpty) {
108+
logger error "Bot couldn't connect within timeout of 10 seconds."
109+
return false
95110
}
96111

97-
if (errorCount >= 30) {
98-
logger error "Fatal. Unable to start bot."
112+
val (success, msg) = status.get
113+
if (!success) {
114+
logger error s"Bot couldn't connect. Reason: $msg."
99115
}
100116

117+
success
101118
}
102119

103120
/**
@@ -106,6 +123,8 @@ class TwitchChatConnector(override val sourceIdentifier: String) extends Connect
106123
override def stop(): Boolean = {
107124
bot.sendIRC().quitServer()
108125
bot.close()
126+
status = None
127+
channels.clear()
109128
true
110129
}
111130
}

src/main/scala/org/codeoverflow/chatoverflow/requirement/service/twitch/chat/impl/TwitchChatInputImpl.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ class TwitchChatInputImpl extends EventInputImpl[TwitchEvent, chat.TwitchChatCon
4545
val moderator = event.getV3Tags.get("mod") == "1"
4646
val broadcaster = event.getV3Tags.get("badges").contains("broadcaster/1")
4747
val turbo = event.getV3Tags.get("badges").contains("turbo/1")
48-
val author = new TwitchChatMessageAuthor(event.getUser.getNick, color, broadcaster, moderator, subscriber, turbo)
48+
val vip = event.getV3Tags.get("badges").contains("vip/1")
49+
val author = new TwitchChatMessageAuthor(event.getUser.getNick, color, broadcaster, moderator, subscriber, turbo, vip)
4950
val time = OffsetDateTime.ofInstant(Instant.ofEpochMilli(event.getTimestamp), ZoneOffset.UTC)
5051
val channel = new TextChannel(event.getChannelSource)
5152
val emoticons = new java.util.ArrayList[ChatEmoticon]()
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package org.codeoverflow.chatoverflow.ui.web.rest.events
2+
3+
case class EventMessage[T](action: String, data: T)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package org.codeoverflow.chatoverflow.ui.web.rest.events
2+
3+
import java.io.PrintWriter
4+
import java.util.concurrent.ConcurrentHashMap
5+
6+
import javax.servlet.AsyncContext
7+
import javax.servlet.http.HttpServletRequest
8+
import org.codeoverflow.chatoverflow.ui.web.JsonServlet
9+
import org.scalatra.servlet.ScalatraAsyncSupport
10+
import org.scalatra.{BadRequest, Unauthorized}
11+
import org.scalatra.swagger.Swagger
12+
13+
class EventsController(implicit val swagger: Swagger) extends JsonServlet with ScalatraAsyncSupport with EventsControllerDefinition {
14+
private val connectionWriters = new ConcurrentHashMap[AsyncContext, PrintWriter]()
15+
16+
def broadcast(messageType: String, message: String = null): Unit = {
17+
connectionWriters.forEach((_, writer) => {
18+
try {
19+
sendMessage(writer, messageType, message)
20+
} catch {
21+
//probably lost or closed connection, remove from the list of connected clients
22+
case _: Throwable => connectionWriters.remove(writer)
23+
}
24+
})
25+
}
26+
27+
def closeConnections(): Unit = {
28+
connectionWriters.forEach((_, writer) => {
29+
try {
30+
sendMessage(writer, "close", null)
31+
writer.close()
32+
} finally {
33+
connectionWriters.remove(writer)
34+
}
35+
})
36+
}
37+
38+
private def sendMessage(writer: PrintWriter, messageType: String, message: String): Unit = {
39+
/*
40+
Every message has the following format and ends with two line feeds (\n):
41+
event: [name of event]
42+
data: [first line]
43+
data: [second line]
44+
...
45+
46+
See also: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Examples
47+
*/
48+
49+
var msg = "event: " + messageType.replace("\n", "") + "\n"
50+
if (message != null)
51+
msg += "data: " + message.replace("\n", "\ndata: ") + "\n\n"
52+
writer.write(msg)
53+
writer.flush()
54+
}
55+
56+
get("/", operation(getEvents)) {
57+
val accept = request.getHeader("Accept")
58+
if (accept == null || !accept.replace(" ", "").split(",").contains("text/event-stream")) {
59+
status = 406
60+
} else {
61+
authParamRequired {
62+
contentType = "text/event-stream"
63+
64+
val asyncContext = request.startAsync()
65+
asyncContext.setTimeout(0)
66+
67+
val writer = asyncContext.getResponse.getWriter
68+
connectionWriters.put(asyncContext, writer)
69+
}
70+
}
71+
}
72+
73+
private def authParamRequired(func: => Any)(implicit request: HttpServletRequest): Any = {
74+
val authKeyKey = "authKey"
75+
76+
if (!request.parameters.contains(authKeyKey) || request.getParameter(authKeyKey).isEmpty) {
77+
BadRequest()
78+
} else if (request.getParameter(authKeyKey) != chatOverflow.credentialsService.generateAuthKey()) {
79+
Unauthorized()
80+
} else {
81+
func
82+
}
83+
}
84+
}

0 commit comments

Comments
 (0)