18
18
package org .apache .hadoop .hbase .backup ;
19
19
20
20
import java .io .IOException ;
21
- import java .util .ArrayList ;
22
21
import java .util .Collections ;
23
22
import java .util .HashSet ;
24
- import java .util .List ;
25
- import java .util .Map ;
26
23
import java .util .Set ;
24
+ import org .apache .commons .lang3 .NotImplementedException ;
27
25
import org .apache .hadoop .conf .Configuration ;
28
26
import org .apache .hadoop .fs .FileStatus ;
29
27
import org .apache .hadoop .fs .Path ;
30
28
import org .apache .hadoop .hbase .Abortable ;
31
29
import org .apache .hadoop .hbase .HBaseInterfaceAudience ;
32
30
import org .apache .hadoop .hbase .TableName ;
33
31
import org .apache .hadoop .hbase .backup .impl .BackupSystemTable ;
32
+ import org .apache .hadoop .hbase .backup .impl .BulkLoad ;
34
33
import org .apache .hadoop .hbase .client .Connection ;
35
34
import org .apache .hadoop .hbase .client .ConnectionFactory ;
36
35
import org .apache .hadoop .hbase .master .cleaner .BaseHFileCleanerDelegate ;
42
41
import org .apache .hbase .thirdparty .com .google .common .collect .Iterables ;
43
42
44
43
/**
45
- * Implementation of a file cleaner that checks if an hfile is still referenced by backup before
46
- * deleting it from hfile archive directory.
44
+ * File cleaner that prevents deletion of HFiles that are still required by future incremental
45
+ * backups.
46
+ * <p>
47
+ * Bulk loaded HFiles that are needed by future updates are stored in the backup system table.
47
48
*/
48
49
@ InterfaceAudience .LimitedPrivate (HBaseInterfaceAudience .CONFIG )
49
50
public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
50
51
private static final Logger LOG = LoggerFactory .getLogger (BackupHFileCleaner .class );
52
+
51
53
private boolean stopped = false ;
52
- private boolean aborted ;
53
- private Configuration conf ;
54
+ private boolean aborted = false ;
54
55
private Connection connection ;
55
- private long prevReadFromBackupTbl = 0 , // timestamp of most recent read from backup:system table
56
- secondPrevReadFromBackupTbl = 0 ; // timestamp of 2nd most recent read from backup:system table
57
- // used by unit test to skip reading backup:system
58
- private boolean checkForFullyBackedUpTables = true ;
59
- private List <TableName > fullyBackedUpTables = null ;
60
-
61
- private Set <String > getFilenameFromBulkLoad (Map <byte [], List <Path >>[] maps ) {
62
- Set <String > filenames = new HashSet <>();
63
- for (Map <byte [], List <Path >> map : maps ) {
64
- if (map == null ) {
65
- continue ;
66
- }
67
-
68
- for (List <Path > paths : map .values ()) {
69
- for (Path p : paths ) {
70
- filenames .add (p .getName ());
71
- }
72
- }
73
- }
74
- return filenames ;
75
- }
76
-
77
- private Set <String > loadHFileRefs (List <TableName > tableList ) throws IOException {
78
- if (connection == null ) {
79
- connection = ConnectionFactory .createConnection (conf );
80
- }
81
- try (BackupSystemTable tbl = new BackupSystemTable (connection )) {
82
- Map <byte [], List <Path >>[] res = tbl .readBulkLoadedFiles (null , tableList );
83
- secondPrevReadFromBackupTbl = prevReadFromBackupTbl ;
84
- prevReadFromBackupTbl = EnvironmentEdgeManager .currentTime ();
85
- return getFilenameFromBulkLoad (res );
86
- }
87
- }
88
-
89
- @ InterfaceAudience .Private
90
- void setCheckForFullyBackedUpTables (boolean b ) {
91
- checkForFullyBackedUpTables = b ;
92
- }
56
+ // timestamp of most recent read from backup system table
57
+ private long prevReadFromBackupTbl = 0 ;
58
+ // timestamp of 2nd most recent read from backup system table
59
+ private long secondPrevReadFromBackupTbl = 0 ;
93
60
94
61
@ Override
95
62
public Iterable <FileStatus > getDeletableFiles (Iterable <FileStatus > files ) {
96
- if (conf == null ) {
97
- return files ;
63
+ if (stopped ) {
64
+ return Collections . emptyList () ;
98
65
}
99
- // obtain the Set of TableName's which have been fully backed up
100
- // so that we filter BulkLoad to be returned from server
101
- if (checkForFullyBackedUpTables ) {
102
- if (connection == null ) {
103
- return files ;
104
- }
105
66
106
- try (BackupSystemTable tbl = new BackupSystemTable (connection )) {
107
- fullyBackedUpTables = new ArrayList <>(tbl .getTablesIncludedInBackups ());
108
- } catch (IOException ioe ) {
109
- LOG .error ("Failed to get tables which have been fully backed up, skipping checking" , ioe );
110
- return Collections .emptyList ();
67
+ // We use filenames because the HFile will have been moved to the archive since it
68
+ // was registered.
69
+ final Set <String > hfileFilenames = new HashSet <>();
70
+ try (BackupSystemTable tbl = new BackupSystemTable (connection )) {
71
+ Set <TableName > tablesIncludedInBackups = fetchFullyBackedUpTables (tbl );
72
+ for (BulkLoad bulkLoad : tbl .readBulkloadRows (tablesIncludedInBackups )) {
73
+ hfileFilenames .add (new Path (bulkLoad .getHfilePath ()).getName ());
111
74
}
112
- Collections .sort (fullyBackedUpTables );
113
- }
114
- final Set <String > hfileRefs ;
115
- try {
116
- hfileRefs = loadHFileRefs (fullyBackedUpTables );
75
+ LOG .debug ("Found {} unique HFile filenames registered as bulk loads." , hfileFilenames .size ());
117
76
} catch (IOException ioe ) {
118
- LOG .error ("Failed to read hfile references, skipping checking deletable files" , ioe );
77
+ LOG .error (
78
+ "Failed to read registered bulk load references from backup system table, marking all files as non-deletable." ,
79
+ ioe );
119
80
return Collections .emptyList ();
120
81
}
121
- Iterable <FileStatus > deletables = Iterables .filter (files , file -> {
122
- // If the file is recent, be conservative and wait for one more scan of backup:system table
82
+
83
+ secondPrevReadFromBackupTbl = prevReadFromBackupTbl ;
84
+ prevReadFromBackupTbl = EnvironmentEdgeManager .currentTime ();
85
+
86
+ return Iterables .filter (files , file -> {
87
+ // If the file is recent, be conservative and wait for one more scan of the bulk loads
123
88
if (file .getModificationTime () > secondPrevReadFromBackupTbl ) {
89
+ LOG .debug ("Preventing deletion due to timestamp: {}" , file .getPath ().toString ());
124
90
return false ;
125
91
}
92
+ // A file can be deleted if it is not registered as a backup bulk load.
126
93
String hfile = file .getPath ().getName ();
127
- boolean foundHFileRef = hfileRefs .contains (hfile );
128
- return !foundHFileRef ;
94
+ if (hfileFilenames .contains (hfile )) {
95
+ LOG .debug ("Preventing deletion due to bulk load registration in backup system table: {}" ,
96
+ file .getPath ().toString ());
97
+ return false ;
98
+ } else {
99
+ LOG .debug ("OK to delete: {}" , file .getPath ().toString ());
100
+ return true ;
101
+ }
129
102
});
130
- return deletables ;
103
+ }
104
+
105
+ protected Set <TableName > fetchFullyBackedUpTables (BackupSystemTable tbl ) throws IOException {
106
+ return tbl .getTablesIncludedInBackups ();
131
107
}
132
108
133
109
@ Override
134
110
public boolean isFileDeletable (FileStatus fStat ) {
135
- // work is done in getDeletableFiles()
136
- return true ;
111
+ throw new NotImplementedException ("This method should not be called" );
137
112
}
138
113
139
114
@ Override
140
115
public void setConf (Configuration config ) {
141
- this .conf = config ;
142
116
this .connection = null ;
143
117
try {
144
- this .connection = ConnectionFactory .createConnection (conf );
118
+ this .connection = ConnectionFactory .createConnection (config );
145
119
} catch (IOException ioe ) {
146
120
LOG .error ("Couldn't establish connection" , ioe );
147
121
}
@@ -156,7 +130,7 @@ public void stop(String why) {
156
130
try {
157
131
this .connection .close ();
158
132
} catch (IOException ioe ) {
159
- LOG .debug ("Got " + ioe + " when closing connection" );
133
+ LOG .debug ("Got IOException when closing connection" , ioe );
160
134
}
161
135
}
162
136
this .stopped = true ;
@@ -169,7 +143,7 @@ public boolean isStopped() {
169
143
170
144
@ Override
171
145
public void abort (String why , Throwable e ) {
172
- LOG .warn ("Aborting ReplicationHFileCleaner because " + why , e );
146
+ LOG .warn ("Aborting ReplicationHFileCleaner because {}" , why , e );
173
147
this .aborted = true ;
174
148
stop (why );
175
149
}
0 commit comments