forked from apache/cassandra-gocql-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
integration_test.go
225 lines (187 loc) · 6.74 KB
/
integration_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// +build all integration
package gocql
// This file groups integration tests where Cassandra has to be set up with some special integration variables
import (
"context"
"reflect"
"testing"
"time"
)
// TestAuthentication verifies that gocql will work with a host configured to only accept authenticated connections
func TestAuthentication(t *testing.T) {
if *flagProto < 2 {
t.Skip("Authentication is not supported with protocol < 2")
}
if !*flagRunAuthTest {
t.Skip("Authentication is not configured in the target cluster")
}
cluster := createCluster()
cluster.Authenticator = PasswordAuthenticator{
Username: "cassandra",
Password: "cassandra",
}
session, err := cluster.CreateSession()
if err != nil {
t.Fatalf("Authentication error: %s", err)
}
session.Close()
}
func TestGetHosts(t *testing.T) {
clusterHosts := getClusterHosts()
cluster := createCluster()
session := createSessionFromCluster(cluster, t)
hosts, partitioner, err := session.hostSource.GetHosts()
assertTrue(t, "err == nil", err == nil)
assertEqual(t, "len(hosts)", len(clusterHosts), len(hosts))
assertTrue(t, "len(partitioner) != 0", len(partitioner) != 0)
}
// TestRingDiscovery makes sure that you can autodiscover other cluster members
// when you seed a cluster config with just one node
func TestRingDiscovery(t *testing.T) {
clusterHosts := getClusterHosts()
cluster := createCluster()
cluster.Hosts = clusterHosts[:1]
session := createSessionFromCluster(cluster, t)
defer session.Close()
if *clusterSize > 1 {
// wait for autodiscovery to update the pool with the list of known hosts
time.Sleep(*flagAutoWait)
}
session.pool.mu.RLock()
defer session.pool.mu.RUnlock()
size := len(session.pool.hostConnPools)
if *clusterSize != size {
for p, pool := range session.pool.hostConnPools {
t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.ConnectAddress().String())
}
t.Errorf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
}
}
// TestHostFilter ensures that host filtering works even when we discover hosts
func TestHostFilter(t *testing.T) {
clusterHosts := getClusterHosts()
if len(clusterHosts) < 2 {
t.Skip("skipping because we don't have 2 or more hosts")
}
cluster := createCluster()
rr := RoundRobinHostPolicy().(*roundRobinHostPolicy)
cluster.PoolConfig.HostSelectionPolicy = rr
// we'll filter out the second host
filtered := clusterHosts[1]
cluster.Hosts = clusterHosts[:1]
cluster.HostFilter = HostFilterFunc(func(host *HostInfo) bool {
if host.ConnectAddress().String() == filtered {
return false
}
return true
})
session := createSessionFromCluster(cluster, t)
defer session.Close()
assertEqual(t, "len(rr.hosts.get()) != 0", len(clusterHosts)-1, len(rr.hosts.get()))
}
func TestWriteFailure(t *testing.T) {
cluster := createCluster()
createKeyspace(t, cluster, "test")
cluster.Keyspace = "test"
session, err := cluster.CreateSession()
if err != nil {
t.Fatal("create session:", err)
}
defer session.Close()
if err := createTable(session, "CREATE TABLE test.test (id int,value int,PRIMARY KEY (id))"); err != nil {
t.Fatalf("failed to create table with error '%v'", err)
}
if err := session.Query(`INSERT INTO test.test (id, value) VALUES (1, 1)`).Exec(); err != nil {
errWrite, ok := err.(*RequestErrWriteFailure)
if ok {
if session.cfg.ProtoVersion >= 5 {
// ErrorMap should be filled with some hosts that should've errored
if len(errWrite.ErrorMap) == 0 {
t.Fatal("errWrite.ErrorMap should have some failed hosts but it didn't have any")
}
} else {
// Map doesn't get filled for V4
if len(errWrite.ErrorMap) != 0 {
t.Fatal("errWrite.ErrorMap should have length 0, it's: ", len(errWrite.ErrorMap))
}
}
} else {
t.Fatal("error should be RequestErrWriteFailure, it's: ", errWrite)
}
} else {
t.Fatal("a write fail error should have happened when querying test keyspace")
}
if err = session.Query("DROP KEYSPACE test").Exec(); err != nil {
t.Fatal(err)
}
}
func TestCustomPayloadMessages(t *testing.T) {
cluster := createCluster()
session := createSessionFromCluster(cluster, t)
defer session.Close()
if err := createTable(session, "CREATE TABLE gocql_test.testCustomPayloadMessages (id int, value int, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}
// QueryMessage
var customPayload = map[string][]byte{"a": []byte{10, 20}, "b": []byte{20, 30}}
query := session.Query("SELECT id FROM testCustomPayloadMessages where id = ?", 42).Consistency(One).CustomPayload(customPayload)
iter := query.Iter()
rCustomPayload := iter.GetCustomPayload()
if !reflect.DeepEqual(customPayload, rCustomPayload) {
t.Fatal("The received custom payload should match the sent")
}
iter.Close()
// Insert query
query = session.Query("INSERT INTO testCustomPayloadMessages(id,value) VALUES(1, 1)").Consistency(One).CustomPayload(customPayload)
iter = query.Iter()
rCustomPayload = iter.GetCustomPayload()
if !reflect.DeepEqual(customPayload, rCustomPayload) {
t.Fatal("The received custom payload should match the sent")
}
iter.Close()
// Batch Message
b := session.NewBatch(LoggedBatch)
b.CustomPayload = customPayload
b.Query("INSERT INTO testCustomPayloadMessages(id,value) VALUES(1, 1)")
if err := session.ExecuteBatch(b); err != nil {
t.Fatalf("query failed. %v", err)
}
}
func TestCustomPayloadValues(t *testing.T) {
cluster := createCluster()
session := createSessionFromCluster(cluster, t)
defer session.Close()
if err := createTable(session, "CREATE TABLE gocql_test.testCustomPayloadValues (id int, value int, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}
values := []map[string][]byte{map[string][]byte{"a": []byte{10, 20}, "b": []byte{20, 30}}, nil, map[string][]byte{"a": []byte{10, 20}, "b": nil}}
for _, customPayload := range values {
query := session.Query("SELECT id FROM testCustomPayloadValues where id = ?", 42).Consistency(One).CustomPayload(customPayload)
iter := query.Iter()
rCustomPayload := iter.GetCustomPayload()
if !reflect.DeepEqual(customPayload, rCustomPayload) {
t.Fatal("The received custom payload should match the sent")
}
}
}
func TestSessionAwaitSchemaAgreement(t *testing.T) {
session := createSession(t)
defer session.Close()
if err := session.AwaitSchemaAgreement(context.Background()); err != nil {
t.Fatalf("expected session.AwaitSchemaAgreement to not return an error but got '%v'", err)
}
}
func TestUDF(t *testing.T) {
session := createSession(t)
defer session.Close()
if session.cfg.ProtoVersion < 4 {
t.Skip("skipping UDF support on proto < 4")
}
const query = `CREATE OR REPLACE FUNCTION uniq(state set<text>, val text)
CALLED ON NULL INPUT RETURNS set<text> LANGUAGE java
AS 'state.add(val); return state;'`
err := session.Query(query).Exec()
if err != nil {
t.Fatal(err)
}
}