Skip to content

Commit dd9b92e

Browse files
committed
add tcp frame parser
1 parent fb1ce43 commit dd9b92e

File tree

2 files changed

+152
-0
lines changed

2 files changed

+152
-0
lines changed

build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ kotlin {
5151
dependencies {
5252
implementation("io.vertx:vertx-core:$vertxVersion")
5353
implementation("io.vertx:vertx-codegen:$vertxVersion")
54+
implementation("io.vertx:vertx-tcp-eventbus-bridge:$vertxVersion")
5455
implementation(files(".ext/vertx-service-discovery-4.0.3-SNAPSHOT.jar"))
5556
implementation(files(".ext/vertx-service-proxy-4.0.2.jar"))
5657
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion")
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package spp.protocol.extend
2+
3+
import spp.protocol.error.*
4+
import spp.protocol.error.LiveInstrumentException.ErrorType
5+
import io.vertx.core.AsyncResult
6+
import io.vertx.core.Handler
7+
import io.vertx.core.Vertx
8+
import io.vertx.core.eventbus.DeliveryOptions
9+
import io.vertx.core.eventbus.ReplyException
10+
import io.vertx.core.eventbus.ReplyFailure
11+
import io.vertx.core.json.JsonObject
12+
import io.vertx.core.net.NetSocket
13+
import io.vertx.ext.bridge.BridgeEventType
14+
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper
15+
16+
class TCPServiceFrameParser(val vertx: Vertx, val socket: NetSocket) : Handler<AsyncResult<JsonObject>> {
17+
18+
// companion object {
19+
// private val log = LoggerFactory.getLogger(TCPServiceFrameParser::class.java)
20+
// }
21+
22+
override fun handle(event: AsyncResult<JsonObject>) {
23+
if (event.failed()) {
24+
//log.error("Failed to receive frame", event.cause())
25+
return
26+
}
27+
val frame = event.result()
28+
//log.trace("Received frame: {}", frame)
29+
30+
//todo: revisit this || after fixing below todo
31+
if ("message" == frame.getString("type") || "send" == frame.getString("type")) {
32+
if (frame.getString("replyAddress") != null) {
33+
val deliveryOptions = DeliveryOptions()
34+
frame.getJsonObject("headers").fieldNames().forEach {
35+
deliveryOptions.addHeader(it, frame.getJsonObject("headers").getString(it))
36+
}
37+
vertx.eventBus().request<Any>(
38+
frame.getString("address"),
39+
frame.getJsonObject("body"),
40+
deliveryOptions
41+
).onComplete {
42+
if (it.succeeded()) {
43+
FrameHelper.sendFrame(
44+
BridgeEventType.SEND.name.toLowerCase(),
45+
frame.getString("replyAddress"),
46+
JsonObject.mapFrom(it.result().body()),
47+
socket
48+
)
49+
} else {
50+
FrameHelper.sendFrame(
51+
BridgeEventType.SEND.name.toLowerCase(),
52+
frame.getString("replyAddress"),
53+
JsonObject.mapFrom(it.cause()),
54+
socket
55+
)
56+
}
57+
}
58+
} else {
59+
val body = frame.getJsonObject("body")
60+
if (body.fieldNames().size == 1 && body.containsKey("value")) {
61+
//todo: understand why can't just re-send body like below
62+
vertx.eventBus()
63+
.send("local." + frame.getString("address"), body.getValue("value"))
64+
} else {
65+
vertx.eventBus()
66+
.send("local." + frame.getString("address"), body)
67+
}
68+
}
69+
} else if ("err" == frame.getString("type")) {
70+
//directly thrown event bus exceptions
71+
handleErrorFrame(frame)
72+
} else {
73+
throw UnsupportedOperationException(frame.toString())
74+
}
75+
}
76+
77+
private fun handleErrorFrame(frame: JsonObject) {
78+
if (frame.getString("message")?.startsWith("EventBusException:") == true) {
79+
val rawFailure = frame.getString("rawFailure")
80+
val failureCode = frame.getInteger("failureCode")
81+
val error = ReplyException(
82+
ReplyFailure.RECIPIENT_FAILURE,
83+
failureCode,
84+
rawFailure
85+
)
86+
87+
val causeMessage = frame.getString("message")!!
88+
val exceptionType = causeMessage.substringAfter("EventBusException:")
89+
.substringBefore("[")
90+
val exceptionParams = causeMessage.substringAfter("[").substringBefore("]")
91+
val exceptionMessage = causeMessage.substringAfter("]: ").trimEnd()
92+
if (exceptionType == "LiveInstrumentException") {
93+
error.initCause(
94+
LiveInstrumentException(ErrorType.valueOf(exceptionParams), exceptionMessage)
95+
)
96+
} else {
97+
TODO()
98+
}
99+
vertx.eventBus()
100+
.send("local." + frame.getString("address"), error)
101+
} else {
102+
//i think these are service exceptions
103+
val error = ReplyException(
104+
ReplyFailure.RECIPIENT_FAILURE,
105+
frame.getInteger("failureCode"),
106+
frame.getString("rawFailure")
107+
)
108+
var debugInfo = JsonObject(frame.getString("rawFailure")).getJsonObject("debugInfo")
109+
if (frame.getString("message").contains("JWT")) {
110+
error.initCause(JWTVerificationException(frame.getString("message")))
111+
} else if (debugInfo == null) {
112+
debugInfo = JsonObject().put(
113+
"causeMessage", JsonObject(frame.getString("message")).getString("message")
114+
)
115+
}
116+
117+
if (debugInfo.getString("causeName") == MissingRemoteException::class.java.name) {
118+
error.initCause(MissingRemoteException(debugInfo.getString("causeMessage")))
119+
} else {
120+
val causeMessage = debugInfo.getString("causeMessage")
121+
if (causeMessage?.startsWith("EventBusException:") == true) {
122+
val exceptionType = causeMessage.substringAfter("EventBusException:")
123+
.substringBefore("[")
124+
val exceptionParams = causeMessage.substringAfter("[").substringBefore("]")
125+
val exceptionMessage = causeMessage.substringAfter("]: ").trimEnd()
126+
when (exceptionType) {
127+
LiveInstrumentException::class.simpleName -> {
128+
error.initCause(
129+
LiveInstrumentException(
130+
ErrorType.valueOf(exceptionParams),
131+
exceptionMessage
132+
)
133+
)
134+
}
135+
InstrumentAccessDenied::class.simpleName -> {
136+
error.initCause(InstrumentAccessDenied(exceptionParams))
137+
}
138+
AccessDenied::class.simpleName -> {
139+
error.initCause(AccessDenied(exceptionParams))
140+
}
141+
else -> TODO()
142+
}
143+
} else {
144+
TODO()
145+
}
146+
}
147+
vertx.eventBus()
148+
.send("local." + frame.getString("address"), error)
149+
}
150+
}
151+
}

0 commit comments

Comments
 (0)