-
Notifications
You must be signed in to change notification settings - Fork 155
/
client.go
370 lines (342 loc) · 12.5 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
/*
Copyright 2020 The Qmgo Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package qmgo
import (
"context"
"fmt"
"net/url"
"strings"
"time"
"github.com/qiniu/qmgo/options"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsoncodec"
"go.mongodb.org/mongo-driver/mongo"
officialOpts "go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
)
// Config for initial mongodb instance
type Config struct {
// URI example: [mongodb://][user:pass@]host1[:port1][,host2[:port2],...][/database][?options]
// URI Reference: https://docs.mongodb.com/manual/reference/connection-string/
Uri string `json:"uri"`
Database string `json:"database"`
Coll string `json:"coll"`
// ConnectTimeoutMS specifies a timeout that is used for creating connections to the server.
// If set to 0, no timeout will be used.
// The default is 30 seconds.
ConnectTimeoutMS *int64 `json:"connectTimeoutMS"`
// MaxPoolSize specifies that maximum number of connections allowed in the driver's connection pool to each server.
// If this is 0, it will be set to math.MaxInt64,
// The default is 100.
MaxPoolSize *uint64 `json:"maxPoolSize"`
// MinPoolSize specifies the minimum number of connections allowed in the driver's connection pool to each server. If
// this is non-zero, each server's pool will be maintained in the background to ensure that the size does not fall below
// the minimum. This can also be set through the "minPoolSize" URI option (e.g. "minPoolSize=100"). The default is 0.
MinPoolSize *uint64 `json:"minPoolSize"`
// SocketTimeoutMS specifies how long the driver will wait for a socket read or write to return before returning a
// network error. If this is 0 meaning no timeout is used and socket operations can block indefinitely.
// The default is 300,000 ms.
SocketTimeoutMS *int64 `json:"socketTimeoutMS"`
// ReadPreference determines which servers are considered suitable for read operations.
// default is PrimaryMode
ReadPreference *ReadPref `json:"readPreference"`
// can be used to provide authentication options when configuring a Client.
Auth *Credential `json:"auth"`
}
// Credential can be used to provide authentication options when configuring a Client.
//
// AuthMechanism: the mechanism to use for authentication. Supported values include "SCRAM-SHA-256", "SCRAM-SHA-1",
// "MONGODB-CR", "PLAIN", "GSSAPI", "MONGODB-X509", and "MONGODB-AWS". This can also be set through the "authMechanism"
// URI option. (e.g. "authMechanism=PLAIN"). For more information, see
// https://docs.mongodb.com/manual/core/authentication-mechanisms/.
// AuthSource: the name of the database to use for authentication. This defaults to "$external" for MONGODB-X509,
// GSSAPI, and PLAIN and "admin" for all other mechanisms. This can also be set through the "authSource" URI option
// (e.g. "authSource=otherDb").
//
// Username: the username for authentication. This can also be set through the URI as a username:password pair before
// the first @ character. For example, a URI for user "user", password "pwd", and host "localhost:27017" would be
// "mongodb://user:pwd@localhost:27017". This is optional for X509 authentication and will be extracted from the
// client certificate if not specified.
//
// Password: the password for authentication. This must not be specified for X509 and is optional for GSSAPI
// authentication.
//
// PasswordSet: For GSSAPI, this must be true if a password is specified, even if the password is the empty string, and
// false if no password is specified, indicating that the password should be taken from the context of the running
// process. For other mechanisms, this field is ignored.
type Credential struct {
AuthMechanism string `json:"authMechanism"`
AuthSource string `json:"authSource"`
Username string `json:"username"`
Password string `json:"password"`
PasswordSet bool `json:"passwordSet"`
}
// ReadPref determines which servers are considered suitable for read operations.
type ReadPref struct {
// MaxStaleness is the maximum amount of time to allow a server to be considered eligible for selection.
// Supported from version 3.4.
MaxStalenessMS int64 `json:"maxStalenessMS"`
// indicates the user's preference on reads.
// PrimaryMode as default
Mode readpref.Mode `json:"mode"`
}
// QmgoClient specifies the instance to operate mongoDB
type QmgoClient struct {
*Collection
*Database
*Client
}
// Open creates client instance according to config
// QmgoClient can operates all qmgo.client 、qmgo.database and qmgo.collection
func Open(ctx context.Context, conf *Config, o ...options.ClientOptions) (cli *QmgoClient, err error) {
client, err := NewClient(ctx, conf, o...)
if err != nil {
fmt.Println("new client fail", err)
return
}
db := client.Database(conf.Database)
coll := db.Collection(conf.Coll)
cli = &QmgoClient{
Client: client,
Database: db,
Collection: coll,
}
return
}
// Client creates client to mongo
type Client struct {
client *mongo.Client
conf Config
registry *bsoncodec.Registry
}
// NewClient creates Qmgo MongoDB client
func NewClient(ctx context.Context, conf *Config, o ...options.ClientOptions) (cli *Client, err error) {
opt, err := newConnectOpts(conf, o...)
if err != nil {
return nil, err
}
client, err := client(ctx, opt)
if err != nil {
fmt.Println("new client fail", err)
return
}
cli = &Client{
client: client,
conf: *conf,
registry: opt.Registry,
}
return
}
// client creates connection to MongoDB
func client(ctx context.Context, opt *officialOpts.ClientOptions) (client *mongo.Client, err error) {
client, err = mongo.Connect(ctx, opt)
if err != nil {
fmt.Println(err)
return
}
// half of default connect timeout
pCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
if err = client.Ping(pCtx, readpref.Primary()); err != nil {
fmt.Println(err)
return
}
return
}
// newConnectOpts creates client options from conf
// Qmgo will follow this way official mongodb driver do:
// - the configuration in uri takes precedence over the configuration in the setter
// - Check the validity of the configuration in the uri, while the configuration in the setter is basically not checked
func newConnectOpts(conf *Config, o ...options.ClientOptions) (*officialOpts.ClientOptions, error) {
option := officialOpts.Client()
for _, apply := range o {
option = officialOpts.MergeClientOptions(apply.ClientOptions)
}
if conf.ConnectTimeoutMS != nil {
timeoutDur := time.Duration(*conf.ConnectTimeoutMS) * time.Millisecond
option.SetConnectTimeout(timeoutDur)
}
if conf.SocketTimeoutMS != nil {
timeoutDur := time.Duration(*conf.SocketTimeoutMS) * time.Millisecond
option.SetSocketTimeout(timeoutDur)
} else {
option.SetSocketTimeout(300 * time.Second)
}
if conf.MaxPoolSize != nil {
option.SetMaxPoolSize(*conf.MaxPoolSize)
}
if conf.MinPoolSize != nil {
option.SetMinPoolSize(*conf.MinPoolSize)
}
if conf.ReadPreference != nil {
readPreference, err := newReadPref(*conf.ReadPreference)
if err != nil {
return nil, err
}
option.SetReadPreference(readPreference)
}
if conf.Auth != nil {
auth, err := newAuth(*conf.Auth)
if err != nil {
return nil, err
}
option.SetAuth(auth)
}
option.ApplyURI(conf.Uri)
return option, nil
}
// newAuth create options.Credential from conf.Auth
func newAuth(auth Credential) (credential officialOpts.Credential, err error) {
if auth.AuthMechanism != "" {
credential.AuthMechanism = auth.AuthMechanism
}
if auth.AuthSource != "" {
credential.AuthSource = auth.AuthSource
}
if auth.Username != "" {
// Validate and process the username.
if strings.Contains(auth.Username, "/") {
err = ErrNotSupportedUsername
return
}
credential.Username, err = url.QueryUnescape(auth.Username)
if err != nil {
err = ErrNotSupportedUsername
return
}
}
credential.PasswordSet = auth.PasswordSet
if auth.Password != "" {
if strings.Contains(auth.Password, ":") {
err = ErrNotSupportedPassword
return
}
if strings.Contains(auth.Password, "/") {
err = ErrNotSupportedPassword
return
}
credential.Password, err = url.QueryUnescape(auth.Password)
if err != nil {
err = ErrNotSupportedPassword
return
}
credential.Password = auth.Password
}
return
}
// newReadPref create readpref.ReadPref from config
func newReadPref(pref ReadPref) (*readpref.ReadPref, error) {
readPrefOpts := make([]readpref.Option, 0, 1)
if pref.MaxStalenessMS != 0 {
readPrefOpts = append(readPrefOpts, readpref.WithMaxStaleness(time.Duration(pref.MaxStalenessMS)*time.Millisecond))
}
mode := readpref.PrimaryMode
if pref.Mode != 0 {
mode = pref.Mode
}
readPreference, err := readpref.New(mode, readPrefOpts...)
return readPreference, err
}
// Close closes sockets to the topology referenced by this Client.
func (c *Client) Close(ctx context.Context) error {
err := c.client.Disconnect(ctx)
return err
}
// Ping confirm connection is alive
func (c *Client) Ping(timeout int64) error {
var err error
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
defer cancel()
if err = c.client.Ping(ctx, readpref.Primary()); err != nil {
return err
}
return nil
}
// Database create connection to database
func (c *Client) Database(name string, options ...*options.DatabaseOptions) *Database {
opts := make([]*officialOpts.DatabaseOptions, 0, len(options))
for _, o := range options {
opts = append(opts, o.DatabaseOptions)
}
databaseOpts := officialOpts.MergeDatabaseOptions(opts...)
return &Database{database: c.client.Database(name, databaseOpts), registry: c.registry}
}
// Session create one session on client
// Watch out, close session after operation done
func (c *Client) Session(opt ...*options.SessionOptions) (*Session, error) {
sessionOpts := officialOpts.Session()
if len(opt) > 0 && opt[0].SessionOptions != nil {
sessionOpts = opt[0].SessionOptions
}
s, err := c.client.StartSession(sessionOpts)
return &Session{session: s}, err
}
// DoTransaction do whole transaction in one function
// precondition:
// - version of mongoDB server >= v4.0
// - Topology of mongoDB server is not Single
// At the same time, please pay attention to the following
// - make sure all operations in callback use the sessCtx as context parameter
// - if operations in callback takes more than(include equal) 120s, the operations will not take effect,
// - if operation in callback return qmgo.ErrTransactionRetry,
// the whole transaction will retry, so this transaction must be idempotent
// - if operations in callback return qmgo.ErrTransactionNotSupported,
// - If the ctx parameter already has a Session attached to it, it will be replaced by this session.
func (c *Client) DoTransaction(ctx context.Context, callback func(sessCtx context.Context) (interface{}, error), opts ...*options.TransactionOptions) (interface{}, error) {
if !c.transactionAllowed() {
return nil, ErrTransactionNotSupported
}
s, err := c.Session()
if err != nil {
return nil, err
}
defer s.EndSession(ctx)
return s.StartTransaction(ctx, callback, opts...)
}
// ServerVersion get the version of mongoDB server, like 4.4.0
func (c *Client) ServerVersion() string {
var buildInfo bson.Raw
err := c.client.Database("admin").RunCommand(
context.Background(),
bson.D{{"buildInfo", 1}},
).Decode(&buildInfo)
if err != nil {
fmt.Println("run command err", err)
return ""
}
v, err := buildInfo.LookupErr("version")
if err != nil {
fmt.Println("look up err", err)
return ""
}
return v.StringValue()
}
// transactionAllowed check if transaction is allowed
func (c *Client) transactionAllowed() bool {
vr, err := CompareVersions("4.0", c.ServerVersion())
if err != nil {
return false
}
if vr > 0 {
fmt.Println("transaction is not supported because mongo server version is below 4.0")
return false
}
// TODO dont know why need to do `cli, err := Open(ctx, &c.conf)` in topology() to get topo,
// Before figure it out, we only use this function in UT
//topo, err := c.topology()
//if topo == description.Single {
// fmt.Println("transaction is not supported because mongo server topology is single")
// return false
//}
return true
}