Skip to content

Commit d5510bd

Browse files
committed
HBASE-29238 ExportSnapshot support specify storage policy (#6874)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org> (cherry-picked from commit f6fdbbc)
1 parent 3bffb87 commit d5510bd

File tree

1 file changed

+42
-4
lines changed

1 file changed

+42
-4
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
import java.util.ArrayList;
2727
import java.util.Collections;
2828
import java.util.Comparator;
29+
import java.util.HashMap;
2930
import java.util.HashSet;
3031
import java.util.LinkedList;
3132
import java.util.List;
33+
import java.util.Map;
3234
import java.util.Set;
3335
import java.util.concurrent.ExecutionException;
3436
import java.util.concurrent.ExecutorService;
@@ -122,6 +124,7 @@ public class ExportSnapshot extends AbstractHBaseTool implements Tool {
122124
"snapshot.export.copy.references.threads";
123125
private static final int DEFAULT_COPY_MANIFEST_THREADS =
124126
Runtime.getRuntime().availableProcessors();
127+
private static final String CONF_STORAGE_POLICY = "snapshot.export.storage.policy.family";
125128

126129
static class Testing {
127130
static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
@@ -159,6 +162,8 @@ static final class Options {
159162
new Option(null, "bandwidth", true, "Limit bandwidth to this value in MB/second.");
160163
static final Option RESET_TTL =
161164
new Option(null, "reset-ttl", false, "Do not copy TTL for the snapshot");
165+
static final Option STORAGE_POLICY = new Option(null, "storage-policy", true,
166+
"Storage policy for export snapshot output directory, with format like: f=HOT&g=ALL_SDD");
162167
}
163168

164169
// Export Map-Reduce Counters, to keep track of the progress
@@ -328,6 +333,13 @@ private void copyFile(final Context context, final SnapshotFileInfo inputInfo,
328333

329334
// Ensure that the output folder is there and copy the file
330335
createOutputPath(outputPath.getParent());
336+
String family = new Path(inputInfo.getHfile()).getParent().getName();
337+
String familyStoragePolicy = generateFamilyStoragePolicyKey(family);
338+
if (stringIsNotEmpty(context.getConfiguration().get(familyStoragePolicy))) {
339+
String key = context.getConfiguration().get(familyStoragePolicy);
340+
LOG.info("Setting storage policy {} for {}", key, outputPath.getParent());
341+
outputFs.setStoragePolicy(outputPath.getParent(), key);
342+
}
331343
FSDataOutputStream out = outputFs.create(outputPath, true);
332344

333345
long stime = EnvironmentEdgeManager.currentTime();
@@ -426,7 +438,7 @@ private boolean preserveAttributes(final Path path, final FileStatus refStat) {
426438
}
427439

428440
private boolean stringIsNotEmpty(final String str) {
429-
return str != null && str.length() > 0;
441+
return str != null && !str.isEmpty();
430442
}
431443

432444
private long copyData(final Context context, final Path inputPath, final InputStream in,
@@ -907,8 +919,8 @@ public boolean nextKeyValue() {
907919
*/
908920
private void runCopyJob(final Path inputRoot, final Path outputRoot, final String snapshotName,
909921
final Path snapshotDir, final boolean verifyChecksum, final String filesUser,
910-
final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB)
911-
throws IOException, InterruptedException, ClassNotFoundException {
922+
final String filesGroup, final int filesMode, final int mappers, final int bandwidthMB,
923+
final String storagePolicy) throws IOException, InterruptedException, ClassNotFoundException {
912924
Configuration conf = getConf();
913925
if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
914926
if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
@@ -923,6 +935,11 @@ private void runCopyJob(final Path inputRoot, final Path outputRoot, final Strin
923935
conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
924936
conf.set(CONF_SNAPSHOT_NAME, snapshotName);
925937
conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString());
938+
if (storagePolicy != null) {
939+
for (Map.Entry<String, String> entry : storagePolicyPerFamily(storagePolicy).entrySet()) {
940+
conf.set(generateFamilyStoragePolicyKey(entry.getKey()), entry.getValue());
941+
}
942+
}
926943

927944
String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName);
928945
Job job = new Job(conf);
@@ -1009,6 +1026,23 @@ private void setPermissionParallel(final FileSystem outputFs, final short filesM
10091026
}, conf);
10101027
}
10111028

1029+
private Map<String, String> storagePolicyPerFamily(String storagePolicy) {
1030+
Map<String, String> familyStoragePolicy = new HashMap<>();
1031+
for (String familyConf : storagePolicy.split("&")) {
1032+
String[] familySplit = familyConf.split("=");
1033+
if (familySplit.length != 2) {
1034+
continue;
1035+
}
1036+
// family is key, storage policy is value
1037+
familyStoragePolicy.put(familySplit[0], familySplit[1]);
1038+
}
1039+
return familyStoragePolicy;
1040+
}
1041+
1042+
private static String generateFamilyStoragePolicyKey(String family) {
1043+
return CONF_STORAGE_POLICY + "." + family;
1044+
}
1045+
10121046
private boolean verifyTarget = true;
10131047
private boolean verifySource = true;
10141048
private boolean verifyChecksum = true;
@@ -1023,6 +1057,7 @@ private void setPermissionParallel(final FileSystem outputFs, final short filesM
10231057
private int filesMode = 0;
10241058
private int mappers = 0;
10251059
private boolean resetTtl = false;
1060+
private String storagePolicy = null;
10261061

10271062
@Override
10281063
protected void processOptions(CommandLine cmd) {
@@ -1045,6 +1080,9 @@ protected void processOptions(CommandLine cmd) {
10451080
verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt());
10461081
verifySource = !cmd.hasOption(Options.NO_SOURCE_VERIFY.getLongOpt());
10471082
resetTtl = cmd.hasOption(Options.RESET_TTL.getLongOpt());
1083+
if (cmd.hasOption(Options.STORAGE_POLICY.getLongOpt())) {
1084+
storagePolicy = cmd.getOptionValue(Options.STORAGE_POLICY.getLongOpt());
1085+
}
10481086
}
10491087

10501088
/**
@@ -1213,7 +1251,7 @@ public int doWork() throws IOException {
12131251
// by the HFileArchiver, since they have no references.
12141252
try {
12151253
runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, filesUser,
1216-
filesGroup, filesMode, mappers, bandwidthMB);
1254+
filesGroup, filesMode, mappers, bandwidthMB, storagePolicy);
12171255

12181256
LOG.info("Finalize the Snapshot Export");
12191257
if (!skipTmp) {

0 commit comments

Comments
 (0)