Skip to content

Commit

Permalink
Beta468 (#470)
Browse files Browse the repository at this point in the history
* beta447

* beta448

* beta449

* beta450

* beta451

* beta452

* beta453

* beta454

* beta455

* btea455

* beta456

* beta457

* beta458

* beta460

* beta460

* beta461

* beta462

* beta463

* beta464

* beta465

* beta467

* beta468
  • Loading branch information
Hoshinonyaruko authored Jul 21, 2024
1 parent 09c6d85 commit 07c5fa9
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
97 changes: 97 additions & 0 deletions botgo/sessions/multi/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package multi

import (
"sync"
"time"

"github.com/tencent-connect/botgo/dto"
"github.com/tencent-connect/botgo/log"
"github.com/tencent-connect/botgo/sessions/manager"
"github.com/tencent-connect/botgo/token"
"github.com/tencent-connect/botgo/websocket"
)

type ShardManager struct {
Sessions []dto.Session
SessionChans []chan dto.Session
Clients []websocket.WebSocket
APInfo *dto.WebsocketAP
Token *token.Token
Intents *dto.Intent
StartInterval time.Duration
wg sync.WaitGroup
}

func NewShardManager(apInfo *dto.WebsocketAP, token *token.Token, intents *dto.Intent) *ShardManager {
m := &ShardManager{
APInfo: apInfo,
Token: token,
Intents: intents,
Sessions: make([]dto.Session, apInfo.Shards),
Clients: make([]websocket.WebSocket, apInfo.Shards),
SessionChans: make([]chan dto.Session, apInfo.Shards),
}
for i := range m.Sessions {
m.SessionChans[i] = make(chan dto.Session, 1)
}
m.StartInterval = manager.CalcInterval(apInfo.SessionStartLimit.MaxConcurrency)
return m
}

func (sm *ShardManager) StartAllShards() {
for i := uint32(0); i < sm.APInfo.Shards; i++ {
sm.StartShard(i)
}
sm.wg.Wait()
}

func (sm *ShardManager) StartShard(shardID uint32) {
sm.wg.Add(1)
go func() {
defer sm.wg.Done()
session := dto.Session{
URL: sm.APInfo.URL,
Token: *sm.Token,
Intent: *sm.Intents,
LastSeq: 0,
Shards: dto.ShardConfig{
ShardID: shardID,
ShardCount: sm.APInfo.Shards,
},
}
sm.Sessions[shardID] = session
sm.SessionChans[shardID] <- session

for session := range sm.SessionChans[shardID] {
time.Sleep(sm.StartInterval)
sm.newConnect(session, shardID)
}
}()
}

func (sm *ShardManager) newConnect(session dto.Session, shardID uint32) {
wsClient := websocket.ClientImpl.New(session)
sm.Clients[shardID] = wsClient
if err := wsClient.Connect(); err != nil {
log.Error(err)
sm.SessionChans[shardID] <- session // Reconnect
return
}
if session.ID != "" {
err := wsClient.Resume()
if err != nil {
log.Errorf("[ws/session] Resume error: %+v", err)
return
}
} else {
err := wsClient.Identify()
if err != nil {
log.Errorf("[ws/session] Identify error: %+v", err)
return
}
}
if err := wsClient.Listening(); err != nil {
log.Errorf("[ws/session] Listening error: %+v", err)
sm.SessionChans[shardID] <- session // Reconnect
}
}
9 changes: 7 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/hoshinonyaruko/gensokyo/url"
"github.com/hoshinonyaruko/gensokyo/webui"
"github.com/hoshinonyaruko/gensokyo/wsclient"
"github.com/tencent-connect/botgo/sessions/multi"
"google.golang.org/grpc"

"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -283,8 +284,12 @@ func main() {
if conf.Settings.ShardCount == 1 {
go func() {
wsInfo.Shards = uint32(conf.Settings.ShardNum)
if err = botgo.NewSessionManager().Start(wsInfo, token, &intent); err != nil {
log.Fatalln(err)
if wsInfo.Shards == 1 {
if err = botgo.NewSessionManager().Start(wsInfo, token, &intent); err != nil {
log.Fatalln(err)
}
} else {
multi.NewShardManager(wsInfo, token, &intent).StartAllShards()
}
}()
log.Printf("不使用分片,所有信息都由当前gensokyo处理...\n")
Expand Down

0 comments on commit 07c5fa9

Please sign in to comment.