Skip to content

Commit

Permalink
p2p(ticdc): add keep alive configuration for p2p server (pingcap#8908) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 19, 2023
1 parent e94472f commit ae3d82c
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 2 deletions.
8 changes: 8 additions & 0 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ func TestParseCfg(t *testing.T) {
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
KeepAliveTimeout: config.TomlDuration(time.Second * 10),
KeepAliveTime: config.TomlDuration(time.Second * 30),
},
},
}, o.serverConfig)
Expand Down Expand Up @@ -357,6 +359,8 @@ max-recv-msg-size = 4
ServerAckInterval: config.TomlDuration(1 * time.Second),
ServerWorkerPoolSize: 16,
MaxRecvMsgSize: 4,
KeepAliveTimeout: config.TomlDuration(time.Second * 10),
KeepAliveTime: config.TomlDuration(time.Second * 30),
},
},
}, o.serverConfig)
Expand Down Expand Up @@ -501,6 +505,8 @@ cert-allowed-cn = ["dd","ee"]
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
KeepAliveTimeout: config.TomlDuration(time.Second * 10),
KeepAliveTime: config.TomlDuration(time.Second * 30),
},
},
}, o.serverConfig)
Expand Down Expand Up @@ -562,6 +568,8 @@ unknown3 = 3
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
KeepAliveTimeout: config.TomlDuration(time.Second * 10),
KeepAliveTime: config.TomlDuration(time.Second * 30),
},
}, o.serverConfig.Debug)
}
4 changes: 3 additions & 1 deletion pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ const (
"server-max-pending-message-count": 102400,
"server-ack-interval": 100000000,
"server-worker-pool-size": 4,
"max-recv-msg-size": 268435456
"max-recv-msg-size": 268435456,
"keep-alive-time": 30000000000,
"keep-alive-timeout": 10000000000
}
}
}`
Expand Down
20 changes: 20 additions & 0 deletions pkg/config/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ type MessagesConfig struct {

// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
MaxRecvMsgSize int `toml:"max-recv-msg-size" json:"max-recv-msg-size"`

// After a duration of this time if the server doesn't see any activity it
// pings the client to see if the transport is still alive.
KeepAliveTime TomlDuration `toml:"keep-alive-time" json:"keep-alive-time"`
// After having pinged for keepalive check, the server waits for a duration
// of Timeout and if no activity is seen even after that the connection is
// closed.
KeepAliveTimeout TomlDuration `toml:"keep-alive-timeout" json:"keep-alive-timeout"`
}

// read only
Expand All @@ -48,6 +56,8 @@ var defaultMessageConfig = &MessagesConfig{
ServerAckInterval: TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: defaultMaxRecvMsgSize,
KeepAliveTime: TomlDuration(time.Second * 30),
KeepAliveTimeout: TomlDuration(time.Second * 10),
}

const (
Expand Down Expand Up @@ -115,6 +125,12 @@ func (c *MessagesConfig) ValidateAndAdjust() error {
if c.ServerAckInterval == 0 {
c.ServerAckInterval = defaultMessageConfig.ServerAckInterval
}
if c.KeepAliveTime == 0 {
c.KeepAliveTime = defaultMessageConfig.KeepAliveTime
}
if c.KeepAliveTimeout == 0 {
c.KeepAliveTimeout = defaultMessageConfig.KeepAliveTimeout
}
if time.Duration(c.ServerAckInterval) > 10*time.Second {
return cerrors.ErrInvalidServerOption.GenWithStackByArgs("server-ack-interval is larger than 10s")
}
Expand Down Expand Up @@ -149,6 +165,8 @@ func (c *MessagesConfig) Clone() *MessagesConfig {
ServerAckInterval: c.ServerAckInterval,
ServerWorkerPoolSize: c.ServerWorkerPoolSize,
MaxRecvMsgSize: c.MaxRecvMsgSize,
KeepAliveTime: c.KeepAliveTime,
KeepAliveTimeout: c.KeepAliveTimeout,
}
}

Expand Down Expand Up @@ -177,5 +195,7 @@ func (c *MessagesConfig) ToMessageServerConfig() *p2p.MessageServerConfig {
WaitUnregisterHandleTimeoutThreshold: unregisterHandleTimeout,
SendRateLimitPerStream: serverSendRateLimit,
MaxRecvMsgSize: c.MaxRecvMsgSize,
KeepAliveTimeout: time.Duration(c.KeepAliveTimeout),
KeepAliveTime: time.Duration(c.KeepAliveTime),
}
}
9 changes: 9 additions & 0 deletions pkg/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ type MessageServerConfig struct {
// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
MaxRecvMsgSize int

// After a duration of this time if the server doesn't see any activity it
// pings the client to see if the transport is still alive.
KeepAliveTime time.Duration

// After having pinged for keepalive check, the server waits for a duration
// of Timeout and if no activity is seen even after that the connection is
// closed.
KeepAliveTimeout time.Duration

// The maximum time duration to wait before forcefully removing a handler.
//
// waitUnregisterHandleTimeout specifies how long to wait for
Expand Down
10 changes: 9 additions & 1 deletion pkg/p2p/server_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
gRPCPeer "google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -69,7 +70,14 @@ func NewServerWrapper(cfg *MessageServerConfig) *ServerWrapper {

// ServerOptions returns server option for creating grpc servers.
func (s *ServerWrapper) ServerOptions() []grpc.ServerOption {
return []grpc.ServerOption{grpc.MaxRecvMsgSize(s.cfg.MaxRecvMsgSize)}
keepaliveParams := keepalive.ServerParameters{
Time: s.cfg.KeepAliveTime,
Timeout: s.cfg.KeepAliveTimeout,
}
return []grpc.ServerOption{
grpc.MaxRecvMsgSize(s.cfg.MaxRecvMsgSize),
grpc.KeepaliveParams(keepaliveParams),
}
}

// SendMessage implements p2p.CDCPeerToPeerServer
Expand Down

0 comments on commit ae3d82c

Please sign in to comment.