forked from couchbase/sync_gateway
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain_test_cluster.go
239 lines (199 loc) · 7.59 KB
/
main_test_cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
// Copyright 2022-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package base
import (
"context"
"fmt"
"strings"
"time"
"github.com/couchbase/gocb/v2"
)
// tbpCluster defines the required test bucket pool cluster operations
type tbpCluster interface {
getBucketNames() ([]string, error)
insertBucket(name string, quotaMB int) error
removeBucket(name string) error
openTestBucket(name tbpBucketName, waitUntilReady time.Duration) (Bucket, error)
supportsCollections() (bool, error)
isServerEnterprise() (bool, error)
close() error
}
type clusterLogFunc func(ctx context.Context, format string, args ...interface{})
// newTestCluster returns a cluster based on the driver used by the defaultBucketSpec. Accepts a clusterLogFunc to support
// cluster logging within a test bucket pool context
func newTestCluster(server string, logger clusterLogFunc) tbpCluster {
return newTestClusterV2(server, logger)
}
// tbpClusterV2 implements the tbpCluster interface for a gocb v2 cluster
type tbpClusterV2 struct {
logger clusterLogFunc
server string
// cluster can be used to perform cluster-level operations (but not bucket-level operations)
cluster *gocb.Cluster
}
var _ tbpCluster = &tbpClusterV2{}
func newTestClusterV2(server string, logger clusterLogFunc) *tbpClusterV2 {
tbpCluster := &tbpClusterV2{}
tbpCluster.logger = logger
tbpCluster.cluster = initV2Cluster(server)
tbpCluster.server = server
return tbpCluster
}
// initV2Cluster makes cluster connection. Callers must close.
func initV2Cluster(server string) *gocb.Cluster {
testClusterTimeout := 10 * time.Second
spec := BucketSpec{
Server: server,
TLSSkipVerify: true,
BucketOpTimeout: &testClusterTimeout,
}
connStr, err := spec.GetGoCBConnString(nil)
if err != nil {
FatalfCtx(context.TODO(), "error getting connection string: %v", err)
}
securityConfig, err := GoCBv2SecurityConfig(&spec.TLSSkipVerify, spec.CACertPath)
if err != nil {
FatalfCtx(context.TODO(), "Couldn't initialize cluster security config: %v", err)
}
authenticatorConfig, authErr := GoCBv2Authenticator(TestClusterUsername(), TestClusterPassword(), spec.Certpath, spec.Keypath)
if authErr != nil {
FatalfCtx(context.TODO(), "Couldn't initialize cluster authenticator config: %v", authErr)
}
timeoutsConfig := GoCBv2TimeoutsConfig(spec.BucketOpTimeout, StdlibDurationPtr(spec.GetViewQueryTimeout()))
clusterOptions := gocb.ClusterOptions{
Authenticator: authenticatorConfig,
SecurityConfig: securityConfig,
TimeoutsConfig: timeoutsConfig,
}
cluster, err := gocb.Connect(connStr, clusterOptions)
if err != nil {
FatalfCtx(context.TODO(), "Couldn't connect to %q: %v", server, err)
}
const clusterReadyTimeout = 90 * time.Second
err = cluster.WaitUntilReady(clusterReadyTimeout, nil)
if err != nil {
FatalfCtx(context.TODO(), "Cluster not ready after %ds: %v", int(clusterReadyTimeout.Seconds()), err)
}
return cluster
}
// isServerEnterprise returns true if the connected returns true if the connected couchbase server
// instance is Enterprise edition And false for Community edition
func (c *tbpClusterV2) isServerEnterprise() (bool, error) {
metadata, err := c.cluster.Internal().GetNodesMetadata(&gocb.GetNodesMetadataOptions{})
if err != nil {
return false, err
}
if strings.Contains("enterprise", metadata[0].Version) {
return true, nil
}
return false, nil
}
func (c *tbpClusterV2) getBucketNames() ([]string, error) {
bucketSettings, err := c.cluster.Buckets().GetAllBuckets(nil)
if err != nil {
return nil, fmt.Errorf("couldn't get buckets from cluster: %w", err)
}
var names []string
for name, _ := range bucketSettings {
names = append(names, name)
}
return names, nil
}
func (c *tbpClusterV2) insertBucket(name string, quotaMB int) error {
settings := gocb.CreateBucketSettings{
BucketSettings: gocb.BucketSettings{
Name: name,
RAMQuotaMB: uint64(quotaMB),
BucketType: gocb.CouchbaseBucketType,
FlushEnabled: true,
NumReplicas: tbpNumReplicas(),
},
}
options := &gocb.CreateBucketOptions{
Timeout: 10 * time.Second,
}
return c.cluster.Buckets().CreateBucket(settings, options)
}
func (c *tbpClusterV2) removeBucket(name string) error {
return c.cluster.Buckets().DropBucket(name, nil)
}
// openTestBucket opens the bucket of the given name for the gocb cluster in the given TestBucketPool.
func (c *tbpClusterV2) openTestBucket(testBucketName tbpBucketName, waitUntilReady time.Duration) (Bucket, error) {
bucketCluster := initV2Cluster(c.server)
// bucketSpec := getTestBucketSpec(testBucketName, usingNamedCollections)
bucketSpec := getTestBucketSpec(testBucketName)
bucketFromSpec, err := GetGocbV2BucketFromCluster(bucketCluster, bucketSpec, waitUntilReady, false)
if err != nil {
return nil, err
}
return bucketFromSpec, nil
}
func (c *tbpClusterV2) close() error {
// no close operations needed
if c.cluster != nil {
if err := c.cluster.Close(nil); err != nil {
c.logger(context.Background(), "Couldn't close cluster connection: %v", err)
return err
}
}
return nil
}
func (c *tbpClusterV2) getMinClusterCompatVersion() int {
nodesMeta, err := c.cluster.Internal().GetNodesMetadata(nil)
if err != nil {
FatalfCtx(context.Background(), "TEST: failed to fetch nodes metadata: %v", err)
}
if len(nodesMeta) < 1 {
panic("invalid NodesMetadata: no nodes")
}
return nodesMeta[0].ClusterCompatibility
}
func (c *tbpClusterV2) supportsCollections() (bool, error) {
major, _, err := getClusterVersion(c.cluster)
if err != nil {
return false, err
}
return major >= 7, nil
}
// dropAllScopesAndCollections attempts to drop *all* non-_default scopes and collections from the bucket associated with the collection, except those used by the test bucket pool. Intended for test usage only.
func dropAllScopesAndCollections(bucket *gocb.Bucket) error {
cm := bucket.Collections()
scopes, err := cm.GetAllScopes(nil)
if err != nil {
if httpErr, ok := err.(gocb.HTTPError); ok && httpErr.StatusCode == 404 {
return ErrCollectionsUnsupported
}
WarnfCtx(context.TODO(), "Error getting scopes on bucket %s: %v Will retry.", MD(bucket.Name()).Redact(), err)
return err
}
// For each non-default scope, drop them.
// For each collection within the default scope, drop them.
for _, scope := range scopes {
if scope.Name != DefaultScope && !strings.HasPrefix(scope.Name, tbpScopePrefix) {
scopeName := fmt.Sprintf("scope %s on bucket %s", MD(scope).Redact(), MD(bucket.Name()).Redact())
TracefCtx(context.TODO(), KeyAll, "Dropping %s", scopeName)
if err := cm.DropScope(scope.Name, nil); err != nil {
WarnfCtx(context.TODO(), "Error dropping %s: %v Will retry.", scopeName, err)
return err
}
continue
}
// can't delete _default scope - but we can delete the non-_default collections within it
for _, collection := range scope.Collections {
if collection.Name != DefaultCollection && !strings.HasPrefix(collection.Name, tbpCollectionPrefix) {
collectionName := fmt.Sprintf("collection %s in scope %s on bucket %s", MD(collection.Name).Redact(), MD(scope).Redact(), MD(bucket.Name()).Redact())
TracefCtx(context.TODO(), KeyAll, "Dropping %s", collectionName)
if err := cm.DropCollection(collection, nil); err != nil {
WarnfCtx(context.TODO(), "Error dropping %s: %v Will retry.", collectionName, err)
return err
}
}
}
}
return nil
}