1717 */
1818package org .apache .hadoop .hbase .master .cleaner ;
1919
20- import java .util .List ;
21- import java .util .Map ;
22-
23- import org .apache .hadoop .hbase .classification .InterfaceAudience ;
20+ import com .google .common .annotations .VisibleForTesting ;
21+ import org .apache .commons .logging .Log ;
22+ import org .apache .commons .logging .LogFactory ;
2423import org .apache .hadoop .conf .Configuration ;
24+ import org .apache .hadoop .fs .FileStatus ;
2525import org .apache .hadoop .fs .FileSystem ;
2626import org .apache .hadoop .fs .Path ;
2727import org .apache .hadoop .hbase .Stoppable ;
28+ import org .apache .hadoop .hbase .classification .InterfaceAudience ;
29+ import org .apache .hadoop .hbase .conf .ConfigurationObserver ;
2830import org .apache .hadoop .hbase .io .HFileLink ;
2931import org .apache .hadoop .hbase .regionserver .StoreFileInfo ;
32+
33+ import java .io .IOException ;
34+ import java .util .ArrayList ;
35+ import java .util .List ;
36+ import java .util .Map ;
37+ import java .util .concurrent .BlockingQueue ;
38+ import java .util .concurrent .LinkedBlockingQueue ;
39+
3040/**
3141 * This Chore, every time it runs, will clear the HFiles in the hfile archive
3242 * folder that are deletable for each HFile cleaner in the chain.
3343 */
3444@ InterfaceAudience .Private
35- public class HFileCleaner extends CleanerChore <BaseHFileCleanerDelegate > {
45+ public class HFileCleaner extends CleanerChore <BaseHFileCleanerDelegate > implements
46+ ConfigurationObserver {
3647
3748 public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins" ;
3849
@@ -41,6 +52,34 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
4152 this (period , stopper , conf , fs , directory , null );
4253 }
4354
55+ // Configuration key for large/small throttle point
56+ public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
57+ "hbase.regionserver.thread.hfilecleaner.throttle" ;
58+ public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024 ;// 64M
59+
60+ // Configuration key for large queue size
61+ public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
62+ "hbase.regionserver.hfilecleaner.large.queue.size" ;
63+ public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576 ;
64+
65+ // Configuration key for small queue size
66+ public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
67+ "hbase.regionserver.hfilecleaner.small.queue.size" ;
68+ public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576 ;
69+
70+ private static final Log LOG = LogFactory .getLog (HFileCleaner .class );
71+
72+ BlockingQueue <HFileDeleteTask > largeFileQueue ;
73+ BlockingQueue <HFileDeleteTask > smallFileQueue ;
74+ private int throttlePoint ;
75+ private int largeQueueSize ;
76+ private int smallQueueSize ;
77+ private List <Thread > threads = new ArrayList <Thread >();
78+ private boolean running ;
79+
80+ private long deletedLargeFiles = 0L ;
81+ private long deletedSmallFiles = 0L ;
82+
4483 /**
4584 * @param period the period of time to sleep between each run
4685 * @param stopper the stopper
@@ -53,6 +92,15 @@ public HFileCleaner(final int period, final Stoppable stopper, Configuration con
5392 Path directory , Map <String , Object > params ) {
5493 super ("HFileCleaner" , period , stopper , conf , fs ,
5594 directory , MASTER_HFILE_CLEANER_PLUGINS , params );
95+ throttlePoint =
96+ conf .getInt (HFILE_DELETE_THROTTLE_THRESHOLD , DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD );
97+ largeQueueSize =
98+ conf .getInt (LARGE_HFILE_DELETE_QUEUE_SIZE , DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE );
99+ smallQueueSize =
100+ conf .getInt (SMALL_HFILE_DELETE_QUEUE_SIZE , DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE );
101+ largeFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(largeQueueSize );
102+ smallFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(smallQueueSize );
103+ startHFileDeleteThreads ();
56104 }
57105
58106 @ Override
@@ -69,4 +117,267 @@ protected boolean validate(Path file) {
69117 public List <BaseHFileCleanerDelegate > getDelegatesForTesting () {
70118 return this .cleanersChain ;
71119 }
120+
121+ @ Override
122+ public int deleteFiles (Iterable <FileStatus > filesToDelete ) {
123+ int deletedFiles = 0 ;
124+ List <HFileDeleteTask > tasks = new ArrayList <HFileDeleteTask >();
125+ // construct delete tasks and add into relative queue
126+ for (FileStatus file : filesToDelete ) {
127+ HFileDeleteTask task = deleteFile (file );
128+ if (task != null ) {
129+ tasks .add (task );
130+ }
131+ }
132+ // wait for each submitted task to finish
133+ for (HFileDeleteTask task : tasks ) {
134+ if (task .getResult ()) {
135+ deletedFiles ++;
136+ }
137+ }
138+ return deletedFiles ;
139+ }
140+
141+ /**
142+ * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct queue
143+ * @param file the file to delete
144+ * @return HFileDeleteTask to track progress
145+ */
146+ private HFileDeleteTask deleteFile (FileStatus file ) {
147+ HFileDeleteTask task = new HFileDeleteTask (file );
148+ boolean enqueued = dispatch (task );
149+ return enqueued ? task : null ;
150+ }
151+
152+ private boolean dispatch (HFileDeleteTask task ) {
153+ if (task .fileLength >= this .throttlePoint ) {
154+ if (!this .largeFileQueue .offer (task )) {
155+ if (LOG .isTraceEnabled ()) {
156+ LOG .trace ("Large file deletion queue is full" );
157+ }
158+ return false ;
159+ }
160+ } else {
161+ if (!this .smallFileQueue .offer (task )) {
162+ if (LOG .isTraceEnabled ()) {
163+ LOG .trace ("Small file deletion queue is full" );
164+ }
165+ return false ;
166+ }
167+ }
168+ return true ;
169+ }
170+
171+ @ Override
172+ public void cleanup () {
173+ super .cleanup ();
174+ stopHFileDeleteThreads ();
175+ }
176+
177+ /**
178+ * Start threads for hfile deletion
179+ */
180+ private void startHFileDeleteThreads () {
181+ final String n = Thread .currentThread ().getName ();
182+ running = true ;
183+ // start thread for large file deletion
184+ Thread large = new Thread () {
185+ @ Override
186+ public void run () {
187+ consumerLoop (largeFileQueue );
188+ }
189+ };
190+ large .setDaemon (true );
191+ large .setName (n + "-HFileCleaner.large-" + System .currentTimeMillis ());
192+ large .start ();
193+ LOG .debug ("Starting hfile cleaner for large files: " + large .getName ());
194+ threads .add (large );
195+
196+ // start thread for small file deletion
197+ Thread small = new Thread () {
198+ @ Override
199+ public void run () {
200+ consumerLoop (smallFileQueue );
201+ }
202+ };
203+ small .setDaemon (true );
204+ small .setName (n + "-HFileCleaner.small-" + System .currentTimeMillis ());
205+ small .start ();
206+ LOG .debug ("Starting hfile cleaner for small files: " + small .getName ());
207+ threads .add (small );
208+ }
209+
210+ protected void consumerLoop (BlockingQueue <HFileDeleteTask > queue ) {
211+ try {
212+ while (running ) {
213+ HFileDeleteTask task = null ;
214+ try {
215+ task = queue .take ();
216+ } catch (InterruptedException e ) {
217+ if (LOG .isDebugEnabled ()) {
218+ LOG .debug ("Interrupted while trying to take a task from queue" , e );
219+ }
220+ break ;
221+ }
222+ if (task != null ) {
223+ if (LOG .isDebugEnabled ()) {
224+ LOG .debug ("Removing: " + task .filePath + " from archive" );
225+ }
226+ boolean succeed ;
227+ try {
228+ succeed = this .fs .delete (task .filePath , false );
229+ } catch (IOException e ) {
230+ LOG .warn ("Failed to delete file " + task .filePath , e );
231+ succeed = false ;
232+ }
233+ task .setResult (succeed );
234+ if (succeed ) {
235+ countDeletedFiles (queue == largeFileQueue );
236+ }
237+ }
238+ }
239+ } finally {
240+ if (LOG .isDebugEnabled ()) {
241+ LOG .debug ("Exit thread: " + Thread .currentThread ());
242+ }
243+ }
244+ }
245+
246+ // Currently only for testing purpose
247+ private void countDeletedFiles (boolean isLarge ) {
248+ if (isLarge ) {
249+ if (deletedLargeFiles == Long .MAX_VALUE ) {
250+ LOG .info ("Deleted more than Long.MAX_VALUE large files, reset counter to 0" );
251+ deletedLargeFiles = 0L ;
252+ }
253+ deletedLargeFiles ++;
254+ } else {
255+ if (deletedSmallFiles == Long .MAX_VALUE ) {
256+ LOG .info ("Deleted more than Long.MAX_VALUE small files, reset counter to 0" );
257+ deletedSmallFiles = 0L ;
258+ }
259+ deletedSmallFiles ++;
260+ }
261+ }
262+
263+ /**
264+ * Stop threads for hfile deletion
265+ */
266+ private void stopHFileDeleteThreads () {
267+ running = false ;
268+ if (LOG .isDebugEnabled ()) {
269+ LOG .debug ("Stopping file delete threads" );
270+ }
271+ for (Thread thread : threads ){
272+ thread .interrupt ();
273+ }
274+ }
275+
276+ static class HFileDeleteTask {
277+ private static final long MAX_WAIT = 60 * 1000L ;
278+ private static final long WAIT_UNIT = 1000L ;
279+
280+ boolean done = false ;
281+ boolean result ;
282+ final Path filePath ;
283+ final long fileLength ;
284+
285+ public HFileDeleteTask (FileStatus file ) {
286+ this .filePath = file .getPath ();
287+ this .fileLength = file .getLen ();
288+ }
289+
290+ public synchronized void setResult (boolean result ) {
291+ this .done = true ;
292+ this .result = result ;
293+ notify ();
294+ }
295+
296+ public synchronized boolean getResult () {
297+ long waitTime = 0 ;
298+ try {
299+ while (!done ) {
300+ wait (WAIT_UNIT );
301+ waitTime += WAIT_UNIT ;
302+ if (done ) {
303+ return this .result ;
304+ }
305+ if (waitTime > MAX_WAIT ) {
306+ LOG .warn ("Wait more than " + MAX_WAIT + " ms for deleting " + this .filePath
307+ + ", exit..." );
308+ return false ;
309+ }
310+ }
311+ } catch (InterruptedException e ) {
312+ LOG .warn ("Interrupted while waiting for result of deleting " + filePath
313+ + ", will return false" , e );
314+ return false ;
315+ }
316+ return this .result ;
317+ }
318+ }
319+
320+ @ VisibleForTesting
321+ public List <Thread > getCleanerThreads () {
322+ return threads ;
323+ }
324+
325+ @ VisibleForTesting
326+ public long getNumOfDeletedLargeFiles () {
327+ return deletedLargeFiles ;
328+ }
329+
330+ @ VisibleForTesting
331+ public long getNumOfDeletedSmallFiles () {
332+ return deletedSmallFiles ;
333+ }
334+
335+ @ VisibleForTesting
336+ public long getLargeQueueSize () {
337+ return largeQueueSize ;
338+ }
339+
340+ @ VisibleForTesting
341+ public long getSmallQueueSize () {
342+ return smallQueueSize ;
343+ }
344+
345+ @ VisibleForTesting
346+ public long getThrottlePoint () {
347+ return throttlePoint ;
348+ }
349+
350+ @ Override
351+ public void onConfigurationChange (Configuration conf ) {
352+ StringBuilder builder = new StringBuilder ();
353+ builder .append ("Updating configuration for HFileCleaner, previous throttle point: " )
354+ .append (throttlePoint ).append (", largeQueueSize: " ).append (largeQueueSize )
355+ .append (", smallQueueSize: " ).append (smallQueueSize );
356+ stopHFileDeleteThreads ();
357+ this .throttlePoint =
358+ conf .getInt (HFILE_DELETE_THROTTLE_THRESHOLD , DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD );
359+ this .largeQueueSize =
360+ conf .getInt (LARGE_HFILE_DELETE_QUEUE_SIZE , DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE );
361+ this .smallQueueSize =
362+ conf .getInt (SMALL_HFILE_DELETE_QUEUE_SIZE , DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE );
363+ // record the left over tasks
364+ List <HFileDeleteTask > leftOverTasks = new ArrayList <>();
365+ for (HFileDeleteTask task : largeFileQueue ) {
366+ leftOverTasks .add (task );
367+ }
368+ for (HFileDeleteTask task : smallFileQueue ) {
369+ leftOverTasks .add (task );
370+ }
371+ largeFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(largeQueueSize );
372+ smallFileQueue = new LinkedBlockingQueue <HFileCleaner .HFileDeleteTask >(smallQueueSize );
373+ threads .clear ();
374+ builder .append ("; new throttle point: " ).append (throttlePoint ).append (", largeQueueSize: " )
375+ .append (largeQueueSize ).append (", smallQueueSize: " ).append (smallQueueSize );
376+ LOG .debug (builder .toString ());
377+ startHFileDeleteThreads ();
378+ // re-dispatch the left over tasks
379+ for (HFileDeleteTask task : leftOverTasks ) {
380+ dispatch (task );
381+ }
382+ }
72383}
0 commit comments