17
17
*/
18
18
package org .apache .hadoop .hbase .master .cleaner ;
19
19
20
+ import java .io .IOException ;
21
+ import java .util .ArrayList ;
20
22
import java .util .List ;
21
23
import java .util .Map ;
24
+ import java .util .concurrent .BlockingQueue ;
25
+ import java .util .concurrent .LinkedBlockingQueue ;
22
26
27
+ import org .apache .commons .logging .Log ;
28
+ import org .apache .commons .logging .LogFactory ;
23
29
import org .apache .hadoop .hbase .classification .InterfaceAudience ;
30
+ import org .apache .hadoop .hbase .conf .ConfigurationObserver ;
24
31
import org .apache .hadoop .conf .Configuration ;
32
+ import org .apache .hadoop .fs .FileStatus ;
25
33
import org .apache .hadoop .fs .FileSystem ;
26
34
import org .apache .hadoop .fs .Path ;
27
35
import org .apache .hadoop .hbase .Stoppable ;
28
36
import org .apache .hadoop .hbase .io .HFileLink ;
29
37
import org .apache .hadoop .hbase .regionserver .StoreFileInfo ;
38
+
39
+ import com .google .common .annotations .VisibleForTesting ;
30
40
/**
31
41
* This Chore, every time it runs, will clear the HFiles in the hfile archive
32
42
* folder that are deletable for each HFile cleaner in the chain.
33
43
*/
34
44
@ InterfaceAudience .Private
35
- public class HFileCleaner extends CleanerChore <BaseHFileCleanerDelegate > {
45
+ public class HFileCleaner extends CleanerChore <BaseHFileCleanerDelegate > implements
46
+ ConfigurationObserver {
36
47
37
48
public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins" ;
38
49
@@ -41,6 +52,34 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
41
52
this (period , stopper , conf , fs , directory , null );
42
53
}
43
54
55
+ // Configuration key for large/small throttle point
56
+ public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
57
+ "hbase.regionserver.thread.hfilecleaner.throttle" ;
58
+ public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024 ;// 64M
59
+
60
+ // Configuration key for large queue size
61
+ public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
62
+ "hbase.regionserver.hfilecleaner.large.queue.size" ;
63
+ public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576 ;
64
+
65
+ // Configuration key for small queue size
66
+ public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
67
+ "hbase.regionserver.hfilecleaner.small.queue.size" ;
68
+ public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576 ;
69
+
70
+ private static final Log LOG = LogFactory .getLog (HFileCleaner .class );
71
+
72
+ BlockingQueue <HFileDeleteTask > largeFileQueue ;
73
+ BlockingQueue <HFileDeleteTask > smallFileQueue ;
74
+ private int throttlePoint ;
75
+ private int largeQueueSize ;
76
+ private int smallQueueSize ;
77
+ private List <Thread > threads = new ArrayList <Thread >();
78
+ private boolean running ;
79
+
80
+ private long deletedLargeFiles = 0L ;
81
+ private long deletedSmallFiles = 0L ;
82
+
44
83
/**
45
84
* @param period the period of time to sleep between each run
46
85
* @param stopper the stopper
@@ -53,6 +92,15 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
53
92
Path directory , Map <String , Object > params ) {
54
93
super ("HFileCleaner" , period , stopper , conf , fs ,
55
94
directory , MASTER_HFILE_CLEANER_PLUGINS , params );
95
+ throttlePoint =
96
+ conf .getInt (HFILE_DELETE_THROTTLE_THRESHOLD , DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD );
97
+ largeQueueSize =
98
+ conf .getInt (LARGE_HFILE_DELETE_QUEUE_SIZE , DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE );
99
+ smallQueueSize =
100
+ conf .getInt (SMALL_HFILE_DELETE_QUEUE_SIZE , DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE );
101
+ largeFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(largeQueueSize );
102
+ smallFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(smallQueueSize );
103
+ startHFileDeleteThreads ();
56
104
}
57
105
58
106
@ Override
@@ -69,4 +117,267 @@ protected boolean validate(Path file) {
69
117
public List <BaseHFileCleanerDelegate > getDelegatesForTesting () {
70
118
return this .cleanersChain ;
71
119
}
120
+
121
+ @ Override
122
+ public int deleteFiles (Iterable <FileStatus > filesToDelete ) {
123
+ int deletedFiles = 0 ;
124
+ List <HFileDeleteTask > tasks = new ArrayList <HFileDeleteTask >();
125
+ // construct delete tasks and add into relative queue
126
+ for (FileStatus file : filesToDelete ) {
127
+ HFileDeleteTask task = deleteFile (file );
128
+ if (task != null ) {
129
+ tasks .add (task );
130
+ }
131
+ }
132
+ // wait for each submitted task to finish
133
+ for (HFileDeleteTask task : tasks ) {
134
+ if (task .getResult ()) {
135
+ deletedFiles ++;
136
+ }
137
+ }
138
+ return deletedFiles ;
139
+ }
140
+
141
+ /**
142
+ * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
143
+ * @param file the file to delete
144
+ * @return HFileDeleteTask to track progress
145
+ */
146
+ private HFileDeleteTask deleteFile (FileStatus file ) {
147
+ HFileDeleteTask task = new HFileDeleteTask (file );
148
+ boolean enqueued = dispatch (task );
149
+ return enqueued ? task : null ;
150
+ }
151
+
152
+ private boolean dispatch (HFileDeleteTask task ) {
153
+ if (task .fileLength >= this .throttlePoint ) {
154
+ if (!this .largeFileQueue .offer (task )) {
155
+ if (LOG .isTraceEnabled ()) {
156
+ LOG .trace ("Large file deletion queue is full" );
157
+ }
158
+ return false ;
159
+ }
160
+ } else {
161
+ if (!this .smallFileQueue .offer (task )) {
162
+ if (LOG .isTraceEnabled ()) {
163
+ LOG .trace ("Small file deletion queue is full" );
164
+ }
165
+ return false ;
166
+ }
167
+ }
168
+ return true ;
169
+ }
170
+
171
+ @ Override
172
+ public void cleanup () {
173
+ super .cleanup ();
174
+ stopHFileDeleteThreads ();
175
+ }
176
+
177
+ /**
178
+ * Start threads for hfile deletion
179
+ */
180
+ private void startHFileDeleteThreads () {
181
+ final String n = Thread .currentThread ().getName ();
182
+ running = true ;
183
+ // start thread for large file deletion
184
+ Thread large = new Thread () {
185
+ @ Override
186
+ public void run () {
187
+ consumerLoop (largeFileQueue );
188
+ }
189
+ };
190
+ large .setDaemon (true );
191
+ large .setName (n + "-HFileCleaner.large-" + System .currentTimeMillis ());
192
+ large .start ();
193
+ LOG .debug ("Starting hfile cleaner for large files: " + large .getName ());
194
+ threads .add (large );
195
+
196
+ // start thread for small file deletion
197
+ Thread small = new Thread () {
198
+ @ Override
199
+ public void run () {
200
+ consumerLoop (smallFileQueue );
201
+ }
202
+ };
203
+ small .setDaemon (true );
204
+ small .setName (n + "-HFileCleaner.small-" + System .currentTimeMillis ());
205
+ small .start ();
206
+ LOG .debug ("Starting hfile cleaner for small files: " + small .getName ());
207
+ threads .add (small );
208
+ }
209
+
210
+ protected void consumerLoop (BlockingQueue <HFileDeleteTask > queue ) {
211
+ try {
212
+ while (running ) {
213
+ HFileDeleteTask task = null ;
214
+ try {
215
+ task = queue .take ();
216
+ } catch (InterruptedException e ) {
217
+ if (LOG .isDebugEnabled ()) {
218
+ LOG .debug ("Interrupted while trying to take a task from queue" , e );
219
+ }
220
+ break ;
221
+ }
222
+ if (task != null ) {
223
+ if (LOG .isDebugEnabled ()) {
224
+ LOG .debug ("Removing: " + task .filePath + " from archive" );
225
+ }
226
+ boolean succeed ;
227
+ try {
228
+ succeed = this .fs .delete (task .filePath , false );
229
+ } catch (IOException e ) {
230
+ LOG .warn ("Failed to delete file " + task .filePath , e );
231
+ succeed = false ;
232
+ }
233
+ task .setResult (succeed );
234
+ if (succeed ) {
235
+ countDeletedFiles (queue == largeFileQueue );
236
+ }
237
+ }
238
+ }
239
+ } finally {
240
+ if (LOG .isDebugEnabled ()) {
241
+ LOG .debug ("Exit thread: " + Thread .currentThread ());
242
+ }
243
+ }
244
+ }
245
+
246
+ // Currently only for testing purpose
247
+ private void countDeletedFiles (boolean isLarge ) {
248
+ if (isLarge ) {
249
+ if (deletedLargeFiles == Long .MAX_VALUE ) {
250
+ LOG .info ("Deleted more than Long.MAX_VALUE large files, reset counter to 0" );
251
+ deletedLargeFiles = 0L ;
252
+ }
253
+ deletedLargeFiles ++;
254
+ } else {
255
+ if (deletedSmallFiles == Long .MAX_VALUE ) {
256
+ LOG .info ("Deleted more than Long.MAX_VALUE small files, reset counter to 0" );
257
+ deletedSmallFiles = 0L ;
258
+ }
259
+ deletedSmallFiles ++;
260
+ }
261
+ }
262
+
263
+ /**
264
+ * Stop threads for hfile deletion
265
+ */
266
+ private void stopHFileDeleteThreads () {
267
+ running = false ;
268
+ if (LOG .isDebugEnabled ()) {
269
+ LOG .debug ("Stopping file delete threads" );
270
+ }
271
+ for (Thread thread : threads ){
272
+ thread .interrupt ();
273
+ }
274
+ }
275
+
276
+ static class HFileDeleteTask {
277
+ private static final long MAX_WAIT = 60 * 1000L ;
278
+ private static final long WAIT_UNIT = 1000L ;
279
+
280
+ boolean done = false ;
281
+ boolean result ;
282
+ final Path filePath ;
283
+ final long fileLength ;
284
+
285
+ public HFileDeleteTask (FileStatus file ) {
286
+ this .filePath = file .getPath ();
287
+ this .fileLength = file .getLen ();
288
+ }
289
+
290
+ public synchronized void setResult (boolean result ) {
291
+ this .done = true ;
292
+ this .result = result ;
293
+ notify ();
294
+ }
295
+
296
+ public synchronized boolean getResult () {
297
+ long waitTime = 0 ;
298
+ try {
299
+ while (!done ) {
300
+ wait (WAIT_UNIT );
301
+ waitTime += WAIT_UNIT ;
302
+ if (done ) {
303
+ return this .result ;
304
+ }
305
+ if (waitTime > MAX_WAIT ) {
306
+ LOG .warn ("Wait more than " + MAX_WAIT + " ms for deleting " + this .filePath
307
+ + ", exit..." );
308
+ return false ;
309
+ }
310
+ }
311
+ } catch (InterruptedException e ) {
312
+ LOG .warn ("Interrupted while waiting for result of deleting " + filePath
313
+ + ", will return false" , e );
314
+ return false ;
315
+ }
316
+ return this .result ;
317
+ }
318
+ }
319
+
320
+ @ VisibleForTesting
321
+ public List <Thread > getCleanerThreads () {
322
+ return threads ;
323
+ }
324
+
325
+ @ VisibleForTesting
326
+ public long getNumOfDeletedLargeFiles () {
327
+ return deletedLargeFiles ;
328
+ }
329
+
330
+ @ VisibleForTesting
331
+ public long getNumOfDeletedSmallFiles () {
332
+ return deletedSmallFiles ;
333
+ }
334
+
335
+ @ VisibleForTesting
336
+ public long getLargeQueueSize () {
337
+ return largeQueueSize ;
338
+ }
339
+
340
+ @ VisibleForTesting
341
+ public long getSmallQueueSize () {
342
+ return smallQueueSize ;
343
+ }
344
+
345
+ @ VisibleForTesting
346
+ public long getThrottlePoint () {
347
+ return throttlePoint ;
348
+ }
349
+
350
+ @ Override
351
+ public void onConfigurationChange (Configuration conf ) {
352
+ StringBuilder builder = new StringBuilder ();
353
+ builder .append ("Updating configuration for HFileCleaner, previous throttle point: " )
354
+ .append (throttlePoint ).append (", largeQueueSize: " ).append (largeQueueSize )
355
+ .append (", smallQueueSize: " ).append (smallQueueSize );
356
+ stopHFileDeleteThreads ();
357
+ this .throttlePoint =
358
+ conf .getInt (HFILE_DELETE_THROTTLE_THRESHOLD , DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD );
359
+ this .largeQueueSize =
360
+ conf .getInt (LARGE_HFILE_DELETE_QUEUE_SIZE , DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE );
361
+ this .smallQueueSize =
362
+ conf .getInt (SMALL_HFILE_DELETE_QUEUE_SIZE , DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE );
363
+ // record the left over tasks
364
+ List <HFileDeleteTask > leftOverTasks = new ArrayList <>();
365
+ for (HFileDeleteTask task : largeFileQueue ) {
366
+ leftOverTasks .add (task );
367
+ }
368
+ for (HFileDeleteTask task : smallFileQueue ) {
369
+ leftOverTasks .add (task );
370
+ }
371
+ largeFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(largeQueueSize );
372
+ smallFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(smallQueueSize );
373
+ threads .clear ();
374
+ builder .append ("; new throttle point: " ).append (throttlePoint ).append (", largeQueueSize: " )
375
+ .append (largeQueueSize ).append (", smallQueueSize: " ).append (smallQueueSize );
376
+ LOG .debug (builder .toString ());
377
+ startHFileDeleteThreads ();
378
+ // re-dispatch the left over tasks
379
+ for (HFileDeleteTask task : leftOverTasks ) {
380
+ dispatch (task );
381
+ }
382
+ }
72
383
}
0 commit comments