Skip to content
This repository has been archived by the owner on Dec 5, 2023. It is now read-only.

Commit

Permalink
feat(stomp): 支持subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
Nagico committed Jun 4, 2023
1 parent 8f762d4 commit 098ed54
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 23 deletions.
79 changes: 58 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ _✨ Author: [Nagico](https://github.com/Nagico/) ✨_

### 文档版本

| 版本 | 日期 | 作者 | 备注 |
|:------:|:---------:|:------:|:---------------:|
| v0.0.1 | 2023-5-10 | Nagico | 初稿 |
| v0.0.2 | 2023-6-4 | Nagico | 增加DEBUG CONNECT |
| 版本 | 日期 | 作者 | 备注 |
|:------:|:---------:|:------:|:------------------:|
| v0.0.1 | 2023-5-10 | Nagico | 初稿 |
| v0.0.2 | 2023-6-4 | Nagico | 增加DEBUG CONNECT |
| v0.1.0 | 2023-6-4 | Nagico | 支持SUBSCRIBE,取消自动订阅 |

### 规定

Expand Down Expand Up @@ -146,24 +147,9 @@ NULL
- heat-beat: 协商后采用的心跳包模式 服务端强制禁用心跳包
- user: 当前登录的用户 id

此时服务器会自动订阅 `{user_id}` 的目的地址,用于用户接受私人消息
~~此时服务器会自动订阅 `{user_id}` 的目的地址,用于用户接受私人消息~~

若用户存在未 ACK 的消息,此时服务器根据时间顺序依次发送消息原文

**接收若干 MESSAGE 帧**

```http request
MESSAGE
id:344465d0-8914-41cf-bea5-449b6ced5184
content-type:application/json
content-length:27
{"content":"1234","type":1}NULL
```

- id: message id

注意,确认消息接收后需要对消息进行 ACK 返回
v0.1.0 版本后,服务器不再自动订阅,而是在用户主动订阅后,根据时间顺序依次返回 `未 ACK` 的 MESSAGE 帧

#### 失败

Expand Down Expand Up @@ -224,6 +210,57 @@ user:2
NULL
```

### 订阅地址

v0.1.0版本后,用户需要手动${user_id}订阅地址,用于接收私人消息

**发送SUBSCRIBE帧**

```http request
SUBSCRIBE
id:${id}
destination:${user_id}
ack:client-individual
NULL
```

- id: 订阅id,~~用于取消订阅~~,暂无用处
- user_id: 用户id,需与已登陆id一致
- ack: 客户端确认模式,此处必须为 `client-individual`,即每个MESSAGE帧都需要独立的ACK

#### 成功

若用户存在未 ACK 的消息,此时服务器根据时间顺序依次发送消息原文

**接收若干 MESSAGE 帧**

```http request
MESSAGE
id:344465d0-8914-41cf-bea5-449b6ced5184
content-type:application/json
content-length:27
{"content":"1234","type":1}NULL
```

- id: message id

注意,确认消息接收后需要对消息进行 ACK 返回

#### 失败

- 订阅地址错误

**接收 ERROR 帧**

```http request
ERROR
message:permission_error
user 1 cannot subscribe to 2NULL
```

### 发送消息

**发送 SEND 帧**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class StompChatHandler : SimpleChannelInboundHandler<StompFrame>() {
when (inboundFrame.command()) { // 根据不同的命令进行处理
StompCommand.STOMP, StompCommand.CONNECT -> stompService.onConnect(ctx, inboundFrame) // 连接
StompCommand.SEND -> stompService.onSend(ctx, inboundFrame) // 发送
StompCommand.SUBSCRIBE -> stompService.onSubscribe(ctx, inboundFrame) // 订阅
StompCommand.DISCONNECT -> stompService.onDisconnect(ctx, inboundFrame) // 断开连接
StompCommand.ACK -> stompService.onAck(ctx, inboundFrame) // 确认消息
else -> throw StompCommandError("Received unsupported command ${inboundFrame.command()}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ interface StompService {
*/
fun onSend(ctx: ChannelHandlerContext, inboundFrame: StompFrame)

/**
* 处理SUBSCRIBE命令
*
* @param ctx Context
* @param inboundFrame 请求帧
*/
fun onSubscribe(ctx: ChannelHandlerContext, inboundFrame: StompFrame)

/**
* 传递消息 将消息发送给本服务端用户
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import cn.nagico.teamup.backend.stomp.exception.frame.StompHeadMissing
import cn.nagico.teamup.backend.service.StompMessageService
import cn.nagico.teamup.backend.service.StompService
import cn.nagico.teamup.backend.service.UserService
import cn.nagico.teamup.backend.stomp.exception.StompException
import cn.nagico.teamup.backend.stomp.exception.StompPermissionError
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.stomp.DefaultStompFrame
Expand Down Expand Up @@ -79,8 +81,6 @@ class StompServiceImpl: StompService {


ctx.channel().attr(USER).set(user)
subscribe(user, ctx)

// 发送连接成功帧
val connectedFrame = DefaultStompFrame(StompCommand.CONNECTED)
connectedFrame.headers()
Expand All @@ -90,6 +90,17 @@ class StompServiceImpl: StompService {
.set(StompHeaders.HEART_BEAT, "0,0")
.set("user", user.toString())
ctx.writeAndFlush(connectedFrame)
}
override fun onSubscribe(ctx: ChannelHandlerContext, inboundFrame: StompFrame) {
val user = ctx.channel().attr(USER).get()!!

// 检查destination
val destination = getHeader(inboundFrame, StompHeaders.DESTINATION).toLong()
if (destination != user) {
throw StompPermissionError("user $user cannot subscribe to $destination")
}

subscribe(user, ctx)

// 发送未读消息
for (message in stompMessageService.fetchUnreceivedMessages(user)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ enum class StompExceptionType(val content: String) {
INVALID_VERSION("invalid_version"),
UNSUPPORTED_COMMAND("unsupported_command"),
AUTHENTICATION_ERROR("authentication_error"),
PERMISSION_ERROR("permission_error"),
UNKNOWN_ERROR("unknown_error"),
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package cn.nagico.teamup.backend.stomp.exception

open class StompPermissionError (
message: String? = null,
cause: Throwable? = null
): StompException(StompExceptionType.PERMISSION_ERROR, message, cause)

0 comments on commit 098ed54

Please sign in to comment.