Skip to content

Commit 677c2df

Browse files
committed
enable shrink the socket pool size
we found the mgo will allocate the pool size during burst traffic but won't close the sockets any more until restart the client or server. And the mongo document defines two related query options - [minPoolSize](https://docs.mongodb.com/manual/reference/connection-string/#urioption.minPoolSize) - [maxIdleTimeMS](https://docs.mongodb.com/manual/reference/connection-string/#urioption.maxIdleTimeMS) By implementing these two options, it could shrink the pool to minPoolSize after the sockets introduced by burst traffic timeout. The idea comes from https://github.com/JodeZer/mgo , he investigated this issue and provide the initial commits. I found there are still some issue in sockets maintenance, and had a PR against his repo JodeZer#1 . This commit include JodeZer's commits and my fix, and I simplified the data structure. What's in this commit could be described as this figure: +------------------------+ | Session | <-------+ Add options here +------------------------+ +------------------------+ | Cluster | <-------+ Add options here +------------------------+ +------------------------+ | Server | <-------+*Add options here | | *add timestamp when recycle a socket +---+ | +-----------+ | +---+ *periodically check the unused sockets | | | shrinker <------+ and reclaim the timeout sockets. +---+ | +-----------+ | | | | | +------------------------+ | | +------------------------+ | | Socket | <-------+ Add a field for last used times+---------+ +------------------------+ Signed-off-by: Wang Xu <gnawux@gmail.com>
1 parent 88cedcd commit 677c2df

File tree

4 files changed

+123
-25
lines changed

4 files changed

+123
-25
lines changed

cluster.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,23 @@ import (
4848

4949
type mongoCluster struct {
5050
sync.RWMutex
51-
serverSynced sync.Cond
52-
userSeeds []string
53-
dynaSeeds []string
54-
servers mongoServers
55-
masters mongoServers
56-
references int
57-
syncing bool
58-
direct bool
59-
failFast bool
60-
syncCount uint
61-
setName string
62-
cachedIndex map[string]bool
63-
sync chan bool
64-
dial dialer
65-
appName string
51+
serverSynced sync.Cond
52+
userSeeds []string
53+
dynaSeeds []string
54+
servers mongoServers
55+
masters mongoServers
56+
references int
57+
syncing bool
58+
direct bool
59+
failFast bool
60+
syncCount uint
61+
setName string
62+
cachedIndex map[string]bool
63+
sync chan bool
64+
dial dialer
65+
appName string
66+
minPoolSize int
67+
maxIdleTimeMS int
6668
}
6769

6870
func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster {
@@ -419,7 +421,7 @@ func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoSer
419421
if server != nil {
420422
return server
421423
}
422-
return newServer(addr, tcpaddr, cluster.sync, cluster.dial)
424+
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, cluster.minPoolSize, cluster.maxIdleTimeMS)
423425
}
424426

425427
func resolveAddr(addr string) (*net.TCPAddr, error) {

server.go

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ type mongoServer struct {
5555
pingCount uint32
5656
closed bool
5757
abended bool
58+
minPoolSize int
59+
maxIdleTimeMS int
5860
}
5961

6062
type dialer struct {
@@ -76,17 +78,22 @@ type mongoServerInfo struct {
7678

7779
var defaultServerInfo mongoServerInfo
7880

79-
func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer {
81+
func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer {
8082
server := &mongoServer{
81-
Addr: addr,
82-
ResolvedAddr: tcpaddr.String(),
83-
tcpaddr: tcpaddr,
84-
sync: sync,
85-
dial: dial,
86-
info: &defaultServerInfo,
87-
pingValue: time.Hour, // Push it back before an actual ping.
83+
Addr: addr,
84+
ResolvedAddr: tcpaddr.String(),
85+
tcpaddr: tcpaddr,
86+
sync: sync,
87+
dial: dial,
88+
info: &defaultServerInfo,
89+
pingValue: time.Hour, // Push it back before an actual ping.
90+
minPoolSize: minPoolSize,
91+
maxIdleTimeMS: maxIdleTimeMS,
8892
}
8993
go server.pinger(true)
94+
if maxIdleTimeMS != 0 {
95+
go server.poolShrinker()
96+
}
9097
return server
9198
}
9299

@@ -221,6 +228,7 @@ func (server *mongoServer) close(waitForIdle bool) {
221228
func (server *mongoServer) RecycleSocket(socket *mongoSocket) {
222229
server.Lock()
223230
if !server.closed {
231+
socket.lastTimeUsed = time.Now()
224232
server.unusedSockets = append(server.unusedSockets, socket)
225233
}
226234
server.Unlock()
@@ -346,6 +354,53 @@ func (server *mongoServer) pinger(loop bool) {
346354
}
347355
}
348356

357+
func (server *mongoServer) poolShrinker() {
358+
ticker := time.NewTicker(1 * time.Minute)
359+
for _ = range ticker.C {
360+
if server.closed {
361+
ticker.Stop()
362+
return
363+
}
364+
server.Lock()
365+
unused := len(server.unusedSockets)
366+
if unused < server.minPoolSize {
367+
server.Unlock()
368+
continue
369+
}
370+
now := time.Now()
371+
end := 0
372+
reclaimMap := map[*mongoSocket]bool{}
373+
// Because the acquirision and recycle are done at the tail of array,
374+
// the head is always the oldest unused socket.
375+
for _, s := range server.unusedSockets[:unused-server.minPoolSize] {
376+
if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) {
377+
break
378+
}
379+
end++
380+
reclaimMap[s] = true
381+
}
382+
tbr := server.unusedSockets[:end]
383+
if end > 0 {
384+
next := make([]*mongoSocket, unused-end)
385+
copy(next, server.unusedSockets[end:])
386+
server.unusedSockets = next
387+
remainSockets := []*mongoSocket{}
388+
for _, s := range server.liveSockets {
389+
if !reclaimMap[s] {
390+
remainSockets = append(remainSockets, s)
391+
}
392+
}
393+
server.liveSockets = remainSockets
394+
stats.conn(-1*end, server.info.Master)
395+
}
396+
server.Unlock()
397+
398+
for _, s := range tbr {
399+
s.Close()
400+
}
401+
}
402+
}
403+
349404
type mongoServerSlice []*mongoServer
350405

351406
func (s mongoServerSlice) Len() int {

session.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,15 @@ const (
269269
// Defines the per-server socket pool limit. Defaults to 4096.
270270
// See Session.SetPoolLimit for details.
271271
//
272+
// minPoolSize=<limit>
273+
//
274+
// Defines the per-server socket pool minium size. Defaults to 0.
275+
//
276+
// maxIdleTimeMS=<millisecond>
277+
//
278+
// The maximum number of milliseconds that a connection can remain idle in the pool
279+
// before being removed and closed.
280+
//
272281
// appName=<appName>
273282
//
274283
// The identifier of this client application. This parameter is used to
@@ -320,6 +329,8 @@ func ParseURL(url string) (*DialInfo, error) {
320329
appName := ""
321330
readPreferenceMode := Primary
322331
var readPreferenceTagSets []bson.D
332+
minPoolSize := 0
333+
maxIdleTimeMS := 0
323334
for _, opt := range uinfo.options {
324335
switch opt.key {
325336
case "authSource":
@@ -366,6 +377,16 @@ func ParseURL(url string) (*DialInfo, error) {
366377
doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])})
367378
}
368379
readPreferenceTagSets = append(readPreferenceTagSets, doc)
380+
case "minPoolSize":
381+
minPoolSize, err = strconv.Atoi(opt.value)
382+
if err != nil {
383+
return nil, errors.New("bad value for minPoolSize: " + opt.value)
384+
}
385+
case "maxIdleTimeMS":
386+
maxIdleTimeMS, err = strconv.Atoi(opt.value)
387+
if err != nil {
388+
return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value)
389+
}
369390
case "connect":
370391
if opt.value == "direct" {
371392
direct = true
@@ -400,6 +421,8 @@ func ParseURL(url string) (*DialInfo, error) {
400421
TagSets: readPreferenceTagSets,
401422
},
402423
ReplicaSetName: setName,
424+
MinPoolSize: minPoolSize,
425+
MaxIdleTimeMS: maxIdleTimeMS,
403426
}
404427
return &info, nil
405428
}
@@ -473,6 +496,14 @@ type DialInfo struct {
473496
// cluster and establish connections with further servers too.
474497
Direct bool
475498

499+
// MinPoolSize defines The minimum number of connections in the connection pool.
500+
// Defaults to 0.
501+
MinPoolSize int
502+
503+
//The maximum number of milliseconds that a connection can remain idle in the pool
504+
// before being removed and closed.
505+
MaxIdleTimeMS int
506+
476507
// DialServer optionally specifies the dial function for establishing
477508
// connections with the MongoDB servers.
478509
DialServer func(addr *ServerAddr) (net.Conn, error)
@@ -552,6 +583,15 @@ func DialWithInfo(info *DialInfo) (*Session, error) {
552583
if info.PoolLimit > 0 {
553584
session.poolLimit = info.PoolLimit
554585
}
586+
587+
if info.MinPoolSize > 0 {
588+
cluster.minPoolSize = info.MinPoolSize
589+
}
590+
591+
if info.maxIdleTimeMS > 0 {
592+
cluster.maxIdleTimeMS = info.MaxIdleTimeMS
593+
}
594+
555595
cluster.Release()
556596

557597
// People get confused when we return a session that is not actually
@@ -5262,7 +5302,7 @@ func getRFC2253NameString(RDNElements *pkix.RDNSequence) string {
52625302
var replacer = strings.NewReplacer(",", "\\,", "=", "\\=", "+", "\\+", "<", "\\<", ">", "\\>", ";", "\\;")
52635303
//The elements in the sequence needs to be reversed when converting them
52645304
for i := len(*RDNElements) - 1; i >= 0; i-- {
5265-
var nameAndValueList = make([]string,len((*RDNElements)[i]))
5305+
var nameAndValueList = make([]string, len((*RDNElements)[i]))
52665306
for j, attribute := range (*RDNElements)[i] {
52675307
var shortAttributeName = rdnOIDToShortName(attribute.Type)
52685308
if len(shortAttributeName) <= 0 {

socket.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type mongoSocket struct {
5454
dead error
5555
serverInfo *mongoServerInfo
5656
closeAfterIdle bool
57+
lastTimeUsed time.Time // for time based idle socket release
5758
}
5859

5960
type queryOpFlags uint32

0 commit comments

Comments
 (0)