-
Notifications
You must be signed in to change notification settings - Fork 82
/
tools.py
101 lines (89 loc) · 4.03 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import time
from ccmlib.node import Node
def retry_till_success(fun, *args, **kwargs):
    """Call ``fun`` repeatedly until it returns without raising.

    ``fun`` is called with ``*args``; when no positional arguments are given
    it is called as ``fun(None)``. Its return value is discarded. At least
    one attempt is always made, and retries happen back-to-back with no
    pause in between.

    Keyword arguments:
        timeout: number of seconds to keep retrying. Defaults to 60 when
            omitted, ``None`` or 0. Any other keyword argument is ignored
            and is NOT forwarded to ``fun``.

    Raises:
        Exception: whatever ``fun`` raised on the first failed attempt that
            ends at or after the deadline.
    """
    # .get() so that omitting ``timeout`` selects the default instead of
    # raising KeyError.
    timeout = kwargs.get("timeout") or 60
    deadline = time.time() + timeout
    while True:
        try:
            if not args:
                fun(None)
            else:
                fun(*args)
            return
        except Exception:
            # Re-raise from inside the handler so the original traceback is
            # kept, and so there is always a real exception to raise.
            if time.time() >= deadline:
                raise
def insert_c1c2(cursor, key, consistency="QUORUM"):
    """Set columns c1 and c2 of row ``k<key>`` in ``cf`` to value1/value2.

    The statement is built with the given consistency level and passed to
    ``cursor.execute``; nothing is returned.
    """
    params = (consistency, key)
    statement = 'UPDATE cf USING CONSISTENCY %s SET c1=value1, c2=value2 WHERE key=k%d' % params
    cursor.execute(statement)
def insert_columns(cursor, key, columns_count, consistency="QUORUM"):
    """Write ``columns_count`` columns into row ``k<key>`` of ``cf``.

    Column ``c<i>`` is set to ``value<i>`` for every ``i`` from 0 up to
    ``columns_count - 1``, in a single UPDATE issued through
    ``cursor.execute`` at the given consistency level. Nothing is returned.
    """
    # range() instead of xrange(): xrange does not exist on Python 3, and
    # range behaves the same here on both major versions.
    assignments = ', '.join("c%d=value%d" % (i, i) for i in range(columns_count))
    query = 'UPDATE cf USING CONSISTENCY %s SET %s WHERE key=k%d' % (consistency, assignments, key)
    cursor.execute(query)
def query_c1c2(cursor, key, consistency="QUORUM"):
    """Read c1 and c2 of row ``k<key>`` and assert they hold value1/value2.

    Raises AssertionError when the cursor does not report exactly one row,
    when the fetched row does not have exactly two entries, or when either
    entry differs from the expected value.
    """
    statement = 'SELECT c1, c2 FROM cf USING CONSISTENCY %s WHERE key=k%d' % (consistency, key)
    cursor.execute(statement)
    assert cursor.rowcount == 1
    row = cursor.fetchone()
    # Checked in the same order as a chained ``and``: length first, so the
    # index lookups below are never reached on a short row.
    assert len(row) == 2
    assert row[0] == 'value1'
    assert row[1] == 'value2'
def query_columns(cursor, key, columns_count, consistency="QUORUM"):
    """Read the column slice of row ``k<key>`` and check every value.

    Executes ``SELECT c0..c<columns_count>`` at the given consistency level,
    then asserts that the cursor reports exactly one row, that the row has
    ``columns_count`` entries, and that entry ``i`` equals ``value<i>``.

    Raises:
        AssertionError: if any of those checks fails.
    """
    cursor.execute('SELECT c0..c%d FROM cf USING CONSISTENCY %s WHERE key=k%d' % (columns_count, consistency, key))
    assert cursor.rowcount == 1, 'expecting 1 row, got %s' % cursor.rowcount
    res = cursor.fetchone()
    assert len(res) == columns_count, 'expecting %d columns, got %d' % (columns_count, len(res))
    # enumerate() replaces the xrange() index loop, which is a NameError on
    # Python 3; the length check above guarantees the same indices.
    for i, value in enumerate(res):
        assert value == 'value%d' % i, 'for %d, expecting value%d, got %s' % (i, i, value)
def remove_c1c2(cursor, key, consistency="QUORUM"):
    """Delete columns c1 and c2 from row ``k<key>`` of ``cf``.

    The DELETE is issued through ``cursor.execute`` at the given consistency
    level; nothing is returned.
    """
    params = (consistency, key)
    statement = 'DELETE c1, c2 FROM cf USING CONSISTENCY %s WHERE key=k%d' % params
    cursor.execute(statement)
# Works for clusters started by populate
def new_node(cluster, bootstrap=True, token=None):
    """Create the next numbered node, add it to ``cluster`` and return it.

    The node number is one more than the current number of nodes in
    ``cluster.nodes``; it determines the node name (``node<N>``), the
    loopback address (``127.0.0.<N>``) and the ``7000 + N * 100`` port
    string passed to ``Node``. The node is added to the cluster with
    ``not bootstrap`` as the second argument of ``cluster.add``.
    """
    index = len(cluster.nodes) + 1
    address = '127.0.0.%s' % index
    # NOTE(review): the two (address, port) pairs and the port string are
    # presumably the Thrift, storage and JMX endpoints - confirm against the
    # ccmlib Node constructor.
    node = Node(
        'node%s' % index,
        cluster,
        bootstrap,
        (address, 9160),
        (address, 7000),
        str(7000 + index * 100),
        token,
    )
    cluster.add(node, not bootstrap)
    return node
# Simple puts and get (on one row), testing both reads by names and by slice,
# with overwrites and flushes between inserts to make sure we hit multiple
# sstables on reads
def putget(cluster, cursor, cl="QUORUM"):
    """Write row ``k0`` three times with overlapping columns, then verify it.

    Three UPDATEs are issued at consistency level ``cl``, each followed by a
    short sleep and ``cluster.flush()``:

    1. ``c00``..``c99`` set to ``value<i>``;
    2. every even column ``c<2i>`` overwritten with ``value<4i>``;
    3. every fifth column ``c<5i>`` overwritten with ``value<20i>``.

    The row is then read back twice - once naming all 100 columns, once
    with ``SELECT *`` - and each result is checked against the final values.

    Raises:
        AssertionError: if a read does not report exactly one row or any
            column holds an unexpected value.
    """
    # range() instead of xrange() so the helper also runs on Python 3.
    batches = (
        ["c%02d=value%d" % (i, i) for i in range(100)],
        ["c%02d=value%d" % (i * 2, i * 4) for i in range(50)],
        ["c%02d=value%d" % (i * 5, i * 20) for i in range(20)],
    )
    for kvs in batches:
        cursor.execute('UPDATE cf USING CONSISTENCY %s SET %s WHERE key=k0' % (cl, ','.join(kvs)))
        # NOTE(review): the short pause before flushing presumably lets the
        # write land first - confirm before removing.
        time.sleep(.01)
        cluster.flush()

    # reads by name
    ks = ["c%02d" % i for i in range(100)]
    cursor.execute('SELECT %s FROM cf USING CONSISTENCY %s WHERE key=k0' % (','.join(ks), cl))
    assert cursor.rowcount == 1
    _check_putget_row(cursor.fetchone())

    # slice reads
    cursor.execute('SELECT * FROM cf USING CONSISTENCY %s WHERE key=k0' % cl)
    assert cursor.rowcount == 1
    _check_putget_row(cursor.fetchone()[1:])  # removing key


def _check_putget_row(res):
    """Assert that ``res`` holds the 100 final column values written by putget."""
    assert len(res) == 100
    for i in range(100):
        # The last write to a column wins: multiples of 5 were written last
        # (value 4*i), other even columns second (value 2*i), the rest only
        # by the first write (value i).
        if i % 5 == 0:
            expected = i * 4
        elif i % 2 == 0:
            expected = i * 2
        else:
            expected = i
        assert res[i] == 'value%d' % expected, 'for %d, expecting value%d, got %s' % (i, expected, res[i])