Skip to content

Commit

Permalink
- Implemented support for Safe() and Unsafe() in the session.
Browse files Browse the repository at this point in the history
- Better socket acquiring/releasing in the session.
- Some minor method renaming.
  • Loading branch information
niemeyer committed Jan 12, 2011
1 parent ec23d38 commit 1d7d318
Show file tree
Hide file tree
Showing 10 changed files with 461 additions and 218 deletions.
17 changes: 14 additions & 3 deletions IDEAS
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
col.Find(...).All() // With exhaust
col.Find(...).Chan() // With channel
Limit(n) == Batch(n).Prefetch(0) + NotFound when reaching (n)



col.Find(...).ForEach(...)
col.Find(...).Select(...).Skip(i).Limit(j).All()
col.Sort(...)
Expand All @@ -14,12 +16,21 @@ Delete()
Query(foo).Update(document)
Find(foo).Update(doc)
...or...
session.Insert(value)
session.UpdateOne(query, update)
session.UpdateAll(query, update)
session.Insert(value)
session.UpdateOne(query, update, nil)
session.UpdateAll(query, update, &Safe{W:3})
session.Upsert(query, value)
session.SafeUpdateOne(query, update, nil)
session.SafeUpdateAll(query, update, nil)
session.SafeInsert(value)


session.Safe(&mongodb.Safe{W:1})
session.Safe(0, 0, false) + session.Unsafe()
+
query.Safe(0, 0, false)

session.PushSafe(0, 0, false)
session.PopSafe()
183 changes: 176 additions & 7 deletions all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@ func (s *S) TestTopologySyncWithSingleMaster(c *C) {
c.Assert(err, IsNil)

coll := session.DB("mydb").C("mycollection")
coll.Insert(B.M{"a": 1, "b": 2})

result := struct{Ok bool}{}
err = session.Run("getLastError", &result)
err = coll.Insert(B.M{"a": 1, "b": 2})
c.Assert(err, IsNil)
c.Assert(result.Ok, Equals, true)

// One connection during discovery. Master socket
// recycled for insert.
// One connection used for discovery. Master socket recycled for
// insert. Socket is reserved after insert.
stats := mongogo.GetStats()
c.Assert(stats.MasterConns, Equals, 1)
c.Assert(stats.SlaveConns, Equals, 0)
c.Assert(stats.SocketRefs, Equals, 1)

// Restart session and socket must be released.
session.Restart()
stats = mongogo.GetStats()
c.Assert(stats.SocketRefs, Equals, 0)
}

func (s *S) TestTopologySyncWithSlaveSeed(c *C) {
Expand All @@ -50,6 +52,15 @@ func (s *S) TestTopologySyncWithSlaveSeed(c *C) {
stats := mongogo.GetStats()
c.Assert(stats.MasterConns, Equals, 1)
c.Assert(stats.SlaveConns, Equals, 2)

// Only one socket reference alive, in the master socket owned
// by the above session.
c.Assert(stats.SocketRefs, Equals, 1)

// Restart it, and it must be gone.
session.Restart()
stats = mongogo.GetStats()
c.Assert(stats.SocketRefs, Equals, 0)
}

func (s *S) TestRunString(c *C) {
Expand Down Expand Up @@ -124,6 +135,8 @@ func (s *S) TestInsertFindIter(c *C) {
coll.Insert(B.M{"n": n})
}

session.Restart() // Release socket.

mongogo.ResetStats()

query := coll.Find(B.M{"n": B.M{"$gte": 42}}).Prefetch(0).Batch(2)
Expand Down Expand Up @@ -162,10 +175,13 @@ func (s *S) TestInsertFindIter(c *C) {
err = iter.Next(&result)
c.Assert(err == mongogo.NotFound, Equals, true)

session.Restart() // Release socket.

stats := mongogo.GetStats()
c.Assert(stats.SentOps, Equals, 3) // 1*QUERY_OP + 2*GET_MORE_OP
c.Assert(stats.ReceivedOps, Equals, 3) // and their REPLY_OPs.
c.Assert(stats.ReceivedDocs, Equals, 5)
c.Assert(stats.SocketRefs, Equals, 0)
}

func (s *S) TestSort(c *C) {
Expand Down Expand Up @@ -316,3 +332,156 @@ func (s *S) TestPrefetching(c *C) {
c.Assert(stats.ReceivedDocs, Equals, 201) // 200 + the ping result
}
}

func (s *S) TestSafeInsert(c *C) {
session, err := mongogo.Mongo("localhost:40001")
c.Assert(err, IsNil)

coll := session.DB("mydb").C("mycollection")

// Insert an element with a predefined key.
err = coll.Insert(B.M{"_id": 1})
c.Assert(err, IsNil)

mongogo.ResetStats()

// Session should be safe by default, so inserting it again must fail.
err = coll.Insert(B.M{"_id": 1})
c.Assert(err, Matches, "E11000 duplicate.*")
c.Assert(err.(*mongogo.LastError).Code, Equals, 11000)

// It must have sent two operations (INSERT_OP + getLastError QUERY_OP)
stats := mongogo.GetStats()
c.Assert(stats.SentOps, Equals, 2)

mongogo.ResetStats()

// If we disable safety, though, it won't complain.
session.Unsafe()
err = coll.Insert(B.M{"_id": 1})
c.Assert(err, IsNil)

// Must have sent a single operation this time (just the INSERT_OP)
stats = mongogo.GetStats()
c.Assert(stats.SentOps, Equals, 1)
}


func (s *S) TestSafeParameters(c *C) {
session, err := mongogo.Mongo("localhost:40001")
c.Assert(err, IsNil)

coll := session.DB("mydb").C("mycollection")

// Tweak the safety parameters to something unachievable,
// since we're talking to a single master.
session.Safe(2, 100, false)
err = coll.Insert(B.M{"_id": 1})
c.Assert(err, Matches, "timeout")
c.Assert(err.(*mongogo.LastError).WTimeout, Equals, true)
}

func (s *S) TestNewSession(c *C) {
session, err := mongogo.Mongo("localhost:40001")
c.Assert(err, IsNil)

// Do a dummy operation to wait for connection.
coll := session.DB("mydb").C("mycollection")
err = coll.Insert(B.M{"_id": 1})
c.Assert(err, IsNil)

// Tweak safety and query settings to ensure clone is copying those.
session.Unsafe()
session.Batch(-1)
clone := session.New()
session.Safe(0, 0, false)

// Clone was copied while session was unsafe, so no errors.
cloneColl := clone.DB("mydb").C("mycollection")
err = cloneColl.Insert(B.M{"_id": 1})
c.Assert(err, IsNil)

// Original session was made safe again.
err = coll.Insert(B.M{"_id": 1})
c.Assert(err, NotNil)

// With New(), each session has its own socket now.
stats := mongogo.GetStats()
c.Assert(stats.MasterConns, Equals, 2)
c.Assert(stats.SocketRefs, Equals, 2)

// Ensure query parameters were cloned.
err = cloneColl.Insert(B.M{"_id": 2})
c.Assert(err, IsNil)

mongogo.ResetStats()

iter, err := cloneColl.Find(B.M{}).Iter()
c.Assert(err, IsNil)

m := B.M{}
err = iter.Next(m)
c.Assert(err, IsNil)

// If Batch(-1) is in effect, a single document must have been received.
stats = mongogo.GetStats()
c.Assert(stats.ReceivedDocs, Equals, 1)
}

func (s *S) TestCloneSession(c *C) {
session, err := mongogo.Mongo("localhost:40001")
c.Assert(err, IsNil)

// Do a dummy operation to wait for connection.
coll := session.DB("mydb").C("mycollection")
err = coll.Insert(B.M{"_id": 1})
c.Assert(err, IsNil)

// Tweak safety and query settings to ensure clone is copying those.
session.Unsafe()
session.Batch(-1)
clone := session.Clone()
session.Safe(0, 0, false)

// Clone was copied while session was unsafe, so no errors.
cloneColl := clone.DB("mydb").C("mycollection")
err = cloneColl.Insert(B.M{"_id": 1})
c.Assert(err, IsNil)

// Original session was made safe again.
err = coll.Insert(B.M{"_id": 1})
c.Assert(err, NotNil)

// With Clone(), same socket is shared between sessions now.
stats := mongogo.GetStats()
c.Assert(stats.MasterConns, Equals, 1)
c.Assert(stats.SocketRefs, Equals, 2)

// Restarting one of them should let the original socket go,
// while preserving the safety settings.
clone.Restart()
err = cloneColl.Insert(B.M{"_id": 1})
c.Assert(err, IsNil)

// Must have used another connection now.
stats = mongogo.GetStats()
c.Assert(stats.MasterConns, Equals, 2)
c.Assert(stats.SocketRefs, Equals, 2)

// Ensure query parameters were cloned.
err = cloneColl.Insert(B.M{"_id": 2})
c.Assert(err, IsNil)

mongogo.ResetStats()

iter, err := cloneColl.Find(B.M{}).Iter()
c.Assert(err, IsNil)

m := B.M{}
err = iter.Next(m)
c.Assert(err, IsNil)

// If Batch(-1) is in effect, a single document must have been received.
stats = mongogo.GetStats()
c.Assert(stats.ReceivedDocs, Equals, 1)
}
7 changes: 4 additions & 3 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func Mongo(servers string) (session *Session, err os.Error) {
userSeeds := strings.Split(servers, ",", -1)
cluster := &mongoCluster{userSeeds:userSeeds}
go cluster.syncServers()
session = newSession(StrongConsistency, cluster, nil)
session = newSession(Strong, cluster, nil)
return session, nil
}

Expand Down Expand Up @@ -77,8 +77,9 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (
log("[sync] Failed to get socket to ", addr, ": ", err.String())
return
}
defer socket.Release()

session := newSession(StrongConsistency, cluster, socket)
session := newSession(Strong, cluster, socket)

result := isMasterResult{}
err = session.Run("ismaster", &result)
Expand Down Expand Up @@ -108,7 +109,7 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (
hosts = append(hosts, result.Hosts...)
hosts = append(hosts, result.Passives...)

session.Reset() // Recycle the socket.
session.Restart() // Release the socket.
cluster.mergeServer(server)

debugf("[sync] %s knows about the following peers: %#v", addr, hosts)
Expand Down
Loading

0 comments on commit 1d7d318

Please sign in to comment.