30
30
31
31
import static com .elasticinbox .core .cassandra .CassandraDAOFactory .CF_LABEL_INDEX ;
32
32
import static me .prettyprint .hector .api .factory .HFactory .createColumn ;
33
+ import static me .prettyprint .hector .api .factory .HFactory .createSliceQuery ;
33
34
import static org .hamcrest .Matchers .greaterThan ;
34
35
import static org .hamcrest .Matchers .lessThan ;
36
+ import static org .hamcrest .Matchers .hasItem ;
37
+ import static org .hamcrest .Matchers .equalTo ;
35
38
import static org .junit .Assert .assertThat ;
39
+ import static org .junit .Assert .assertNotNull ;
36
40
41
+ import java .util .HashSet ;
37
42
import java .util .UUID ;
38
43
39
44
import me .prettyprint .cassandra .serializers .BytesArraySerializer ;
43
48
import me .prettyprint .hector .api .Cluster ;
44
49
import me .prettyprint .hector .api .ConsistencyLevelPolicy ;
45
50
import me .prettyprint .hector .api .Keyspace ;
51
+ import me .prettyprint .hector .api .beans .ColumnSlice ;
52
+ import me .prettyprint .hector .api .beans .HColumn ;
46
53
import me .prettyprint .hector .api .factory .HFactory ;
47
54
import me .prettyprint .hector .api .mutation .Mutator ;
55
+ import me .prettyprint .hector .api .query .QueryResult ;
56
+ import me .prettyprint .hector .api .query .SliceQuery ;
48
57
49
58
import org .junit .After ;
50
59
import org .junit .Before ;
@@ -61,13 +70,17 @@ public class ThrottlingMutatorTest
61
70
final static BytesArraySerializer byteSe = BytesArraySerializer .get ();
62
71
63
72
final static String KEYSPACE = "ElasticInbox" ;
73
+ final static int LABEL_T1 = 5555 ;
74
+ final static int LABEL_T2 = 5000 ;
64
75
final static String MAILBOX = "throttling@elasticinbox.com" ;
65
- final static int LABEL = 5555 ;
76
+ final static String KEY_T1 = MAILBOX + ":" + LABEL_T1 ;
77
+ final static String KEY_T2 = MAILBOX + ":" + LABEL_T2 ;
66
78
Cluster cluster ;
67
79
Keyspace keyspace ;
68
80
69
81
@ Before
70
- public void setupCase () {
82
+ public void setupCase ()
83
+ {
71
84
// Consistency Level Policy
72
85
ConsistencyLevelPolicy clp = new QuorumConsistencyLevel ();
73
86
@@ -76,29 +89,37 @@ public void setupCase() {
76
89
77
90
cluster = HFactory .getOrCreateCluster ("TestCluster" , conf );
78
91
keyspace = HFactory .createKeyspace (KEYSPACE , cluster , clp );
92
+
93
+ // clenaup from previous runs
94
+ Mutator <String > m = new ThrottlingMutator <String >(keyspace , strSe , 50 , 500L );
95
+ m .addDeletion (KEY_T1 , CF_LABEL_INDEX , null , strSe );
96
+ m .addDeletion (KEY_T2 , CF_LABEL_INDEX , null , strSe );
97
+ m .execute ();
79
98
}
80
99
81
100
@ After
82
- public void teardownCase () {
101
+ public void teardownCase ()
102
+ {
83
103
keyspace = null ;
84
104
cluster = null ;
85
105
}
86
106
107
+ /**
108
+ * Test throttler's delay functionality.
109
+ */
87
110
@ Test
88
111
public void testThrottlingMutatorDelay ()
89
112
{
90
113
// throttle at 100 ops/ 500 ms
91
114
Mutator <String > m = new ThrottlingMutator <String >(keyspace , strSe , 100 , 500L );
92
115
93
- UUID uuid ;
94
- String indexKey ;
95
116
long ts = System .currentTimeMillis ();
96
117
97
118
// should take 1 sec to insert 200 cols at 5ms rate
98
- for (int i = 0 ; i < 201 ; i ++) {
99
- uuid = new MessageIdBuilder (). build ();
100
- indexKey = MAILBOX + ":" + LABEL ;
101
- m .addInsertion (indexKey , CF_LABEL_INDEX , createColumn (uuid , new byte [0 ], uuidSe , byteSe ));
119
+ for (int i = 0 ; i < 201 ; i ++)
120
+ {
121
+ UUID uuid = new MessageIdBuilder (). build () ;
122
+ m .addInsertion (KEY_T1 , CF_LABEL_INDEX , createColumn (uuid , new byte [0 ], uuidSe , byteSe ));
102
123
}
103
124
104
125
m .execute ();
@@ -109,4 +130,58 @@ public void testThrottlingMutatorDelay()
109
130
assertThat (elapsed , greaterThan (1000L ));
110
131
assertThat (elapsed , lessThan (1200L ));
111
132
}
133
+
134
+ @ Test
135
+ public void testThrottlingMutatorConsistency ()
136
+ {
137
+ int sampleCount = 250 ;
138
+
139
+ // throttle at 50 ops/ 100 ms
140
+ Mutator <String > m = new ThrottlingMutator <String >(keyspace , strSe , 100 , 500L );
141
+
142
+ HashSet <UUID > messageIds = new HashSet <UUID >();
143
+ final byte [] value = "consistent" .getBytes ();
144
+
145
+ // STEP: add samples
146
+ for (int i = 0 ; i < sampleCount ; i ++)
147
+ {
148
+ UUID uuid = new MessageIdBuilder ().build ();
149
+ m .addInsertion (KEY_T2 , CF_LABEL_INDEX , createColumn (uuid , value , uuidSe , byteSe ));
150
+ messageIds .add (uuid );
151
+ }
152
+
153
+ m .execute ();
154
+
155
+ // STEP: validate additions
156
+ SliceQuery <String , UUID , byte []> q =
157
+ createSliceQuery (keyspace , strSe , uuidSe , byteSe );
158
+ q .setColumnFamily (CF_LABEL_INDEX );
159
+ q .setKey (KEY_T2 );
160
+ q .setRange (null , null , false , 500 );
161
+
162
+ QueryResult <ColumnSlice <UUID , byte []>> r = q .execute ();
163
+
164
+ for (HColumn <UUID , byte []> c : r .get ().getColumns ())
165
+ {
166
+ assertNotNull (c );
167
+ assertNotNull (c .getValue ());
168
+ assertThat (messageIds , hasItem (c .getName ()));
169
+ assertThat (value , equalTo (c .getValue ()));
170
+ }
171
+
172
+ assertThat (sampleCount , equalTo (r .get ().getColumns ().size ()));
173
+
174
+ // STEP: delete samples
175
+ for (UUID uuid : messageIds )
176
+ {
177
+ m .addDeletion (KEY_T2 , CF_LABEL_INDEX , uuid , uuidSe );
178
+ }
179
+
180
+ m .execute ();
181
+
182
+ // STEP: validate deletions
183
+ r = q .execute ();
184
+
185
+ assertThat (0 , equalTo (r .get ().getColumns ().size ()));
186
+ }
112
187
}
0 commit comments