Skip to content

Commit

Permalink
Merge pull request #141 from fangyincheng/develop
Browse files Browse the repository at this point in the history
add task pool for getty
  • Loading branch information
AlexStocks authored Jul 28, 2019
2 parents 8ea2e60 + 8dd9f77 commit 688a156
Show file tree
Hide file tree
Showing 20 changed files with 52 additions and 22 deletions.
7 changes: 6 additions & 1 deletion config/testdata/consumer_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,19 @@ protocol_conf:
session_timeout: "20s"
pool_size: 64
pool_ttl: 600
# gr_pool_size is recommended to be set to [cpu core number] * 100
gr_pool_size: 1200
# queue_len is recommended to be set to 64 or 128
queue_len: 64
# queue_number is recommended to be set to gr_pool_size / 20
queue_number: 60
getty_session_param:
compress_encoding: false
tcp_no_delay: true
tcp_keep_alive: true
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
1 change: 0 additions & 1 deletion config/testdata/consumer_config_with_configcenter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
7 changes: 6 additions & 1 deletion config/testdata/provider_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,19 @@ protocol_conf:
dubbo:
session_number: 700
session_timeout: "20s"
# gr_pool_size is recommended to be set to [cpu core number] * 10
gr_pool_size: 120
# queue_len is recommended to be set to 64 or 128
queue_len: 64
# queue_number is recommended to be set to gr_pool_size / 20
queue_number: 6
getty_session_param:
compress_encoding: false
tcp_no_delay: true
tcp_keep_alive: true
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
1 change: 0 additions & 1 deletion examples/dubbo/go-client/profiles/dev/client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
1 change: 0 additions & 1 deletion examples/dubbo/go-client/profiles/release/client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
1 change: 0 additions & 1 deletion examples/dubbo/go-client/profiles/test/client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
1 change: 0 additions & 1 deletion examples/dubbo/go-server/profiles/dev/server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
1 change: 0 additions & 1 deletion examples/dubbo/go-server/profiles/release/server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
1 change: 0 additions & 1 deletion examples/dubbo/go-server/profiles/test/server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ protocol_conf:
keep_alive_period: "120s"
tcp_r_buf_size: 262144
tcp_w_buf_size: 65536
pkg_rq_size: 1024
pkg_wq_size: 512
tcp_read_timeout: "1s"
tcp_write_timeout: "5s"
Expand Down
13 changes: 12 additions & 1 deletion protocol/dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

import (
"github.com/dubbogo/getty"
"github.com/dubbogo/gost/sync"
"github.com/dubbogo/hessian2"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
Expand All @@ -45,7 +46,8 @@ var (
errClientClosed = perrors.New("client closed")
errClientReadTimeout = perrors.New("client read timeout")

clientConf *ClientConfig
clientConf *ClientConfig
clientGrpool *gxsync.TaskPool
)

func init() {
Expand Down Expand Up @@ -78,6 +80,7 @@ func init() {
}

clientConf = conf
setClientGrpool()
}

func SetClientConf(c ClientConfig) {
Expand All @@ -87,12 +90,20 @@ func SetClientConf(c ClientConfig) {
logger.Warnf("[ClientConfig CheckValidity] error: %v", err)
return
}
setClientGrpool()
}

func GetClientConf() ClientConfig {
return *clientConf
}

func setClientGrpool() {
if clientConf.GrPoolSize > 1 {
clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber))
}
}

type Options struct {
// connect timeout
ConnectTimeout time.Duration
Expand Down
2 changes: 0 additions & 2 deletions protocol/dubbo/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
PkgRQSize: 1024,
PkgWQSize: 512,
TcpReadTimeout: "4s",
TcpWriteTimeout: "5s",
Expand All @@ -193,7 +192,6 @@ func InitTest(t *testing.T) (protocol.Protocol, common.URL) {
KeepAlivePeriod: "120s",
TcpRBufSize: 262144,
TcpWBufSize: 65536,
PkgRQSize: 1024,
PkgWQSize: 512,
TcpReadTimeout: "1s",
TcpWriteTimeout: "5s",
Expand Down
11 changes: 10 additions & 1 deletion protocol/dubbo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type (
keepAlivePeriod time.Duration
TcpRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TcpWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
PkgRQSize int `default:"1024" yaml:"pkg_rq_size" json:"pkg_rq_size,omitempty"`
PkgWQSize int `default:"1024" yaml:"pkg_wq_size" json:"pkg_wq_size,omitempty"`
TcpReadTimeout string `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
tcpReadTimeout time.Duration
Expand All @@ -53,6 +52,11 @@ type (
sessionTimeout time.Duration
SessionNumber int `default:"1000" yaml:"session_number" json:"session_number,omitempty"`

// grpool
GrPoolSize int `default:"0" yaml:"gr_pool_size" json:"gr_pool_size,omitempty"`
QueueLen int `default:"0" yaml:"queue_len" json:"queue_len,omitempty"`
QueueNumber int `default:"0" yaml:"queue_number" json:"queue_number,omitempty"`

// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
Expand All @@ -76,6 +80,11 @@ type (
PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"`
PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"`

// grpool
GrPoolSize int `default:"0" yaml:"gr_pool_size" json:"gr_pool_size,omitempty"`
QueueLen int `default:"0" yaml:"queue_len" json:"queue_len,omitempty"`
QueueNumber int `default:"0" yaml:"queue_number" json:"queue_number,omitempty"`

// session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}
Expand Down
3 changes: 2 additions & 1 deletion protocol/dubbo/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,15 @@ func (c *gettyRPCClient) newSession(session getty.Session) error {
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(NewRpcClientPackageHandler(c.pool.rpcClient))
session.SetEventListener(NewRpcClientHandler(c))
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("client new session:%s\n", session.Stat())

session.SetTaskPool(clientGrpool)

return nil
}

Expand Down
18 changes: 16 additions & 2 deletions protocol/dubbo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

import (
"github.com/dubbogo/getty"
"github.com/dubbogo/gost/sync"
"gopkg.in/yaml.v2"
)

Expand All @@ -33,7 +34,10 @@ import (
"github.com/apache/dubbo-go/config"
)

var srvConf *ServerConfig
var (
srvConf *ServerConfig
srvGrpool *gxsync.TaskPool
)

func init() {

Expand Down Expand Up @@ -64,6 +68,7 @@ func init() {
}

srvConf = conf
SetServerGrpool()
}

func SetServerConfig(s ServerConfig) {
Expand All @@ -73,12 +78,20 @@ func SetServerConfig(s ServerConfig) {
logger.Warnf("[ServerConfig CheckValidity] error: %v", err)
return
}
SetServerGrpool()
}

func GetServerConfig() ServerConfig {
return *srvConf
}

func SetServerGrpool() {
if srvConf.GrPoolSize > 1 {
srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(srvConf.QueueLen),
gxsync.WithTaskPoolTaskQueueNumber(srvConf.QueueNumber))
}
}

type Server struct {
conf ServerConfig
tcpServer getty.Server
Expand Down Expand Up @@ -123,14 +136,15 @@ func (s *Server) newSession(session getty.Session) error {
session.SetMaxMsgLen(conf.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(rpcServerPkgHandler)
session.SetEventListener(s.rpcHandler)
session.SetRQLen(conf.GettySessionParam.PkgRQSize)
session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout)
logger.Debugf("app accepts new session:%s\n", session.Stat())

session.SetTaskPool(srvGrpool)

return nil
}

Expand Down

0 comments on commit 688a156

Please sign in to comment.