Skip to content

Commit 72d0ac2

Browse files
authored
Separate read/write network timeouts (#161)
* socket: separate read/write network timeouts

  Splits DialInfo.Timeout (defaults to 60s when using mgo.Dial()) into ReadTimeout and WriteTimeout to address #160.

  Read/write timeout defaults to DialInfo.Timeout to preserve existing behaviour.

* cluster: remove AcquireSocket

  Only used by tests, replaced by the pool-aware acquire socket functions:

  * AcquireSocketWithPoolTimeout
  * AcquireSocketWithBlocking

* cluster: use configured timeouts for cluster operations

  * `mongoCluster.syncServer()` no longer uses hard-coded 5 seconds
  * `mongoCluster.isMaster()` no longer uses hard-coded 10 seconds

* tests: use DialInfo for internal timeouts

* server: fix fantastic serverTags nil slice bug

  When unmarshalling serverTags, it is now an empty slice, instead of a nil slice. `len(thing) == 0` works all the time, regardless.

* cluster: remove unused duplicate pool config

* session: avoid calculating default values in hot path

  Changes `DialWithInfo` to handle setting default values by setting the relevant `DialInfo` field, rather than calling the respective methods in the hot path for:

  * `PoolLimit`
  * `ReadTimeout`
  * `WriteTimeout`

* session: remove unused consts

* session: update docs
1 parent 45151e7 commit 72d0ac2

File tree

8 files changed

+285
-179
lines changed

8 files changed

+285
-179
lines changed

cluster.go

Lines changed: 38 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -48,34 +48,26 @@ import (
4848

4949
type mongoCluster struct {
5050
sync.RWMutex
51-
serverSynced sync.Cond
52-
userSeeds []string
53-
dynaSeeds []string
54-
servers mongoServers
55-
masters mongoServers
56-
references int
57-
syncing bool
58-
direct bool
59-
failFast bool
60-
syncCount uint
61-
setName string
62-
cachedIndex map[string]bool
63-
sync chan bool
64-
dial dialer
65-
appName string
66-
minPoolSize int
67-
maxIdleTimeMS int
51+
serverSynced sync.Cond
52+
userSeeds []string
53+
dynaSeeds []string
54+
servers mongoServers
55+
masters mongoServers
56+
references int
57+
syncing bool
58+
syncCount uint
59+
cachedIndex map[string]bool
60+
sync chan bool
61+
dial dialer
62+
dialInfo *DialInfo
6863
}
6964

70-
func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster {
65+
func newCluster(userSeeds []string, info *DialInfo) *mongoCluster {
7166
cluster := &mongoCluster{
7267
userSeeds: userSeeds,
7368
references: 1,
74-
direct: direct,
75-
failFast: failFast,
76-
dial: dial,
77-
setName: setName,
78-
appName: appName,
69+
dial: dialer{info.Dial, info.DialServer},
70+
dialInfo: info,
7971
}
8072
cluster.serverSynced.L = cluster.RWMutex.RLocker()
8173
cluster.sync = make(chan bool, 1)
@@ -147,7 +139,7 @@ type isMasterResult struct {
147139

148140
func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResult) error {
149141
// Monotonic let's it talk to a slave and still hold the socket.
150-
session := newSession(Monotonic, cluster, 10*time.Second)
142+
session := newSession(Monotonic, cluster, cluster.dialInfo)
151143
session.setSocket(socket)
152144

153145
var cmd = bson.D{{Name: "isMaster", Value: 1}}
@@ -171,8 +163,8 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul
171163
}
172164

173165
// Include the application name if set
174-
if cluster.appName != "" {
175-
meta["application"] = bson.M{"name": cluster.appName}
166+
if cluster.dialInfo.AppName != "" {
167+
meta["application"] = bson.M{"name": cluster.dialInfo.AppName}
176168
}
177169

178170
cmd = append(cmd, bson.DocElem{
@@ -190,27 +182,15 @@ type possibleTimeout interface {
190182
Timeout() bool
191183
}
192184

193-
var syncSocketTimeout = 5 * time.Second
194-
195185
func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerInfo, hosts []string, err error) {
196-
var syncTimeout time.Duration
197-
if raceDetector {
198-
// This variable is only ever touched by tests.
199-
globalMutex.Lock()
200-
syncTimeout = syncSocketTimeout
201-
globalMutex.Unlock()
202-
} else {
203-
syncTimeout = syncSocketTimeout
204-
}
205-
206186
addr := server.Addr
207187
log("SYNC Processing ", addr, "...")
208188

209189
// Retry a few times to avoid knocking a server down for a hiccup.
210190
var result isMasterResult
211191
var tryerr error
212192
for retry := 0; ; retry++ {
213-
if retry == 3 || retry == 1 && cluster.failFast {
193+
if retry == 3 || retry == 1 && cluster.dialInfo.FailFast {
214194
return nil, nil, tryerr
215195
}
216196
if retry > 0 {
@@ -222,16 +202,22 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI
222202
time.Sleep(syncShortDelay)
223203
}
224204

225-
// It's not clear what would be a good timeout here. Is it
226-
// better to wait longer or to retry?
227-
socket, _, err := server.AcquireSocket(0, syncTimeout)
205+
// Don't ever hit the pool limit for syncing
206+
config := cluster.dialInfo.Copy()
207+
config.PoolLimit = 0
208+
209+
socket, _, err := server.AcquireSocket(config)
228210
if err != nil {
229211
tryerr = err
230212
logf("SYNC Failed to get socket to %s: %v", addr, err)
231213
continue
232214
}
233215
err = cluster.isMaster(socket, &result)
216+
217+
// Restore the correct dial config before returning it to the pool
218+
socket.dialInfo = cluster.dialInfo
234219
socket.Release()
220+
235221
if err != nil {
236222
tryerr = err
237223
logf("SYNC Command 'ismaster' to %s failed: %v", addr, err)
@@ -241,9 +227,9 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI
241227
break
242228
}
243229

244-
if cluster.setName != "" && result.SetName != cluster.setName {
245-
logf("SYNC Server %s is not a member of replica set %q", addr, cluster.setName)
246-
return nil, nil, fmt.Errorf("server %s is not a member of replica set %q", addr, cluster.setName)
230+
if cluster.dialInfo.ReplicaSetName != "" && result.SetName != cluster.dialInfo.ReplicaSetName {
231+
logf("SYNC Server %s is not a member of replica set %q", addr, cluster.dialInfo.ReplicaSetName)
232+
return nil, nil, fmt.Errorf("server %s is not a member of replica set %q", addr, cluster.dialInfo.ReplicaSetName)
247233
}
248234

249235
if result.IsMaster {
@@ -255,7 +241,7 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI
255241
}
256242
} else if result.Secondary {
257243
debugf("SYNC %s is a slave.", addr)
258-
} else if cluster.direct {
244+
} else if cluster.dialInfo.Direct {
259245
logf("SYNC %s in unknown state. Pretending it's a slave due to direct connection.", addr)
260246
} else {
261247
logf("SYNC %s is neither a master nor a slave.", addr)
@@ -386,7 +372,7 @@ func (cluster *mongoCluster) syncServersLoop() {
386372
break
387373
}
388374
cluster.references++ // Keep alive while syncing.
389-
direct := cluster.direct
375+
direct := cluster.dialInfo.Direct
390376
cluster.Unlock()
391377

392378
cluster.syncServersIteration(direct)
@@ -401,7 +387,7 @@ func (cluster *mongoCluster) syncServersLoop() {
401387

402388
// Hold off before allowing another sync. No point in
403389
// burning CPU looking for down servers.
404-
if !cluster.failFast {
390+
if !cluster.dialInfo.FailFast {
405391
time.Sleep(syncShortDelay)
406392
}
407393

@@ -439,13 +425,11 @@ func (cluster *mongoCluster) syncServersLoop() {
439425
func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer {
440426
cluster.RLock()
441427
server := cluster.servers.Search(tcpaddr.String())
442-
minPoolSize := cluster.minPoolSize
443-
maxIdleTimeMS := cluster.maxIdleTimeMS
444428
cluster.RUnlock()
445429
if server != nil {
446430
return server
447431
}
448-
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS)
432+
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, cluster.dialInfo)
449433
}
450434

451435
func resolveAddr(addr string) (*net.TCPAddr, error) {
@@ -614,19 +598,10 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
614598
cluster.Unlock()
615599
}
616600

617-
// AcquireSocket returns a socket to a server in the cluster. If slaveOk is
618-
// true, it will attempt to return a socket to a slave server. If it is
619-
// false, the socket will necessarily be to a master server.
620-
func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
621-
return cluster.AcquireSocketWithPoolTimeout(mode, slaveOk, syncTimeout, socketTimeout, serverTags, poolLimit, 0)
622-
}
623-
624601
// AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is
625602
// true, it will attempt to return a socket to a slave server. If it is
626603
// false, the socket will necessarily be to a master server.
627-
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
628-
mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration,
629-
) (s *mongoSocket, err error) {
604+
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(mode Mode, slaveOk bool, syncTimeout time.Duration, serverTags []bson.D, info *DialInfo) (s *mongoSocket, err error) {
630605
var started time.Time
631606
var syncCount uint
632607
for {
@@ -645,7 +620,7 @@ func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
645620
// Initialize after fast path above.
646621
started = time.Now()
647622
syncCount = cluster.syncCount
648-
} else if syncTimeout != 0 && started.Before(time.Now().Add(-syncTimeout)) || cluster.failFast && cluster.syncCount != syncCount {
623+
} else if syncTimeout != 0 && started.Before(time.Now().Add(-syncTimeout)) || cluster.dialInfo.FailFast && cluster.syncCount != syncCount {
649624
cluster.RUnlock()
650625
return nil, errors.New("no reachable servers")
651626
}
@@ -670,7 +645,7 @@ func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
670645
continue
671646
}
672647

673-
s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout)
648+
s, abended, err := server.AcquireSocketWithBlocking(info)
674649
if err == errPoolTimeout {
675650
// No need to remove servers from the topology if acquiring a socket fails for this reason.
676651
return nil, err

cluster_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,8 +1055,6 @@ func (s *S) TestSocketTimeoutOnDial(c *C) {
10551055

10561056
timeout := 1 * time.Second
10571057

1058-
defer mgo.HackSyncSocketTimeout(timeout)()
1059-
10601058
s.Freeze("localhost:40001")
10611059

10621060
started := time.Now()

export_test.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,6 @@ func HackPingDelay(newDelay time.Duration) (restore func()) {
1919
return
2020
}
2121

22-
func HackSyncSocketTimeout(newTimeout time.Duration) (restore func()) {
23-
globalMutex.Lock()
24-
defer globalMutex.Unlock()
25-
26-
oldTimeout := syncSocketTimeout
27-
restore = func() {
28-
globalMutex.Lock()
29-
syncSocketTimeout = oldTimeout
30-
globalMutex.Unlock()
31-
}
32-
syncSocketTimeout = newTimeout
33-
return
34-
}
35-
3622
func (s *Session) Cluster() *mongoCluster {
3723
return s.cluster()
3824
}

0 commit comments

Comments
 (0)