Skip to content

Commit

Permalink
no socketid
Browse files Browse the repository at this point in the history
  • Loading branch information
bobohume committed May 2, 2021
1 parent 1c48760 commit 7539820
Show file tree
Hide file tree
Showing 37 changed files with 147 additions and 104 deletions.
2 changes: 1 addition & 1 deletion common/cluster/Cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (this *Cluster) Init(num int, info *common.ClusterInfo, Endpoints []string)

//链接断开
this.RegisterCall("DISCONNECT", func(ctx context.Context, ClusterId uint32) {
pInfo, bEx := this.m_ClusterInfoMap[info.Id()]
pInfo, bEx := this.m_ClusterInfoMap[ClusterId]
if bEx {
this.DelCluster(pInfo)
}
Expand Down
10 changes: 3 additions & 7 deletions common/cluster/ClusterServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,10 @@ func (this *ClusterServer) BindServer(pService *network.ServerSocket){
}

func (this *ClusterServer) sendPoint(head rpc.RpcHead, buff []byte){
if head.SocketId != 0{
pCluster:= this.GetCluster(head)
if pCluster != nil {
head.SocketId = pCluster.SocketId
this.m_pService.Send(head, buff)
}else{
pCluster:= this.GetCluster(head)
if pCluster != nil {
head.SocketId = pCluster.SocketId
this.m_pService.Send(head, buff)
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions rpc/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,4 +1139,12 @@ func unmarshalPB(bitstream *base.BitStream) (proto.Message, error) {
packet := reflect.New(proto.MessageType(packetName).Elem()).Interface().(proto.Message)
err := proto.Unmarshal(packetBuf, packet)
return packet, err
}

func UnmarshalPB(bitstream *base.BitStream) []byte{
bitstream.ReadInt(8)
bitstream.ReadString()
nLen := bitstream.ReadInt(32)
packetBuf := bitstream.ReadBits(nLen << 3)
return packetBuf
}
2 changes: 1 addition & 1 deletion rpc/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func getTypeString(param interface{}) string{
sType = getArrayTypeString(paramType.String(), true)
}else if paramType.Elem().Kind() == reflect.Slice{
sType = getSliceTypeString(paramType.String(), true)
}else if strings.Index(paramType.String(), "*rpc")!= -1{
}else if strings.Index(paramType.String(), "*rpc")!= -1 || strings.Index(paramType.String(), "*message")!= -1{
sType = "*rpc"
}else{
sType = "*gob"
Expand Down
7 changes: 7 additions & 0 deletions server/ServerInit.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"gonet/server/world/player"
"gonet/server/world/social"
"gonet/server/world/toprank"
"gonet/server/zone"
"gonet/server/zone/game"
data2 "gonet/server/zone/game/data"
)


Expand All @@ -29,6 +32,10 @@ func InitMgr(serverName string){
player.SIMPLEMGR.Init(1000)
social.MGR().Init(1000)
actor.MGR.InitActorHandle(world.SERVER.GetServer())
}else if serverName == "zone"{
data2.InitRepository()
game.MAPMGR.Init(1000)
actor.MGR.InitActorHandle(zone.SERVER.GetServer())
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/account/AccountMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (this* AccountMgr) Init(num int){
LoginAccount := func(pAccount *Account) {
if pAccount != nil {
SERVER.GetLog().Printf("帐号[%s]返回登录OK", accountName)
SERVER.GetClusterMgr().SendMsg(rpc.RpcHead{SocketId:id}, "A_G_Account_Login", accountId, socketId)
SERVER.GetClusterMgr().SendMsg(rpc.RpcHead{ClusterId:id}, "A_G_Account_Login", accountId, socketId)
}
}

Expand Down
11 changes: 3 additions & 8 deletions server/account/ClusterMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"github.com/golang/protobuf/proto"
"gonet/actor"
"gonet/base"
"gonet/common"
"gonet/common/cluster"
"gonet/rpc"
"gonet/server/message"
"gonet/common"
)

type (
Expand All @@ -24,7 +24,7 @@ type (
func (this *ClusterManager) Init(num int){
this.Actor.Init(num)
//注册account集群
this.InitService(&common.ClusterInfo{Type:rpc.SERVICE_ACCOUNTSERVER, Ip:UserNetIP, Port:int32(base.Int(UserNetPort))}, EtcdEndpoints)
this.InitService(&common.ClusterInfo{Type: rpc.SERVICE_ACCOUNTSERVER, Ip:UserNetIP, Port:int32(base.Int(UserNetPort))}, EtcdEndpoints)
this.RegisterClusterCall()

this.Actor.Start()
Expand All @@ -48,11 +48,6 @@ func BoardCastToWorld(funcName string, params ...interface{}){

//发送到客户端
func SendToClient(head rpc.RpcHead, packet proto.Message){
buff := message.Encode(packet)
pakcetHead := packet.(message.Packet).GetPacketHead()
head.Id = pakcetHead.Id
head.DestServerType = rpc.SERVICE_GATESERVER
rpcPacket := &rpc.RpcPacket{FuncName:message.GetMessageName(packet), ArgLen:1, RpcHead:(*rpc.RpcHead)(&head), RpcBody:buff}
data, _ := proto.Marshal(rpcPacket)
SERVER.GetClusterMgr().Send(head, base.SetTcpEnd(data))
SERVER.GetClusterMgr().SendMsg(rpc.RpcHead{DestServerType:rpc.SERVICE_GATESERVER, Id:pakcetHead.Id}, message.GetMessageName(packet), packet)
}
10 changes: 5 additions & 5 deletions server/account/EventProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (this *EventProcess) Init(num int) {
}
}
if nError != 0 {
SendToClient(rpc.RpcHead{SocketId:this.GetRpcHead(ctx).SocketId, ClusterId:socketId}, &message.A_C_RegisterResponse{
SendToClient(rpc.RpcHead{ClusterId:this.GetRpcHead(ctx).SrcClusterId, SocketId:socketId}, &message.A_C_RegisterResponse{
PacketHead: message.BuildPacketHead( accountId, 0),
Error: int32(nError),
})
Expand All @@ -86,7 +86,7 @@ func (this *EventProcess) Init(num int) {
passWd := rs.Row().String("password")
if password== passWd{
nError = base.NONE_ERROR
SERVER.GetAccountMgr().SendMsg(rpc.RpcHead{},"Account_Login", accountName, accountId, socketId, this.GetRpcHead(ctx).SocketId)
SERVER.GetAccountMgr().SendMsg(rpc.RpcHead{},"Account_Login", accountName, accountId, socketId, this.GetRpcHead(ctx).SrcClusterId)
}else{//密码错误
nError = base.PASSWORD_ERROR
}
Expand All @@ -100,7 +100,7 @@ func (this *EventProcess) Init(num int) {
}

if nError != base.NONE_ERROR {
SendToClient(rpc.RpcHead{SocketId:this.GetRpcHead(ctx).SocketId, ClusterId:socketId}, &message.A_C_LoginResponse{
SendToClient(rpc.RpcHead{ClusterId:this.GetRpcHead(ctx).SrcClusterId}, &message.A_C_LoginResponse{
PacketHead:message.BuildPacketHead( 0, 0 ),
Error:int32(nError),
AccountName:packet.AccountName,
Expand All @@ -109,11 +109,11 @@ func (this *EventProcess) Init(num int) {
})

//创建玩家
this.RegisterCall("W_A_CreatePlayer", func(ctx context.Context, accountId int64, playername string, sex int32, socketId uint32) {
this.RegisterCall("W_A_CreatePlayer", func(ctx context.Context, accountId int64, playername string, sex int32, gClusterId uint32) {
playerId := base.UUID.UUID()
_, err := this.m_db.Exec(fmt.Sprintf("insert into tbl_player (account_id, player_name, player_id) values (%d, '%s', %d)", accountId, playername, playerId))
if err == nil {
SendToWorld(this.GetRpcHead(ctx).SrcClusterId, "A_W_CreatePlayer", accountId, playerId, playername, sex, socketId)
SendToWorld(this.GetRpcHead(ctx).SrcClusterId, "A_W_CreatePlayer", accountId, playerId, playername, sex, gClusterId)
}
})

Expand Down
14 changes: 7 additions & 7 deletions server/bin/GONET_SERVER.CFG
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
[AccountDB]
DB_LANIP = 127.0.0.1;
DB_LANIP = 49.234.239.37;
DB_Name = md_account;
DB_UserId = root;
DB_Password = 123456;
DB_Password = Gonet1q2w3e4r();
DB_MaxIdleConns = 500;
DB_MaxOpenConns = 1000;

[WorldDB]
DB_LANIP = 127.0.0.1;
DB_LANIP = 49.234.239.37;
DB_Name = md_actor;
DB_UserId = root;
DB_Password = 123456;
DB_Password = Gonet1q2w3e4r();
DB_MaxIdleConns = 500;
DB_MaxOpenConns = 1000;

[Redis]
Redis_Open = true;
Redis_Host = 127.0.0.1;
Redis_Pwd = 123456;
Redis_Host = 49.234.239.37:6379;
Redis_Pwd = Gonet1q2w3e4r();

[Server]
Account_LANAddress = 127.0.0.1:31200;
Expand All @@ -32,6 +32,6 @@ Etcd_SnowFlake_Cluster = http://127.0.0.1:2379;


[127.0.0.1]
NetGate_WANAddress = 127.0.0.1:31700;
NetGate_WANAddress = 0.0.0.0:31700;


2 changes: 2 additions & 0 deletions server/bin/server.bat
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
start server.exe "world"
start server.exe "account"
start server.exe "netgate"
start server.exe "zone"

2 changes: 2 additions & 0 deletions server/bin/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ nohup ./server "account" &
sleep 1
nohup ./server "netgate" &
sleep 1
nohup ./server "zone" &
sleep 1
6 changes: 6 additions & 0 deletions server/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"gonet/common"
"gonet/rpc"
"gonet/server/message"
"strconv"
)

type (
Expand All @@ -30,6 +31,11 @@ func (this *CmdProcess) Init(num int) {
SendPacket(packet1)
})

this.RegisterCall("move", func(ctx context.Context, yaw string) {
ya, _ := strconv.ParseFloat(yaw, 32)
PACKET.Move(float32(ya), 100.0)
})

this.Actor.Start()
}

Expand Down
8 changes: 4 additions & 4 deletions server/login/LoginServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

type(
ServerMgr struct{
m_Inited bool
m_config base.Config
m_Log base.CLog
m_Inited bool
m_config base.Config
m_Log base.CLog
m_FileMonitor common.IFileMonitor
}

Expand Down Expand Up @@ -52,6 +52,6 @@ func (this *ServerMgr)Init() bool{
return &this.m_Log
}

func (this *ServerMgr) GetFileMonitor() common.IFileMonitor{
func (this *ServerMgr) GetFileMonitor() common.IFileMonitor {
return this.m_FileMonitor
}
6 changes: 6 additions & 0 deletions server/message/Ipacket.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func Encode(packet proto.Message) []byte{
return data
}

func EncodeEx(packetName string, buf []byte) []byte{
packetId := base.GetMessageCode1(packetName)
data := append(base.IntToBytes(int(packetId)), buf...)
return data
}

func Decode(buff []byte) (uint32, []byte){
packetId := uint32(base.BytesToInt(buff[0:4]))
return packetId, buff[4:]
Expand Down
4 changes: 2 additions & 2 deletions server/netgate/AccountProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"gonet/actor"
"gonet/base"
"gonet/rpc"
"gonet/common"
"gonet/rpc"
"strings"
)

Expand Down Expand Up @@ -39,7 +39,7 @@ func (this *AccountProcess) Init(num int) {
this.m_LostTimer.Start()
this.RegisterTimer(1 * 1000 * 1000 * 1000, this.Update)
this.RegisterCall("COMMON_RegisterRequest", func(ctx context.Context) {
SERVER.GetAccountCluster().SendMsg(rpc.RpcHead{ClusterId:this.m_ClusterId},"COMMON_RegisterRequest", &common.ClusterInfo{Type:rpc.SERVICE_GATESERVER, Ip:UserNetIP, Port:int32(base.Int(UserNetPort))})
SERVER.GetAccountCluster().SendMsg(rpc.RpcHead{ClusterId:this.m_ClusterId},"COMMON_RegisterRequest", &common.ClusterInfo{Type: rpc.SERVICE_GATESERVER, Ip:UserNetIP, Port:int32(base.Int(UserNetPort))})
})

this.RegisterCall("COMMON_RegisterResponse", func(ctx context.Context) {
Expand Down
6 changes: 4 additions & 2 deletions server/netgate/DispatchPacket.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ func DispatchPacket(id uint32, buff []byte) bool{
case rpc.SERVICE_WORLDSERVER:
SERVER.GetWorldCluster().Send(head, base.SetTcpEnd(buff))
default:
bitstream := base.NewBitStream(rpcPacket.RpcBody, len(rpcPacket.RpcBody))
buff := message.EncodeEx(rpcPacket.FuncName, rpc.UnmarshalPB(bitstream))
if rpcPacket.FuncName == A_C_RegisterResponse || rpcPacket.FuncName == A_C_LoginResponse {
SERVER.GetServer().Send(rpc.RpcHead{SocketId:head.ClusterId}, base.SetTcpEnd(rpcPacket.RpcBody))
SERVER.GetServer().Send(rpc.RpcHead{SocketId:head.SocketId}, base.SetTcpEnd(buff))
}else{
socketId := SERVER.GetPlayerMgr().GetSocket(head.Id)
SERVER.GetServer().Send(rpc.RpcHead{SocketId:socketId}, base.SetTcpEnd(rpcPacket.RpcBody))
SERVER.GetServer().Send(rpc.RpcHead{SocketId:socketId}, base.SetTcpEnd(buff))
}
}

Expand Down
14 changes: 7 additions & 7 deletions server/netgate/NetGateServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ func (this *ServerMgr) GetCluster () *cluster.Service {
return this.m_Cluster
}

func (this *ServerMgr) GetWorldCluster() *cluster.Cluster{
func (this *ServerMgr) GetWorldCluster() *cluster.Cluster {
return this.m_WorldCluster
}

func (this *ServerMgr) GetAccountCluster() *cluster.Cluster{
func (this *ServerMgr) GetAccountCluster() *cluster.Cluster {
return this.m_AccountCluster
}

func (this *ServerMgr) GetZoneCluster() *cluster.Cluster{
func (this *ServerMgr) GetZoneCluster() *cluster.Cluster {
return this.m_ZoneCluster
}

Expand Down Expand Up @@ -115,23 +115,23 @@ func (this *ServerMgr)Init() bool{
this.m_pService.BindPacketFunc(packet.PacketFunc)
this.m_pService.Start()*/
//注册到集群服务器
this.m_Cluster = cluster.NewService( &common.ClusterInfo{Type:rpc.SERVICE_GATESERVER, Ip:UserNetIP, Port:int32(base.Int(UserNetPort))}, EtcdEndpoints)
this.m_Cluster = cluster.NewService( &common.ClusterInfo{Type: rpc.SERVICE_GATESERVER, Ip:UserNetIP, Port:int32(base.Int(UserNetPort))}, EtcdEndpoints)

//世界服务器集群
this.m_WorldCluster = new(cluster.Cluster)
this.m_WorldCluster.Init(1000, &common.ClusterInfo{Type:rpc.SERVICE_WORLDSERVER}, EtcdEndpoints)
this.m_WorldCluster.Init(1000, &common.ClusterInfo{Type: rpc.SERVICE_WORLDSERVER}, EtcdEndpoints)
this.m_WorldCluster.BindPacket(&WorldProcess{})
this.m_WorldCluster.BindPacketFunc(DispatchPacket)

//账号服务器集群
this.m_AccountCluster = new(cluster.Cluster)
this.m_AccountCluster.Init(1000, &common.ClusterInfo{Type:rpc.SERVICE_ACCOUNTSERVER}, EtcdEndpoints)
this.m_AccountCluster.Init(1000, &common.ClusterInfo{Type: rpc.SERVICE_ACCOUNTSERVER}, EtcdEndpoints)
this.m_AccountCluster.BindPacket(&AccountProcess{})
this.m_AccountCluster.BindPacketFunc(DispatchPacket)

//战斗服务器集群
this.m_ZoneCluster = new(cluster.Cluster)
this.m_ZoneCluster.Init(1000, &common.ClusterInfo{Type:rpc.SERVICE_ZONESERVER}, EtcdEndpoints)
this.m_ZoneCluster.Init(1000, &common.ClusterInfo{Type: rpc.SERVICE_ZONESERVER}, EtcdEndpoints)
this.m_ZoneCluster.BindPacket(&ZoneProcess{})
this.m_ZoneCluster.BindPacketFunc(DispatchPacket)

Expand Down
2 changes: 1 addition & 1 deletion server/netgate/PlayerMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (this *PlayerManager) Init(num int){
if len(accountMap) != 0{
for i, v := range accountMap {
SERVER.GetWorldCluster().SendMsg(v, "G_W_Relogin", &message.RpcHead{Id:i})
SERVER.GetWorldCluster().SendMsg(v, "G_W_Relogin", &rpc.RpcHead{Id:i})
}
}
})*/
Expand Down
2 changes: 1 addition & 1 deletion server/netgate/UserProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (this *UserPrcoess) PacketFunc(socketid uint32, buff []byte) bool{
}

packetName := message.GetMessageName(packet)
head := rpc.RpcHead{Id:packetHead.Id}
head := rpc.RpcHead{Id:packetHead.Id, SrcClusterId:SERVER.GetCluster().Id()}
if packetName == C_A_LoginRequest{
head.ClusterId = socketid
}else if packetName == C_A_RegisterRequest {
Expand Down
4 changes: 2 additions & 2 deletions server/netgate/WorldProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"gonet/actor"
"gonet/base"
"gonet/rpc"
"gonet/common"
"gonet/rpc"
)

type (
Expand Down Expand Up @@ -34,7 +34,7 @@ func (this *WorldProcess) Init(num int) {
this.m_ClusterId = 0
this.RegisterTimer(1 * 1000 * 1000 * 1000, this.Update)
this.RegisterCall("COMMON_RegisterRequest", func(ctx context.Context) {
SERVER.GetWorldCluster().SendMsg(rpc.RpcHead{ClusterId:this.m_ClusterId},"COMMON_RegisterRequest", &common.ClusterInfo{Type:rpc.SERVICE_GATESERVER, Ip:UserNetIP, Port:int32(base.Int(UserNetPort))})
SERVER.GetWorldCluster().SendMsg(rpc.RpcHead{ClusterId:this.m_ClusterId},"COMMON_RegisterRequest", &common.ClusterInfo{Type: rpc.SERVICE_GATESERVER, Ip:UserNetIP, Port:int32(base.Int(UserNetPort))})
})

this.RegisterCall("COMMON_RegisterResponse", func(ctx context.Context) {
Expand Down
4 changes: 2 additions & 2 deletions server/netgate/ZoneProcess.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"gonet/actor"
"gonet/base"
"gonet/rpc"
"gonet/common"
"gonet/rpc"
)

type (
Expand Down Expand Up @@ -34,7 +34,7 @@ func (this *ZoneProcess) Init(num int) {
this.m_ClusterId = 0
this.RegisterTimer(1 * 1000 * 1000 * 1000, this.Update)
this.RegisterCall("COMMON_RegisterRequest", func(ctx context.Context) {
SERVER.GetZoneCluster().SendMsg(rpc.RpcHead{ClusterId:this.m_ClusterId},"COMMON_RegisterRequest", &common.ClusterInfo{Type:rpc.SERVICE_GATESERVER, Ip:UserNetIP, Port:int32(base.Int(UserNetPort))})
SERVER.GetZoneCluster().SendMsg(rpc.RpcHead{ClusterId:this.m_ClusterId},"COMMON_RegisterRequest", &common.ClusterInfo{Type: rpc.SERVICE_GATESERVER, Ip:UserNetIP, Port:int32(base.Int(UserNetPort))})
})

this.RegisterCall("COMMON_RegisterResponse", func(ctx context.Context) {
Expand Down
Loading

0 comments on commit 7539820

Please sign in to comment.