Skip to content

Commit

Permalink
feat: use mongodb official driver instead of mgo (zeromicro#1782)
Browse files Browse the repository at this point in the history
* wip: backup

* wip: backup

* wip: backup

* backup

* backup

* backup

* add more tests

* fix wrong dependency

* fix lint errors

* remove test due to data race

* add tests

* fix test error

* add mon.Model

* add mon.Model unmarshal

* add monc

* add more tests for monc

* add more tests for monc

* add docs for mon and monc packages

* fix doc errors

* chhore: add comment

* chore: fix test bug

* chore: refine tests

* chore: remove primitive.NewObjectID in test code

* chore: rename test files for typo
  • Loading branch information
kevwan authored Apr 19, 2022
1 parent bbe1249 commit 2cdff97
Show file tree
Hide file tree
Showing 20 changed files with 2,618 additions and 2 deletions.
91 changes: 91 additions & 0 deletions core/stores/mon/bulkinserter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package mon

import (
"context"
"time"

"github.com/zeromicro/go-zero/core/executors"
"github.com/zeromicro/go-zero/core/logx"
"go.mongodb.org/mongo-driver/mongo"
)

const (
flushInterval = time.Second
maxBulkRows = 1000
)

type (
// ResultHandler is a handler that used to handle results.
ResultHandler func(*mongo.InsertManyResult, error)

// A BulkInserter is used to insert bulk of mongo records.
BulkInserter struct {
executor *executors.PeriodicalExecutor
inserter *dbInserter
}
)

// NewBulkInserter returns a BulkInserter.
func NewBulkInserter(coll *mongo.Collection, interval ...time.Duration) *BulkInserter {
inserter := &dbInserter{
collection: coll,
}

duration := flushInterval
if len(interval) > 0 {
duration = interval[0]
}

return &BulkInserter{
executor: executors.NewPeriodicalExecutor(duration, inserter),
inserter: inserter,
}
}

// Flush flushes the inserter, writes all pending records.
func (bi *BulkInserter) Flush() {
bi.executor.Flush()
}

// Insert inserts doc.
func (bi *BulkInserter) Insert(doc interface{}) {
bi.executor.Add(doc)
}

// SetResultHandler sets the result handler.
func (bi *BulkInserter) SetResultHandler(handler ResultHandler) {
bi.executor.Sync(func() {
bi.inserter.resultHandler = handler
})
}

type dbInserter struct {
collection *mongo.Collection
documents []interface{}
resultHandler ResultHandler
}

func (in *dbInserter) AddTask(doc interface{}) bool {
in.documents = append(in.documents, doc)
return len(in.documents) >= maxBulkRows
}

func (in *dbInserter) Execute(objs interface{}) {
docs := objs.([]interface{})
if len(docs) == 0 {
return
}

result, err := in.collection.InsertMany(context.Background(), docs)
if in.resultHandler != nil {
in.resultHandler(result, err)
} else if err != nil {
logx.Error(err)
}
}

func (in *dbInserter) RemoveAll() interface{} {
documents := in.documents
in.documents = nil
return documents
}
27 changes: 27 additions & 0 deletions core/stores/mon/bulkinserter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package mon

import (
"testing"

"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
)

func TestBulkInserter(t *testing.T) {
mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock))
defer mt.Close()

mt.Run("test", func(mt *mtest.T) {
mt.AddMockResponses(mtest.CreateSuccessResponse(bson.D{{Key: "ok", Value: 1}}...))
bulk := NewBulkInserter(mt.Coll)
bulk.SetResultHandler(func(result *mongo.InsertManyResult, err error) {
assert.Nil(t, err)
assert.Equal(t, 2, len(result.InsertedIDs))
})
bulk.Insert(bson.D{{Key: "foo", Value: "bar"}})
bulk.Insert(bson.D{{Key: "foo", Value: "baz"}})
bulk.Flush()
})
}
51 changes: 51 additions & 0 deletions core/stores/mon/clientmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package mon

import (
"context"
"io"
"time"

"github.com/zeromicro/go-zero/core/syncx"
"go.mongodb.org/mongo-driver/mongo"
mopt "go.mongodb.org/mongo-driver/mongo/options"
)

const defaultTimeout = time.Second

var clientManager = syncx.NewResourceManager()

// ClosableClient wraps *mongo.Client and provides a Close method.
type ClosableClient struct {
*mongo.Client
}

// Close disconnects the underlying *mongo.Client.
func (cs *ClosableClient) Close() error {
return cs.Client.Disconnect(context.Background())
}

// Inject injects a *mongo.Client into the client manager.
// Typically, this is used to inject a *mongo.Client for test purpose.
func Inject(key string, client *mongo.Client) {
clientManager.Inject(key, &ClosableClient{client})
}

func getClient(url string) (*mongo.Client, error) {
val, err := clientManager.GetResource(url, func() (io.Closer, error) {
cli, err := mongo.Connect(context.Background(), mopt.Client().ApplyURI(url))
if err != nil {
return nil, err
}

concurrentSess := &ClosableClient{
Client: cli,
}

return concurrentSess, nil
})
if err != nil {
return nil, err
}

return val.(*ClosableClient).Client, nil
}
20 changes: 20 additions & 0 deletions core/stores/mon/clientmanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package mon

import (
"testing"

"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
)

func TestClientManger_getClient(t *testing.T) {
mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock))
defer mt.Close()

mt.Run("test", func(mt *mtest.T) {
Inject(mtest.ClusterURI(), mt.Client)
cli, err := getClient(mtest.ClusterURI())
assert.Nil(t, err)
assert.Equal(t, mt.Client, cli)
})
}
Loading

0 comments on commit 2cdff97

Please sign in to comment.