Skip to content

HBASE-22390 Backport HBASE-22190 to branch-1 #230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.master.snapshot;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -32,6 +31,7 @@
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -40,6 +40,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.ArrayUtils;
import org.apache.hadoop.hbase.util.FSUtils;

/**
Expand Down Expand Up @@ -97,8 +98,6 @@ interface SnapshotFileInspector {
new HashMap<String, SnapshotDirectoryInfo>();
private final Timer refreshTimer;

private long lastModifiedTime = Long.MIN_VALUE;

/**
* Create a snapshot file cache for all snapshots under the specified [root]/.snapshot on the
* filesystem.
Expand All @@ -114,7 +113,7 @@ interface SnapshotFileInspector {
public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) throws IOException {
this(FSUtils.getCurrentFileSystem(conf), FSUtils.getRootDir(conf), 0, cacheRefreshPeriod,
refreshThreadName, inspectSnapshotFiles);
refreshThreadName, inspectSnapshotFiles);
}

/**
Expand All @@ -128,7 +127,8 @@ public SnapshotFileCache(Configuration conf, long cacheRefreshPeriod, String ref
* @param inspectSnapshotFiles Filter to apply to each snapshot to extract the files.
*/
public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
long cacheRefreshDelay, String refreshThreadName, SnapshotFileInspector inspectSnapshotFiles) {
long cacheRefreshDelay, String refreshThreadName,
SnapshotFileInspector inspectSnapshotFiles) {
this.fs = fs;
this.fileInspector = inspectSnapshotFiles;
this.snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
Expand All @@ -141,25 +141,24 @@ public SnapshotFileCache(FileSystem fs, Path rootDir, long cacheRefreshPeriod,
/**
* Trigger a cache refresh, even if it's before the next cache refresh. Does not affect pending
* cache refreshes.
* <p>
* <p/>
* Blocks until the cache is refreshed.
* <p>
* <p/>
* Exposed for TESTING.
*/
public void triggerCacheRefreshForTesting() {
public synchronized void triggerCacheRefreshForTesting() {
try {
SnapshotFileCache.this.refreshCache();
refreshCache();
} catch (IOException e) {
LOG.warn("Failed to refresh snapshot hfile cache!", e);
}
LOG.debug("Current cache:" + cache);
}

/**
* Check to see if any of the passed file names is contained in any of the snapshots.
* First checks an in-memory cache of the files to keep. If it's not in the cache, then the cache
* is refreshed and the cache checked again for that file.
* This ensures that we never return files that are still referenced by a snapshot.
* Check to see if any of the passed file names is contained in any of the snapshots. First checks
* an in-memory cache of the files to keep. If it's not in the cache, then the cache is refreshed
* and the cache checked again for that file. This ensures that we never return files that are
* still referenced by a snapshot.
* <p>
* Note this may lead to periodic false positives for the file being referenced. Periodically, the
* cache is refreshed even if there are no requests to ensure that the false positives get removed
Expand All @@ -168,17 +167,16 @@ public void triggerCacheRefreshForTesting() {
* at that point, cache will still think the file system contains that file and return
* <tt>true</tt>, even if it is no longer present (false positive). However, if the file never was
* on the filesystem, we will never find it and always return <tt>false</tt>.
* @param files files to check. NOTE: relies on the files being loaded from HDFS before this
* method is called (NOT LAZY)
* @param files files to check. NOTE: relies on the files being loaded from HDFS before this method
* is called (NOT LAZY)
* @return <tt>unReferencedFiles</tt> the collection of files that do not have snapshot references
* @throws IOException if there is an unexpected error reaching the filesystem.
*/
// XXX this is inefficient to synchronize on the method, when what we really need to guard against
// is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the
// cache, but that seems overkill at the moment and isn't necessarily a bottleneck.
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
final SnapshotManager snapshotManager)
throws IOException {
final SnapshotManager snapshotManager) throws IOException {
List<FileStatus> unReferencedFiles = Lists.newArrayList();
boolean refreshed = false;
Lock lock = null;
Expand All @@ -188,8 +186,8 @@ public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatu
if (lock == null || lock.tryLock()) {
try {
if (snapshotManager != null && snapshotManager.isTakingAnySnapshot()) {
LOG.warn("Not checking unreferenced files since snapshot is running, it will "
+ "skip to clean the HFiles this time");
LOG.warn("Not checking unreferenced files since snapshot is running, it will " +
"skip to clean the HFiles this time");
return unReferencedFiles;
}
for (FileStatus file : files) {
Expand All @@ -212,69 +210,50 @@ public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatu
return unReferencedFiles;
}

private synchronized void refreshCache() throws IOException {
// get the status of the snapshots directory and check if it is has changes
FileStatus dirStatus;
try {
dirStatus = fs.getFileStatus(snapshotDir);
} catch (FileNotFoundException e) {
if (this.cache.size() > 0) {
LOG.error("Snapshot directory: " + snapshotDir + " doesn't exist");
}
return;
}

// if the snapshot directory wasn't modified since we last check, we are done
if (dirStatus.getModificationTime() <= this.lastModifiedTime) return;

// directory was modified, so we need to reload our cache
// there could be a slight race here where we miss the cache, check the directory modification
// time, then someone updates the directory, causing us to not scan the directory again.
// However, snapshot directories are only created once, so this isn't an issue.

// 1. update the modified time
this.lastModifiedTime = dirStatus.getModificationTime();

// 2.clear the cache
private void refreshCache() throws IOException {
// just list the snapshot directory directly, do not check the modification time for the root
// snapshot directory, as some file system implementations do not modify the parent directory's
// modTime when there are new sub items, for example, S3.
FileStatus[] snapshotDirs = FSUtils.listStatus(fs, snapshotDir, new PathFilter() {
@Override
public boolean accept(Path path) {
return !path.getName().equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME);
}
});
// clear the cache, as in the below code, either we will also clear the snapshots, or we will
// refill the file name cache again.
this.cache.clear();
Map<String, SnapshotDirectoryInfo> known = new HashMap<String, SnapshotDirectoryInfo>();

// 3. check each of the snapshot directories
FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir);
if (snapshots == null) {
if (ArrayUtils.isEmpty(snapshotDirs)) {
// remove all the remembered snapshots because we don't have any left
if (LOG.isDebugEnabled() && this.snapshots.size() > 0) {
LOG.debug("No snapshots on-disk, cache empty");
LOG.debug("No snapshots on-disk, clear cache");
}
this.snapshots.clear();
return;
}

// 3.1 iterate through the on-disk snapshots
for (FileStatus snapshot : snapshots) {
String name = snapshot.getPath().getName();
// its not the tmp dir,
if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
SnapshotDirectoryInfo files = this.snapshots.remove(name);
// 3.1.1 if we don't know about the snapshot or its been modified, we need to update the
// files the latter could occur where I create a snapshot, then delete it, and then make a
// new snapshot with the same name. We will need to update the cache the information from
// that new snapshot, even though it has the same name as the files referenced have
// probably changed.
if (files == null || files.hasBeenModified(snapshot.getModificationTime())) {
// get all files for the snapshot and create a new info
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshot.getPath());
files = new SnapshotDirectoryInfo(snapshot.getModificationTime(), storedFiles);
}
// 3.2 add all the files to cache
this.cache.addAll(files.getFiles());
known.put(name, files);
// iterate over all the cached snapshots and see if we need to update some, it is not an
// expensive operation if we do not reload the manifest of snapshots.
Map<String, SnapshotDirectoryInfo> newSnapshots = new HashMap<>();
for (FileStatus snapshotDir : snapshotDirs) {
String name = snapshotDir.getPath().getName();
SnapshotDirectoryInfo files = this.snapshots.remove(name);
// if we don't know about the snapshot or it's been modified, we need to update the
// files. The latter could occur where I create a snapshot, then delete it, and then make a
// new snapshot with the same name. We will need to update the cache with the information
// from that new snapshot, even though it has the same name, as the files referenced have
// probably changed.
if (files == null || files.hasBeenModified(snapshotDir.getModificationTime())) {
Collection<String> storedFiles = fileInspector.filesUnderSnapshot(snapshotDir.getPath());
files = new SnapshotDirectoryInfo(snapshotDir.getModificationTime(), storedFiles);
}
// add all the files to cache
this.cache.addAll(files.getFiles());
newSnapshots.put(name, files);
}

// 4. set the snapshots we are tracking
// set the snapshots we are tracking
this.snapshots.clear();
this.snapshots.putAll(known);
this.snapshots.putAll(newSnapshots);
}

/**
Expand All @@ -283,11 +262,17 @@ private synchronized void refreshCache() throws IOException {
public class RefreshCacheTask extends TimerTask {
@Override
public void run() {
try {
SnapshotFileCache.this.refreshCache();
} catch (IOException e) {
LOG.warn("Failed to refresh snapshot hfile cache!", e);
synchronized (SnapshotFileCache.this) {
try {
SnapshotFileCache.this.refreshCache();
} catch (IOException e) {
LOG.warn("Failed to refresh snapshot hfile cache!", e);
// clear all the cached entries if we meet an error
cache.clear();
snapshots.clear();
}
}

}
}

Expand Down