17
17
*/
18
18
package org .apache .hadoop .hbase .master .cleaner ;
19
19
20
+ import com .google .common .annotations .VisibleForTesting ;
21
+ import java .io .IOException ;
22
+ import java .util .ArrayList ;
20
23
import java .util .List ;
21
24
import java .util .Map ;
22
-
23
- import org .apache .hadoop .hbase .classification .InterfaceAudience ;
25
+ import java .util .concurrent .BlockingQueue ;
26
+ import java .util .concurrent .LinkedBlockingQueue ;
27
+ import org .apache .commons .logging .Log ;
28
+ import org .apache .commons .logging .LogFactory ;
24
29
import org .apache .hadoop .conf .Configuration ;
30
+ import org .apache .hadoop .fs .FileStatus ;
25
31
import org .apache .hadoop .fs .FileSystem ;
26
32
import org .apache .hadoop .fs .Path ;
27
33
import org .apache .hadoop .hbase .Stoppable ;
34
+ import org .apache .hadoop .hbase .classification .InterfaceAudience ;
35
+ import org .apache .hadoop .hbase .conf .ConfigurationObserver ;
28
36
import org .apache .hadoop .hbase .io .HFileLink ;
29
37
import org .apache .hadoop .hbase .regionserver .StoreFileInfo ;
38
+
30
39
/**
31
40
* This Chore, every time it runs, will clear the HFiles in the hfile archive
32
41
* folder that are deletable for each HFile cleaner in the chain.
33
42
*/
34
43
@ InterfaceAudience .Private
35
- public class HFileCleaner extends CleanerChore <BaseHFileCleanerDelegate > {
44
+ public class HFileCleaner extends CleanerChore <BaseHFileCleanerDelegate > implements
45
+ ConfigurationObserver {
36
46
37
47
public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins" ;
38
48
@@ -41,6 +51,34 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
41
51
this (period , stopper , conf , fs , directory , null );
42
52
}
43
53
54
+ // Configuration key for large/small throttle point
55
+ public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
56
+ "hbase.regionserver.thread.hfilecleaner.throttle" ;
57
+ public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024 ;// 64M
58
+
59
+ // Configuration key for large queue size
60
+ public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
61
+ "hbase.regionserver.hfilecleaner.large.queue.size" ;
62
+ public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576 ;
63
+
64
+ // Configuration key for small queue size
65
+ public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
66
+ "hbase.regionserver.hfilecleaner.small.queue.size" ;
67
+ public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576 ;
68
+
69
+ private static final Log LOG = LogFactory .getLog (HFileCleaner .class );
70
+
71
+ BlockingQueue <HFileDeleteTask > largeFileQueue ;
72
+ BlockingQueue <HFileDeleteTask > smallFileQueue ;
73
+ private int throttlePoint ;
74
+ private int largeQueueSize ;
75
+ private int smallQueueSize ;
76
+ private List <Thread > threads = new ArrayList <Thread >();
77
+ private boolean running ;
78
+
79
+ private long deletedLargeFiles = 0L ;
80
+ private long deletedSmallFiles = 0L ;
81
+
44
82
/**
45
83
* @param period the period of time to sleep between each run
46
84
* @param stopper the stopper
@@ -53,6 +91,15 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
53
91
Path directory , Map <String , Object > params ) {
54
92
super ("HFileCleaner" , period , stopper , conf , fs ,
55
93
directory , MASTER_HFILE_CLEANER_PLUGINS , params );
94
+ throttlePoint =
95
+ conf .getInt (HFILE_DELETE_THROTTLE_THRESHOLD , DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD );
96
+ largeQueueSize =
97
+ conf .getInt (LARGE_HFILE_DELETE_QUEUE_SIZE , DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE );
98
+ smallQueueSize =
99
+ conf .getInt (SMALL_HFILE_DELETE_QUEUE_SIZE , DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE );
100
+ largeFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(largeQueueSize );
101
+ smallFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(smallQueueSize );
102
+ startHFileDeleteThreads ();
56
103
}
57
104
58
105
@ Override
@@ -69,4 +116,267 @@ protected boolean validate(Path file) {
69
116
public List <BaseHFileCleanerDelegate > getDelegatesForTesting () {
70
117
return this .cleanersChain ;
71
118
}
119
+
120
+ @ Override
121
+ public int deleteFiles (Iterable <FileStatus > filesToDelete ) {
122
+ int deletedFiles = 0 ;
123
+ List <HFileDeleteTask > tasks = new ArrayList <HFileDeleteTask >();
124
+ // construct delete tasks and add into relative queue
125
+ for (FileStatus file : filesToDelete ) {
126
+ HFileDeleteTask task = deleteFile (file );
127
+ if (task != null ) {
128
+ tasks .add (task );
129
+ }
130
+ }
131
+ // wait for each submitted task to finish
132
+ for (HFileDeleteTask task : tasks ) {
133
+ if (task .getResult ()) {
134
+ deletedFiles ++;
135
+ }
136
+ }
137
+ return deletedFiles ;
138
+ }
139
+
140
+ /**
141
+ * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
142
+ * @param file the file to delete
143
+ * @return HFileDeleteTask to track progress
144
+ */
145
+ private HFileDeleteTask deleteFile (FileStatus file ) {
146
+ HFileDeleteTask task = new HFileDeleteTask (file );
147
+ boolean enqueued = dispatch (task );
148
+ return enqueued ? task : null ;
149
+ }
150
+
151
+ private boolean dispatch (HFileDeleteTask task ) {
152
+ if (task .fileLength >= this .throttlePoint ) {
153
+ if (!this .largeFileQueue .offer (task )) {
154
+ if (LOG .isTraceEnabled ()) {
155
+ LOG .trace ("Large file deletion queue is full" );
156
+ }
157
+ return false ;
158
+ }
159
+ } else {
160
+ if (!this .smallFileQueue .offer (task )) {
161
+ if (LOG .isTraceEnabled ()) {
162
+ LOG .trace ("Small file deletion queue is full" );
163
+ }
164
+ return false ;
165
+ }
166
+ }
167
+ return true ;
168
+ }
169
+
170
+ @ Override
171
+ public void cleanup () {
172
+ super .cleanup ();
173
+ stopHFileDeleteThreads ();
174
+ }
175
+
176
+ /**
177
+ * Start threads for hfile deletion
178
+ */
179
+ private void startHFileDeleteThreads () {
180
+ final String n = Thread .currentThread ().getName ();
181
+ running = true ;
182
+ // start thread for large file deletion
183
+ Thread large = new Thread () {
184
+ @ Override
185
+ public void run () {
186
+ consumerLoop (largeFileQueue );
187
+ }
188
+ };
189
+ large .setDaemon (true );
190
+ large .setName (n + "-HFileCleaner.large-" + System .currentTimeMillis ());
191
+ large .start ();
192
+ LOG .debug ("Starting hfile cleaner for large files: " + large .getName ());
193
+ threads .add (large );
194
+
195
+ // start thread for small file deletion
196
+ Thread small = new Thread () {
197
+ @ Override
198
+ public void run () {
199
+ consumerLoop (smallFileQueue );
200
+ }
201
+ };
202
+ small .setDaemon (true );
203
+ small .setName (n + "-HFileCleaner.small-" + System .currentTimeMillis ());
204
+ small .start ();
205
+ LOG .debug ("Starting hfile cleaner for small files: " + small .getName ());
206
+ threads .add (small );
207
+ }
208
+
209
+ protected void consumerLoop (BlockingQueue <HFileDeleteTask > queue ) {
210
+ try {
211
+ while (running ) {
212
+ HFileDeleteTask task = null ;
213
+ try {
214
+ task = queue .take ();
215
+ } catch (InterruptedException e ) {
216
+ if (LOG .isDebugEnabled ()) {
217
+ LOG .debug ("Interrupted while trying to take a task from queue" , e );
218
+ }
219
+ break ;
220
+ }
221
+ if (task != null ) {
222
+ if (LOG .isDebugEnabled ()) {
223
+ LOG .debug ("Removing: " + task .filePath + " from archive" );
224
+ }
225
+ boolean succeed ;
226
+ try {
227
+ succeed = this .fs .delete (task .filePath , false );
228
+ } catch (IOException e ) {
229
+ LOG .warn ("Failed to delete file " + task .filePath , e );
230
+ succeed = false ;
231
+ }
232
+ task .setResult (succeed );
233
+ if (succeed ) {
234
+ countDeletedFiles (queue == largeFileQueue );
235
+ }
236
+ }
237
+ }
238
+ } finally {
239
+ if (LOG .isDebugEnabled ()) {
240
+ LOG .debug ("Exit thread: " + Thread .currentThread ());
241
+ }
242
+ }
243
+ }
244
+
245
+ // Currently only for testing purpose
246
+ private void countDeletedFiles (boolean isLarge ) {
247
+ if (isLarge ) {
248
+ if (deletedLargeFiles == Long .MAX_VALUE ) {
249
+ LOG .info ("Deleted more than Long.MAX_VALUE large files, reset counter to 0" );
250
+ deletedLargeFiles = 0L ;
251
+ }
252
+ deletedLargeFiles ++;
253
+ } else {
254
+ if (deletedSmallFiles == Long .MAX_VALUE ) {
255
+ LOG .info ("Deleted more than Long.MAX_VALUE small files, reset counter to 0" );
256
+ deletedSmallFiles = 0L ;
257
+ }
258
+ deletedSmallFiles ++;
259
+ }
260
+ }
261
+
262
+ /**
263
+ * Stop threads for hfile deletion
264
+ */
265
+ private void stopHFileDeleteThreads () {
266
+ running = false ;
267
+ if (LOG .isDebugEnabled ()) {
268
+ LOG .debug ("Stopping file delete threads" );
269
+ }
270
+ for (Thread thread : threads ){
271
+ thread .interrupt ();
272
+ }
273
+ }
274
+
275
+ static class HFileDeleteTask {
276
+ private static final long MAX_WAIT = 60 * 1000L ;
277
+ private static final long WAIT_UNIT = 1000L ;
278
+
279
+ boolean done = false ;
280
+ boolean result ;
281
+ final Path filePath ;
282
+ final long fileLength ;
283
+
284
+ public HFileDeleteTask (FileStatus file ) {
285
+ this .filePath = file .getPath ();
286
+ this .fileLength = file .getLen ();
287
+ }
288
+
289
+ public synchronized void setResult (boolean result ) {
290
+ this .done = true ;
291
+ this .result = result ;
292
+ notify ();
293
+ }
294
+
295
+ public synchronized boolean getResult () {
296
+ long waitTime = 0 ;
297
+ try {
298
+ while (!done ) {
299
+ wait (WAIT_UNIT );
300
+ waitTime += WAIT_UNIT ;
301
+ if (done ) {
302
+ return this .result ;
303
+ }
304
+ if (waitTime > MAX_WAIT ) {
305
+ LOG .warn ("Wait more than " + MAX_WAIT + " ms for deleting " + this .filePath
306
+ + ", exit..." );
307
+ return false ;
308
+ }
309
+ }
310
+ } catch (InterruptedException e ) {
311
+ LOG .warn ("Interrupted while waiting for result of deleting " + filePath
312
+ + ", will return false" , e );
313
+ return false ;
314
+ }
315
+ return this .result ;
316
+ }
317
+ }
318
+
319
+ @ VisibleForTesting
320
+ public List <Thread > getCleanerThreads () {
321
+ return threads ;
322
+ }
323
+
324
+ @ VisibleForTesting
325
+ public long getNumOfDeletedLargeFiles () {
326
+ return deletedLargeFiles ;
327
+ }
328
+
329
+ @ VisibleForTesting
330
+ public long getNumOfDeletedSmallFiles () {
331
+ return deletedSmallFiles ;
332
+ }
333
+
334
+ @ VisibleForTesting
335
+ public long getLargeQueueSize () {
336
+ return largeQueueSize ;
337
+ }
338
+
339
+ @ VisibleForTesting
340
+ public long getSmallQueueSize () {
341
+ return smallQueueSize ;
342
+ }
343
+
344
+ @ VisibleForTesting
345
+ public long getThrottlePoint () {
346
+ return throttlePoint ;
347
+ }
348
+
349
+ @ Override
350
+ public void onConfigurationChange (Configuration conf ) {
351
+ StringBuilder builder = new StringBuilder ();
352
+ builder .append ("Updating configuration for HFileCleaner, previous throttle point: " )
353
+ .append (throttlePoint ).append (", largeQueueSize: " ).append (largeQueueSize )
354
+ .append (", smallQueueSize: " ).append (smallQueueSize );
355
+ stopHFileDeleteThreads ();
356
+ this .throttlePoint =
357
+ conf .getInt (HFILE_DELETE_THROTTLE_THRESHOLD , DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD );
358
+ this .largeQueueSize =
359
+ conf .getInt (LARGE_HFILE_DELETE_QUEUE_SIZE , DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE );
360
+ this .smallQueueSize =
361
+ conf .getInt (SMALL_HFILE_DELETE_QUEUE_SIZE , DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE );
362
+ // record the left over tasks
363
+ List <HFileDeleteTask > leftOverTasks = new ArrayList <>();
364
+ for (HFileDeleteTask task : largeFileQueue ) {
365
+ leftOverTasks .add (task );
366
+ }
367
+ for (HFileDeleteTask task : smallFileQueue ) {
368
+ leftOverTasks .add (task );
369
+ }
370
+ largeFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(largeQueueSize );
371
+ smallFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(smallQueueSize );
372
+ threads .clear ();
373
+ builder .append ("; new throttle point: " ).append (throttlePoint ).append (", largeQueueSize: " )
374
+ .append (largeQueueSize ).append (", smallQueueSize: " ).append (smallQueueSize );
375
+ LOG .debug (builder .toString ());
376
+ startHFileDeleteThreads ();
377
+ // re-dispatch the left over tasks
378
+ for (HFileDeleteTask task : leftOverTasks ) {
379
+ dispatch (task );
380
+ }
381
+ }
72
382
}
0 commit comments