Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Commit 5dddb24

Browse files
committed
Add Tipeeestream event input (See: #43)
1 parent b743a58 commit 5dddb24

File tree

6 files changed

+213
-195
lines changed

6 files changed

+213
-195
lines changed

src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/TipeeeStreamConnector.scala

Lines changed: 0 additions & 128 deletions
This file was deleted.

src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/TipeeeStreamListener.scala

Lines changed: 0 additions & 18 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package org.codeoverflow.chatoverflow.requirement.service.tipeeestream
2+
3+
import java.util.Calendar
4+
5+
import io.socket.client.Socket
6+
import org.codeoverflow.chatoverflow.WithLogger
7+
import org.codeoverflow.chatoverflow.connector.Connector
8+
import org.json.JSONObject
9+
10+
/**
11+
* The tipeeestream connector connects to the socket.io service to work with incoming events.
12+
*
13+
* @param sourceIdentifier the name of the tipeeestream account
14+
*/
15+
class TipeeestreamConnector(override val sourceIdentifier: String) extends Connector(sourceIdentifier) with WithLogger {
16+
private val TIMEOUT = 10000
17+
private val tipeeeStreamListener = new TipeeestreamListener
18+
override protected var requiredCredentialKeys: List[String] = List("apiKey", "username")
19+
override protected var optionalCredentialKeys: List[String] = List()
20+
private var socket: Option[Socket] = None
21+
22+
override def start(): Boolean = {
23+
//RestAPI doesn't need stratup methods
24+
startSocket()
25+
}
26+
27+
/**
28+
* Start the socket.io socket
29+
*
30+
* @return if the socket could start successfully
31+
*/
32+
private def startSocket(): Boolean = {
33+
var connected: Option[Boolean] = None
34+
val thread = Thread.currentThread
35+
socket.get.on(Socket.EVENT_CONNECT, (_: Any) => {
36+
logger info "Connected to TipeeStream Socket.io"
37+
socket.get.emit("join-room", AUTH_OBJECT)
38+
logger info "emitted credentials to TipeeSetream Socket.io api"
39+
socket.get.on("new-event", (objects: Array[AnyRef]) => {
40+
tipeeeStreamListener.onSocketEvent(objects)
41+
})
42+
connected = Some(true)
43+
connected.notifyAll()
44+
})
45+
socket.get.on(Socket.EVENT_CONNECT_ERROR, (e: Any) => {
46+
logger warn s"Could not connect to TipeeeStream socket:"
47+
logger warn e.asInstanceOf[Array[Object]].mkString(",")
48+
connected = Some(false)
49+
connected.notifyAll()
50+
})
51+
socket.get.on(Socket.EVENT_CONNECT_TIMEOUT, (_: Any) => {
52+
logger warn s"$sourceIdentifier socket timed out"
53+
})
54+
socket.get.on(Socket.EVENT_ERROR, (e: Any) => {
55+
logger warn s"$sourceIdentifier socket error:"
56+
e match {
57+
case array: Array[Any] => logger warn array.mkString(", ")
58+
case other => logger warn other.toString
59+
}
60+
})
61+
val start = Calendar.getInstance.getTimeInMillis
62+
while (connected.isEmpty && start + TIMEOUT > Calendar.getInstance.getTimeInMillis) connected.wait(TIMEOUT)
63+
connected.getOrElse({
64+
logger warn "Could not connect to TipeeeStream socket: Timed out!"
65+
false
66+
})
67+
}
68+
69+
private def AUTH_OBJECT: JSONObject = {
70+
val obj = new JSONObject()
71+
obj.put("room", credentials.get.getValue("apiKey").get)
72+
obj.put("username", credentials.get.getValue("username").get)
73+
obj
74+
}
75+
76+
def addSubscriptionEventListener(listener: JSONObject => Unit): Unit = tipeeeStreamListener.addSubscriptionEventListener(listener)
77+
78+
def addDonationEventListener(listener: JSONObject => Unit): Unit = tipeeeStreamListener.addDonationEventListener(listener)
79+
80+
def addFollowEventListener(listener: JSONObject => Unit): Unit = tipeeeStreamListener.addFollowEventListener(listener)
81+
82+
override def stop(): Boolean = {
83+
socket.foreach(_.close())
84+
true
85+
}
86+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package org.codeoverflow.chatoverflow.requirement.service.tipeeestream
2+
3+
4+
import org.json.JSONObject
5+
6+
import scala.collection.mutable.ListBuffer
7+
8+
class TipeeestreamListener {
9+
10+
private val subscriptionEventListeners = ListBuffer[JSONObject => Unit]()
11+
private val donationEventListeners = ListBuffer[JSONObject => Unit]()
12+
private val followEventListeners = ListBuffer[JSONObject => Unit]()
13+
14+
def addSubscriptionEventListener(listener: JSONObject => Unit): Unit = {
15+
subscriptionEventListeners += listener
16+
}
17+
18+
def addDonationEventListener(listener: JSONObject => Unit): Unit = {
19+
donationEventListeners += listener
20+
}
21+
22+
def addFollowEventListener(listener: JSONObject => Unit): Unit = {
23+
followEventListeners += listener
24+
}
25+
26+
def onSocketEvent(objects : Array[AnyRef]) : Unit = {
27+
val json: JSONObject = objects(0).asInstanceOf[JSONObject]
28+
val event: JSONObject = json.getJSONObject("event")
29+
val eventType: String = event.getString("type")
30+
eventType match {
31+
case "subscription" =>
32+
subscriptionEventListeners.foreach(_(event))
33+
case "donation" =>
34+
donationEventListeners.foreach(_(event))
35+
case "follow" =>
36+
followEventListeners.foreach(_(event))
37+
case _ =>
38+
}
39+
}
40+
}

src/main/scala/org/codeoverflow/chatoverflow/requirement/service/tipeeestream/impl/TipeeeStreamInputImpl.scala

Lines changed: 0 additions & 49 deletions
This file was deleted.

0 commit comments

Comments
 (0)