Commit eebda10

BukrosSzabolcs authored and Apache9 committed
HBASE-26271 Cleanup the broken store files under data directory (#3786)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Josh Elser <elserj@apache.org>
Signed-off-by: Wellington Ramos Chevreuil <wchevreuil@apache.org>
1 parent 71bd286 · commit eebda10

23 files changed: +556 −35 lines
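Before the per-file diffs, a minimal sketch of switching the new chore on. This example is not part of the commit; it only exercises the configuration keys that BrokenStoreFileCleaner (added below) defines, and the defaults it documents. The chore ships disabled.

import org.apache.hadoop.conf.Configuration;

public class BrokenStoreFileCleanerConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The chore is disabled by default; enable it explicitly.
    conf.setBoolean("hbase.region.broken.storefilecleaner.enabled", true);
    // Only files older than the TTL are deleted (default 12h, in milliseconds).
    conf.setLong("hbase.region.broken.storefilecleaner.ttl", 1000L * 60 * 60 * 12);
    // Run every 6h, with the first run after roughly 2h (defaults, in milliseconds).
    conf.setInt("hbase.region.broken.storefilecleaner.period", 1000 * 60 * 60 * 6);
    conf.setInt("hbase.region.broken.storefilecleaner.delay", 1000 * 60 * 60 * 2);
    System.out.println("cleaner enabled: "
      + conf.getBoolean("hbase.region.broken.storefilecleaner.enabled", false));
  }
}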

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java

Lines changed: 2 additions & 4 deletions

@@ -38,7 +38,6 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -286,7 +285,6 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
    * </ol>
    * @param fd File details
    * @param scanner Where to read from.
-   * @param writer Where to write to.
    * @param smallestReadPoint Smallest read point.
    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
    * @param throughputController The compaction throughput controller.
@@ -295,7 +293,7 @@ private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throw
    * @return Whether compaction ended; false if it was interrupted for any reason.
    */
   @Override
-  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+  protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
       boolean major, int numofFilesToCompact) throws IOException {
     long bytesWrittenProgressForLog = 0;
@@ -665,7 +663,7 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
 
 
   @Override
-  protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
+  protected List<Path> commitWriter(FileDetails fd,
       CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java

Lines changed: 5 additions & 1 deletion

@@ -110,7 +110,11 @@ public List<Path> abortWriters() {
     return paths;
   }
 
-  protected abstract Collection<StoreFileWriter> writers();
+  /**
+   * Returns all writers. This is used to prevent deleting currently written storefiles
+   * during cleanup.
+   */
+  public abstract Collection<StoreFileWriter> writers();
 
   /**
    * Subclasses override this method to be called at the end of a successful sequence of append; all
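The visibility change above exists so that components outside the writer hierarchy, such as the cleanup path this commit adds, can enumerate in-flight writers. A compilable sketch of the idea, with stand-in types (Writer and MultiFileWriter are hypothetical; the real types are StoreFileWriter and AbstractMultiFileWriter):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class InFlightPathsExample {
  // Stand-in for StoreFileWriter#getPath().
  interface Writer { String getPath(); }

  // Stand-in for AbstractMultiFileWriter#writers(), made public in this commit.
  interface MultiFileWriter { Collection<Writer> writers(); }

  static List<String> inFlightPaths(MultiFileWriter w) {
    List<String> paths = new ArrayList<>();
    for (Writer writer : w.writers()) {
      paths.add(writer.getPath()); // these paths must never be deleted by a cleaner
    }
    return paths;
  }
}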
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java

Lines changed: 202 additions & 0 deletions (new file)

@@ -0,0 +1,202 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This Chore, every time it runs, will clear the unused HFiles in the data
 * folder.
 */
@InterfaceAudience.Private
public class BrokenStoreFileCleaner extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class);
  public static final String BROKEN_STOREFILE_CLEANER_ENABLED =
      "hbase.region.broken.storefilecleaner.enabled";
  public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false;
  public static final String BROKEN_STOREFILE_CLEANER_TTL =
      "hbase.region.broken.storefilecleaner.ttl";
  public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; // 12h
  public static final String BROKEN_STOREFILE_CLEANER_DELAY =
      "hbase.region.broken.storefilecleaner.delay";
  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; // 2h
  public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER =
      "hbase.region.broken.storefilecleaner.delay.jitter";
  public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D;
  public static final String BROKEN_STOREFILE_CLEANER_PERIOD =
      "hbase.region.broken.storefilecleaner.period";
  public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; // 6h

  private HRegionServer regionServer;
  private final AtomicBoolean enabled = new AtomicBoolean(true);
  private long fileTtl;

  public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper,
      Configuration conf, HRegionServer regionServer) {
    super("BrokenStoreFileCleaner", stopper, period, delay);
    this.regionServer = regionServer;
    setEnabled(
      conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED));
    fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL);
  }

  public boolean setEnabled(final boolean enabled) {
    return this.enabled.getAndSet(enabled);
  }

  public boolean getEnabled() {
    return this.enabled.get();
  }

  @Override
  public void chore() {
    if (getEnabled()) {
      long start = EnvironmentEdgeManager.currentTime();
      AtomicLong deletedFiles = new AtomicLong(0);
      AtomicLong failedDeletes = new AtomicLong(0);
      for (HRegion region : regionServer.getRegions()) {
        for (HStore store : region.getStores()) {
          // only do cleanup in stores not using tmp directories
          if (store.getStoreEngine().requireWritingToTmpDirFirst()) {
            continue;
          }
          Path storePath =
              new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName());

          try {
            List<FileStatus> fsStoreFiles =
                Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath));
            fsStoreFiles.forEach(
              file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes));
          } catch (IOException e) {
            LOG.warn("Failed to list files in {}, cleanup is skipped there", storePath);
            continue;
          }
        }
      }
      LOG.debug(
        "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed "
            + "to delete {}",
        regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start,
        deletedFiles.get(), failedDeletes.get());
    } else {
      LOG.trace("Broken storefile Cleaner chore disabled! Not cleaning.");
    }
  }

  private void cleanFileIfNeeded(FileStatus file, HStore store,
      AtomicLong deletedFiles, AtomicLong failedDeletes) {
    if (file.isDirectory()) {
      LOG.trace("This is a Directory {}, skip cleanup", file.getPath());
      return;
    }

    if (!validate(file.getPath())) {
      LOG.trace("Invalid file {}, skip cleanup", file.getPath());
      return;
    }

    if (!isOldEnough(file)) {
      LOG.trace("Fresh file {}, skip cleanup", file.getPath());
      return;
    }

    if (isActiveStorefile(file, store)) {
      LOG.trace("Actively used storefile file {}, skip cleanup", file.getPath());
      return;
    }

    // Compacted files can still have readers and are cleaned by a separate chore, so they have to
    // be skipped here
    if (isCompactedFile(file, store)) {
      LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath());
      return;
    }

    if (isCompactionResultFile(file, store)) {
      LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath());
      return;
    }

    deleteFile(file, store, deletedFiles, failedDeletes);
  }

  private boolean isCompactionResultFile(FileStatus file, HStore store) {
    return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
  }

  // Compacted files can still have readers and are cleaned by a separate chore, so they have to
  // be skipped here
  private boolean isCompactedFile(FileStatus file, HStore store) {
    return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream()
        .anyMatch(sf -> sf.getPath().equals(file.getPath()));
  }

  private boolean isActiveStorefile(FileStatus file, HStore store) {
    return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
        .anyMatch(sf -> sf.getPath().equals(file.getPath()));
  }

  boolean validate(Path file) {
    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
      return true;
    }
    return StoreFileInfo.validateStoreFileName(file.getName());
  }

  boolean isOldEnough(FileStatus file) {
    return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime();
  }

  private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles,
      AtomicLong failedDeletes) {
    Path filePath = file.getPath();
    LOG.debug("Removing {} from store", filePath);
    try {
      boolean success = store.getFileSystem().delete(filePath, false);
      if (!success) {
        failedDeletes.incrementAndGet();
        LOG.warn("Attempted to delete:" + filePath
            + ", but couldn't. Attempt to delete on next pass.");
      } else {
        deletedFiles.incrementAndGet();
      }
    } catch (IOException e) {
      e = e instanceof RemoteException ?
          ((RemoteException) e).unwrapRemoteException() : e;
      LOG.warn("Error while deleting: " + filePath, e);
    }
  }
}
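A self-contained sketch of the TTL gate implemented by isOldEnough() above; the example itself is not from the commit. A file becomes eligible for cleanup only once its modification time is more than fileTtl milliseconds in the past (default 12 hours).

public class TtlGateExample {
  // Mirrors BrokenStoreFileCleaner#isOldEnough, with the clock passed in explicitly.
  static boolean isOldEnough(long modificationTimeMs, long fileTtlMs, long nowMs) {
    return modificationTimeMs + fileTtlMs < nowMs;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    long twelveHours = 1000L * 60 * 60 * 12; // default BROKEN_STOREFILE_CLEANER_TTL
    System.out.println(isOldEnough(now - twelveHours - 1, twelveHours, now)); // true
    System.out.println(isOldEnough(now, twelveHours, now));                   // false
  }
}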

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java

Lines changed: 1 addition & 1 deletion

@@ -71,7 +71,7 @@ public void append(Cell cell) throws IOException {
   }
 
   @Override
-  protected Collection<StoreFileWriter> writers() {
+  public Collection<StoreFileWriter> writers() {
     return lowerBoundary2Writer.values();
   }
 

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java

Lines changed: 1 addition & 1 deletion

@@ -609,7 +609,7 @@ public Path commitDaughterRegion(final RegionInfo regionInfo, List<Path> allRegi
       writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
       HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
         env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
-    insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
+      insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
     }
     return regionDir;
   }

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

Lines changed: 27 additions & 0 deletions

@@ -432,6 +432,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
    */
   final ServerNonceManager nonceManager;
 
+  private BrokenStoreFileCleaner brokenStoreFileCleaner;
+
   @InterfaceAudience.Private
   CompactedHFilesDischarger compactedFileDischarger;
 
@@ -1831,6 +1833,9 @@ private void startServices() throws IOException {
     if (this.slowLogTableOpsChore != null) {
       choreService.scheduleChore(slowLogTableOpsChore);
     }
+    if (this.brokenStoreFileCleaner != null) {
+      choreService.scheduleChore(brokenStoreFileCleaner);
+    }
 
     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
@@ -1910,6 +1915,22 @@ private void initializeThreads() {
       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
         onlyMetaRefresh, this, this);
     }
+
+    int brokenStoreFileCleanerPeriod = conf.getInt(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
+    int brokenStoreFileCleanerDelay = conf.getInt(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
+    double brokenStoreFileCleanerDelayJitter = conf.getDouble(
+      BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
+      BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
+    double jitterRate = (RandomUtils.nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
+    long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
+    this.brokenStoreFileCleaner =
+      new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
+        brokenStoreFileCleanerPeriod, this, conf, this);
+
     registerConfigurationObservers();
   }
 
@@ -3484,6 +3505,11 @@ protected boolean clusterMode() {
     return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
   }
 
+  @InterfaceAudience.Private
+  public BrokenStoreFileCleaner getBrokenStoreFileCleaner() {
+    return brokenStoreFileCleaner;
+  }
+
   @Override
   protected void stopChores() {
     shutdownChore(nonceManagerChore);
@@ -3494,5 +3520,6 @@ protected void stopChores() {
     shutdownChore(storefileRefresher);
     shutdownChore(fsUtilizationChore);
     shutdownChore(slowLogTableOpsChore);
+    shutdownChore(brokenStoreFileCleaner);
   }
 }
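The scheduling code in initializeThreads() spreads the chore's initial delay with a jitter factor so that regionservers do not all scan their stores at once. A standalone sketch of the same arithmetic (an assumption-laden example, not from the commit: ThreadLocalRandom stands in for commons-lang RandomUtils; with the default jitter of 0.25 the delay lands within ±12.5% of the configured 2h):

import java.util.concurrent.ThreadLocalRandom;

public class JitterExample {
  public static void main(String[] args) {
    int delay = 1000 * 60 * 60 * 2;   // default 2h delay, in milliseconds
    double jitter = 0.25D;            // default delay jitter
    // Uniform in [-jitter/2, +jitter/2), i.e. +/-12.5% with the default.
    double jitterRate = (ThreadLocalRandom.current().nextDouble() - 0.5D) * jitter;
    long jitterValue = Math.round(delay * jitterRate);
    System.out.println("scheduled initial delay ms = " + (delay + jitterValue));
  }
}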

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Lines changed: 6 additions & 0 deletions

@@ -1156,6 +1156,12 @@ protected List<HStoreFile> doCompaction(CompactionRequestImpl cr,
       }
     }
     replaceStoreFiles(filesToCompact, sfs, true);
+
+    // This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
+    // CleanerChore know that compaction is done and the file can be cleaned up if compaction
+    // has failed.
+    storeEngine.resetCompactionWriter();
+
     if (cr.isMajor()) {
       majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
       majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java

Lines changed: 21 additions & 0 deletions

@@ -42,9 +42,11 @@
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
 import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -532,6 +534,25 @@ public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
     }
   }
 
+  /**
+   * Whether the implementation of the used storefile tracker requires you to write to temp
+   * directory first, i.e., does not allow broken store files under the actual data directory.
+   */
+  public boolean requireWritingToTmpDirFirst() {
+    return storeFileTracker.requireWritingToTmpDirFirst();
+  }
+
+  /**
+   * Resets the compaction writer when the new file is committed and used as active storefile.
+   * This step is necessary for the correctness of BrokenStoreFileCleanerChore. It lets the
+   * CleanerChore know that compaction is done and the file can be cleaned up if compaction
+   * has failed. Currently called in
+   * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
+   */
+  public void resetCompactionWriter() {
+    compactor.resetWriter();
+  }
+
   @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
     allowedOnPath = ".*/TestHStore.java")
   ReadWriteLock getLock() {
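A toy model (simplified, not HBase API) of the ordering contract that resetCompactionWriter() documents: the cleaner skips a path if it is either an active storefile or a pending compaction target, and because the writer is reset only after replaceStoreFiles(), a freshly committed file is covered by at least one of the two sets at every instant.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CompactionTargetModel {
  private final Set<String> activeFiles = ConcurrentHashMap.newKeySet();
  private final Set<String> compactionTargets = ConcurrentHashMap.newKeySet();

  void compact(String newFile) {
    compactionTargets.add(newFile);    // writer created: file is a compaction target
    activeFiles.add(newFile);          // replaceStoreFiles: file becomes active
    compactionTargets.remove(newFile); // resetCompactionWriter: safe, already active
  }

  boolean cleanerWouldDelete(String file) {
    // Mirrors the isActiveStorefile / isCompactionResultFile checks in the cleaner.
    return !activeFiles.contains(file) && !compactionTargets.contains(file);
  }

  public static void main(String[] args) {
    CompactionTargetModel m = new CompactionTargetModel();
    m.compact("hfile-1");
    System.out.println(m.cleanerWouldDelete("hfile-1")); // false: still active
  }
}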

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java

Lines changed: 1 addition & 1 deletion

@@ -58,7 +58,7 @@ public void setNoStripeMetadata() {
   }
 
   @Override
-  protected Collection<StoreFileWriter> writers() {
+  public Collection<StoreFileWriter> writers() {
     return existingWriters;
   }
 