16
16
*/
17
17
package org .apache .hadoop .hbase .quotas ;
18
18
19
+ import static org .junit .Assert .assertEquals ;
20
+ import static org .junit .Assert .assertTrue ;
19
21
import static org .junit .Assert .fail ;
20
22
21
23
import java .io .IOException ;
29
31
import java .util .concurrent .atomic .AtomicLong ;
30
32
31
33
import org .apache .hadoop .conf .Configuration ;
34
+ import org .apache .hadoop .fs .FileSystem ;
35
+ import org .apache .hadoop .fs .Path ;
32
36
import org .apache .hadoop .hbase .HBaseTestingUtility ;
33
37
import org .apache .hadoop .hbase .HColumnDescriptor ;
38
+ import org .apache .hadoop .hbase .HConstants ;
34
39
import org .apache .hadoop .hbase .HTableDescriptor ;
35
40
import org .apache .hadoop .hbase .MiniHBaseCluster ;
36
41
import org .apache .hadoop .hbase .NamespaceDescriptor ;
37
42
import org .apache .hadoop .hbase .TableName ;
43
+ import org .apache .hadoop .hbase .TableNotEnabledException ;
38
44
import org .apache .hadoop .hbase .Waiter .Predicate ;
39
45
import org .apache .hadoop .hbase .client .Admin ;
46
+ import org .apache .hadoop .hbase .client .Append ;
47
+ import org .apache .hadoop .hbase .client .ClientServiceCallable ;
40
48
import org .apache .hadoop .hbase .client .Connection ;
49
+ import org .apache .hadoop .hbase .client .Delete ;
50
+ import org .apache .hadoop .hbase .client .Increment ;
51
+ import org .apache .hadoop .hbase .client .Mutation ;
41
52
import org .apache .hadoop .hbase .client .Put ;
53
+ import org .apache .hadoop .hbase .client .Result ;
54
+ import org .apache .hadoop .hbase .client .ResultScanner ;
55
+ import org .apache .hadoop .hbase .client .Scan ;
56
+ import org .apache .hadoop .hbase .client .SecureBulkLoadClient ;
42
57
import org .apache .hadoop .hbase .client .Table ;
58
+ import org .apache .hadoop .hbase .ipc .RpcControllerFactory ;
43
59
import org .apache .hadoop .hbase .regionserver .HRegion ;
44
60
import org .apache .hadoop .hbase .regionserver .HStore ;
45
61
import org .apache .hadoop .hbase .regionserver .HStoreFile ;
62
+ import org .apache .hadoop .hbase .regionserver .TestHRegionServerBulkLoad ;
46
63
import org .apache .hadoop .hbase .util .Bytes ;
64
+ import org .apache .hadoop .hbase .util .Pair ;
65
+ import org .apache .hadoop .util .StringUtils ;
47
66
import org .apache .yetus .audience .InterfaceAudience ;
48
67
import org .junit .rules .TestName ;
49
68
import org .slf4j .Logger ;
@@ -65,6 +84,7 @@ public class SpaceQuotaHelperForTests {
65
84
private final HBaseTestingUtility testUtil ;
66
85
private final TestName testName ;
67
86
private final AtomicLong counter ;
87
+ private static final int NUM_RETRIES = 10 ;
68
88
69
89
public SpaceQuotaHelperForTests (
70
90
HBaseTestingUtility testUtil , TestName testName , AtomicLong counter ) {
@@ -110,16 +130,214 @@ long listNumDefinedQuotas(Connection conn) throws IOException {
110
130
}
111
131
}
112
132
133
+ /**
134
+ * Writes the given mutation into a table until it violates the given policy.
135
+ * Verifies that the policy has been violated & then returns the name of
136
+ * the table created & written into.
137
+ */
138
+ TableName writeUntilViolationAndVerifyViolation (
139
+ SpaceViolationPolicy policyToViolate , Mutation m ) throws Exception {
140
+ final TableName tn = writeUntilViolation (policyToViolate );
141
+ verifyViolation (policyToViolate , tn , m );
142
+ return tn ;
143
+ }
144
+
145
+ /**
146
+ * Writes the given mutation into a table until it violates the given policy.
147
+ * Returns the name of the table created & written into.
148
+ */
149
+ TableName writeUntilViolation (SpaceViolationPolicy policyToViolate ) throws Exception {
150
+ TableName tn = createTableWithRegions (10 );
151
+ setQuotaLimit (tn , policyToViolate , 2L );
152
+ // Write more data than should be allowed and flush it to disk
153
+ writeData (tn , 3L * SpaceQuotaHelperForTests .ONE_MEGABYTE );
154
+
155
+ // This should be sufficient time for the chores to run and see the change.
156
+ Thread .sleep (5000 );
157
+
158
+ return tn ;
159
+ }
160
+
161
+ /**
162
+ * Verifies that the given policy on the given table has been violated
163
+ */
164
+ void verifyViolation (SpaceViolationPolicy policyToViolate , TableName tn , Mutation m )
165
+ throws Exception {
166
+ // But let's try a few times to get the exception before failing
167
+ boolean sawError = false ;
168
+ String msg = "" ;
169
+ for (int i = 0 ; i < NUM_RETRIES && !sawError ; i ++) {
170
+ try (Table table = testUtil .getConnection ().getTable (tn )) {
171
+ if (m instanceof Put ) {
172
+ table .put ((Put ) m );
173
+ } else if (m instanceof Delete ) {
174
+ table .delete ((Delete ) m );
175
+ } else if (m instanceof Append ) {
176
+ table .append ((Append ) m );
177
+ } else if (m instanceof Increment ) {
178
+ table .increment ((Increment ) m );
179
+ } else {
180
+ fail (
181
+ "Failed to apply " + m .getClass ().getSimpleName () +
182
+ " to the table. Programming error" );
183
+ }
184
+ LOG .info ("Did not reject the " + m .getClass ().getSimpleName () + ", will sleep and retry" );
185
+ Thread .sleep (2000 );
186
+ } catch (Exception e ) {
187
+ msg = StringUtils .stringifyException (e );
188
+ if ((policyToViolate .equals (SpaceViolationPolicy .DISABLE )
189
+ && e instanceof TableNotEnabledException ) || msg .contains (policyToViolate .name ())) {
190
+ LOG .info ("Got the expected exception={}" , msg );
191
+ sawError = true ;
192
+ break ;
193
+ } else {
194
+ LOG .warn ("Did not get the expected exception, will sleep and retry" , e );
195
+ Thread .sleep (2000 );
196
+ }
197
+ }
198
+ }
199
+ if (!sawError ) {
200
+ try (Table quotaTable = testUtil .getConnection ().getTable (QuotaUtil .QUOTA_TABLE_NAME )) {
201
+ ResultScanner scanner = quotaTable .getScanner (new Scan ());
202
+ Result result = null ;
203
+ LOG .info ("Dumping contents of hbase:quota table" );
204
+ while ((result = scanner .next ()) != null ) {
205
+ LOG .info (Bytes .toString (result .getRow ()) + " => " + result .toString ());
206
+ }
207
+ scanner .close ();
208
+ }
209
+ } else {
210
+ if (policyToViolate .equals (SpaceViolationPolicy .DISABLE )) {
211
+ assertTrue (
212
+ msg .contains ("TableNotEnabledException" ) || msg .contains (policyToViolate .name ()));
213
+ } else {
214
+ assertTrue ("Expected exception message to contain the word '" + policyToViolate .name ()
215
+ + "', but was " + msg ,
216
+ msg .contains (policyToViolate .name ()));
217
+ }
218
+ }
219
+ assertTrue (
220
+ "Expected to see an exception writing data to a table exceeding its quota" , sawError );
221
+ }
222
+
223
+ /**
224
+ * Verifies that no policy has been violated on the given table
225
+ */
226
+ void verifyNoViolation (TableName tn , Mutation m ) throws Exception {
227
+ // But let's try a few times to write data before failing
228
+ boolean sawSuccess = false ;
229
+ for (int i = 0 ; i < NUM_RETRIES && !sawSuccess ; i ++) {
230
+ try (Table table = testUtil .getConnection ().getTable (tn )) {
231
+ if (m instanceof Put ) {
232
+ table .put ((Put ) m );
233
+ } else if (m instanceof Delete ) {
234
+ table .delete ((Delete ) m );
235
+ } else if (m instanceof Append ) {
236
+ table .append ((Append ) m );
237
+ } else if (m instanceof Increment ) {
238
+ table .increment ((Increment ) m );
239
+ } else {
240
+ fail ("Failed to apply " + m .getClass ().getSimpleName () + " to the table."
241
+ + " Programming error" );
242
+ }
243
+ sawSuccess = true ;
244
+ } catch (Exception e ) {
245
+ LOG .info ("Rejected the " + m .getClass ().getSimpleName () + ", will sleep and retry" );
246
+ Thread .sleep (2000 );
247
+ }
248
+ }
249
+ if (!sawSuccess ) {
250
+ try (Table quotaTable = testUtil .getConnection ().getTable (QuotaUtil .QUOTA_TABLE_NAME )) {
251
+ ResultScanner scanner = quotaTable .getScanner (new Scan ());
252
+ Result result = null ;
253
+ LOG .info ("Dumping contents of hbase:quota table" );
254
+ while ((result = scanner .next ()) != null ) {
255
+ LOG .info (Bytes .toString (result .getRow ()) + " => " + result .toString ());
256
+ }
257
+ scanner .close ();
258
+ }
259
+ }
260
+ assertTrue ("Expected to succeed in writing data to a table not having quota " , sawSuccess );
261
+ }
262
+
263
+ /**
264
+ * Sets the given quota (policy & limit) on the passed table.
265
+ */
266
+ void setQuotaLimit (final TableName tn , SpaceViolationPolicy policy , long sizeInMBs )
267
+ throws Exception {
268
+ final long sizeLimit = sizeInMBs * SpaceQuotaHelperForTests .ONE_MEGABYTE ;
269
+ QuotaSettings settings = QuotaSettingsFactory .limitTableSpace (tn , sizeLimit , policy );
270
+ testUtil .getAdmin ().setQuota (settings );
271
+ LOG .debug ("Quota limit set for table = {}, limit = {}" , tn , sizeLimit );
272
+ }
273
+
274
+ /**
275
+ * Removes the space quota from the given table
276
+ */
277
+ void removeQuotaFromtable (final TableName tn ) throws Exception {
278
+ QuotaSettings removeQuota = QuotaSettingsFactory .removeTableSpaceLimit (tn );
279
+ testUtil .getAdmin ().setQuota (removeQuota );
280
+ LOG .debug ("Space quota settings removed from the table " , tn );
281
+ }
282
+
283
+ /**
284
+ *
285
+ * @param tn the tablename
286
+ * @param numFiles number of files
287
+ * @param numRowsPerFile number of rows per file
288
+ * @return a clientServiceCallable which can be used with the Caller factory for bulk load
289
+ * @throws Exception when failed to get connection, table or preparation of the bulk load
290
+ */
291
+ ClientServiceCallable <Void > generateFileToLoad (TableName tn , int numFiles , int numRowsPerFile )
292
+ throws Exception {
293
+ Connection conn = testUtil .getConnection ();
294
+ FileSystem fs = testUtil .getTestFileSystem ();
295
+ Configuration conf = testUtil .getConfiguration ();
296
+ Path baseDir = new Path (fs .getHomeDirectory (), testName .getMethodName () + "_files" );
297
+ fs .mkdirs (baseDir );
298
+ final List <Pair <byte [], String >> famPaths = new ArrayList <Pair <byte [], String >>();
299
+ for (int i = 1 ; i <= numFiles ; i ++) {
300
+ Path hfile = new Path (baseDir , "file" + i );
301
+ TestHRegionServerBulkLoad
302
+ .createHFile (fs , hfile , Bytes .toBytes (SpaceQuotaHelperForTests .F1 ), Bytes .toBytes ("to" ),
303
+ Bytes .toBytes ("reject" ), numRowsPerFile );
304
+ famPaths .add (new Pair <>(Bytes .toBytes (SpaceQuotaHelperForTests .F1 ), hfile .toString ()));
305
+ }
306
+
307
+ // bulk load HFiles
308
+ Table table = conn .getTable (tn );
309
+ final String bulkToken = new SecureBulkLoadClient (conf , table ).prepareBulkLoad (conn );
310
+ return new ClientServiceCallable <Void >(conn , tn , Bytes .toBytes ("row" ),
311
+ new RpcControllerFactory (conf ).newController (), HConstants .PRIORITY_UNSET ) {
312
+ @ Override
313
+ public Void rpcCall () throws Exception {
314
+ SecureBulkLoadClient secureClient = null ;
315
+ byte [] regionName = getLocation ().getRegionInfo ().getRegionName ();
316
+ try (Table table = conn .getTable (getTableName ())) {
317
+ secureClient = new SecureBulkLoadClient (conf , table );
318
+ secureClient .secureBulkLoadHFiles (getStub (), famPaths , regionName , true , null , bulkToken );
319
+ }
320
+ return null ;
321
+ }
322
+ };
323
+ }
324
+
325
+ /**
326
+ * Removes all quotas defined in the HBase quota table.
327
+ */
328
+ void removeAllQuotas () throws Exception {
329
+ final Connection conn = testUtil .getConnection ();
330
+ removeAllQuotas (conn );
331
+ assertEquals (0 , listNumDefinedQuotas (conn ));
332
+ }
333
+
113
334
/**
114
335
* Removes all quotas defined in the HBase quota table.
115
336
*/
116
- void removeAllQuotas (Connection conn ) throws IOException , InterruptedException {
337
+ void removeAllQuotas (Connection conn ) throws IOException {
117
338
// Wait for the quota table to be created
118
339
if (!conn .getAdmin ().tableExists (QuotaUtil .QUOTA_TABLE_NAME )) {
119
- do {
120
- LOG .debug ("Quota table does not yet exist" );
121
- Thread .sleep (1000 );
122
- } while (!conn .getAdmin ().tableExists (QuotaUtil .QUOTA_TABLE_NAME ));
340
+ waitForQuotaTable (conn );
123
341
} else {
124
342
// Or, clean up any quotas from previous test runs.
125
343
QuotaRetriever scanner = QuotaRetriever .open (conn .getConfiguration ());
0 commit comments