
Commit 334accf

DieterDP-ng authored and ndimiduk committed

HBASE-28767 Simplify backup bulk-loading code (#6134)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>

1 parent 4170fba commit 334accf

File tree: 6 files changed, +197 -216 lines changed

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupHFileCleaner.java

Lines changed: 2 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.backup;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -103,7 +104,7 @@ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
     }
 
     try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
-      fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
+      fullyBackedUpTables = new ArrayList<>(tbl.getTablesIncludedInBackups());
     } catch (IOException ioe) {
       LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe);
       return Collections.emptyList();

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java

Lines changed: 25 additions & 23 deletions
@@ -18,9 +18,12 @@
 package org.apache.hadoop.hbase.backup;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -64,21 +67,8 @@ public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
       LOG.debug("skipping recording bulk load in postBulkLoadHFile since backup is disabled");
       return;
     }
-    try (Connection connection = ConnectionFactory.createConnection(cfg);
-      BackupSystemTable tbl = new BackupSystemTable(connection)) {
-      List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
-      RegionInfo info = ctx.getEnvironment().getRegionInfo();
-      TableName tableName = info.getTable();
-      if (!fullyBackedUpTables.contains(tableName)) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(tableName + " has not gone thru full backup");
-        }
-        return;
-      }
-      tbl.writePathsPostBulkLoad(tableName, info.getEncodedNameAsBytes(), finalPaths);
-    } catch (IOException ioe) {
-      LOG.error("Failed to get tables which have been fully backed up", ioe);
-    }
+
+    registerBulkLoad(ctx, finalPaths);
   }
 
   @Override
@@ -89,19 +79,31 @@ public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironmen
       LOG.debug("skipping recording bulk load in preCommitStoreFile since backup is disabled");
       return;
     }
+
+    List<Path> hfiles = new ArrayList<>(pairs.size());
+    for (Pair<Path, Path> pair : pairs) {
+      hfiles.add(pair.getSecond());
+    }
+    registerBulkLoad(ctx, Collections.singletonMap(family, hfiles));
+  }
+
+  private void registerBulkLoad(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+    Map<byte[], List<Path>> cfToHFilePaths) throws IOException {
+    Configuration cfg = ctx.getEnvironment().getConfiguration();
+    RegionInfo region = ctx.getEnvironment().getRegionInfo();
+    TableName tableName = region.getTable();
+
     try (Connection connection = ConnectionFactory.createConnection(cfg);
       BackupSystemTable tbl = new BackupSystemTable(connection)) {
-      List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
-      RegionInfo info = ctx.getEnvironment().getRegionInfo();
-      TableName tableName = info.getTable();
-      if (!fullyBackedUpTables.contains(tableName)) {
+      Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();
+
+      if (fullyBackedUpTables.contains(tableName)) {
+        tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths);
+      } else {
         if (LOG.isTraceEnabled()) {
-          LOG.trace(tableName + " has not gone thru full backup");
+          LOG.trace("Table {} has not gone through full backup - skipping.", tableName);
         }
-        return;
       }
-      tbl.writeFilesForBulkLoadPreCommit(tableName, info.getEncodedNameAsBytes(), family, pairs);
-      return;
     }
   }
 }
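
Both hooks above are no-ops unless the backup subsystem is enabled, as the "backup is disabled" debug messages indicate. For context, a minimal sketch of turning backups on so that BackupObserver's hooks actually record bulk loads; the property names follow the HBase backup documentation, and a real deployment sets additional cleaner and procedure-manager properties omitted here:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Hedged sketch: enable backups and register BackupObserver as a region
// coprocessor (property names per the HBase backup docs; not part of this diff).
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.backup.enable", true);
conf.set("hbase.coprocessor.region.classes",
  "org.apache.hadoop.hbase.backup.BackupObserver");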

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java

Lines changed: 1 addition & 3 deletions
@@ -43,7 +43,6 @@
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -355,8 +354,7 @@ public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOExcept
     return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
   }
 
-  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-    readBulkloadRows(List<TableName> tableList) throws IOException {
+  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
     return systemTable.readBulkloadRows(tableList);
   }
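
The BulkLoad value class that readBulkloadRows now returns is one of the commit's six changed files but is not shown in this excerpt. Below is a minimal sketch of its plausible shape, inferred purely from the constructor call new BulkLoad(table, region, fam, path, row) in BackupSystemTable further down; the field and accessor names are assumptions:

// Hypothetical reconstruction of the new BulkLoad value class (not shown in
// this excerpt); fields inferred from `new BulkLoad(table, region, fam, path, row)`.
package org.apache.hadoop.hbase.backup.impl;

import org.apache.hadoop.hbase.TableName;

public class BulkLoad {
  private final TableName tableName;
  private final String region;       // encoded region name
  private final String columnFamily; // column family the HFile was loaded into
  private final String hfilePath;    // path of the bulk-loaded HFile
  private final byte[] rowKey;       // row key of this record in the backup system table

  public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath,
    byte[] rowKey) {
    this.tableName = tableName;
    this.region = region;
    this.columnFamily = columnFamily;
    this.hfilePath = hfilePath;
    this.rowKey = rowKey;
  }

  public TableName getTableName() {
    return tableName;
  }

  public String getRegion() {
    return region;
  }

  public String getColumnFamily() {
    return columnFamily;
  }

  public String getHfilePath() {
    return hfilePath;
  }

  public byte[] getRowKey() {
    return rowKey;
  }
}

Flattening the bookkeeping into one record per bulk-loaded HFile is what lets readBulkloadRows drop the Pair of nested maps, as the next file shows.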

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java

Lines changed: 36 additions & 117 deletions
@@ -70,7 +70,6 @@
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -179,10 +178,6 @@ public String toString() {
   final static byte[] TBL_COL = Bytes.toBytes("tbl");
   final static byte[] FAM_COL = Bytes.toBytes("fam");
   final static byte[] PATH_COL = Bytes.toBytes("path");
-  final static byte[] STATE_COL = Bytes.toBytes("state");
-  // the two states a bulk loaded file can be
-  final static byte[] BL_PREPARE = Bytes.toBytes("R");
-  final static byte[] BL_COMMIT = Bytes.toBytes("D");
 
   private final static String SET_KEY_PREFIX = "backupset:";
@@ -378,7 +373,7 @@ public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<Table
         }
         files.add(new Path(path));
         if (LOG.isDebugEnabled()) {
-          LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
+          LOG.debug("found bulk loaded file : {} {} {}", tbl, Bytes.toString(fam), path);
         }
       }
@@ -401,43 +396,22 @@ public void deleteBackupInfo(String backupId) throws IOException {
     }
   }
 
-  /*
-   * For postBulkLoadHFile() hook.
-   * @param tabName table name
-   * @param region the region receiving hfile
-   * @param finalPaths family and associated hfiles
+  /**
+   * Registers a bulk load.
+   * @param tableName table name
+   * @param region the region receiving hfile
+   * @param cfToHfilePath column family and associated hfiles
    */
-  public void writePathsPostBulkLoad(TableName tabName, byte[] region,
-    Map<byte[], List<Path>> finalPaths) throws IOException {
+  public void registerBulkLoad(TableName tableName, byte[] region,
+    Map<byte[], List<Path>> cfToHfilePath) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size()
-        + " entries");
+      LOG.debug("Writing bulk load descriptor to backup {} with {} entries", tableName,
+        cfToHfilePath.size());
     }
     try (BufferedMutator bufferedMutator = connection.getBufferedMutator(bulkLoadTableName)) {
-      List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
+      List<Put> puts = BackupSystemTable.createPutForBulkLoad(tableName, region, cfToHfilePath);
       bufferedMutator.mutate(puts);
-      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
-    }
-  }
-
-  /*
-   * For preCommitStoreFile() hook
-   * @param tabName table name
-   * @param region the region receiving hfile
-   * @param family column family
-   * @param pairs list of paths for hfiles
-   */
-  public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
-    final List<Pair<Path, Path>> pairs) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-        "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries");
-    }
-    try (Table table = connection.getTable(bulkLoadTableName)) {
-      List<Put> puts =
-        BackupSystemTable.createPutForPreparedBulkload(tabName, region, family, pairs);
-      table.put(puts);
-      LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
+      LOG.debug("Written {} rows for bulk load of {}", puts.size(), tableName);
     }
   }

@@ -459,33 +433,25 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
     }
   }
 
-  /*
+  /**
    * Reads the rows from backup table recording bulk loaded hfiles
    * @param tableList list of table names
-   * @return The keys of the Map are table, region and column family. Value of the map reflects
-   *         whether the hfile was recorded by preCommitStoreFile hook (true)
-   */
-  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-    readBulkloadRows(List<TableName> tableList) throws IOException {
-
-    Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
-    List<byte[]> rows = new ArrayList<>();
-    for (TableName tTable : tableList) {
-      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
-      Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
-      try (Table table = connection.getTable(bulkLoadTableName);
-        ResultScanner scanner = table.getScanner(scan)) {
-        Result res = null;
+   */
+  public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
+    List<BulkLoad> result = new ArrayList<>();
+    for (TableName table : tableList) {
+      Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
+      try (Table bulkLoadTable = connection.getTable(bulkLoadTableName);
+        ResultScanner scanner = bulkLoadTable.getScanner(scan)) {
+        Result res;
         while ((res = scanner.next()) != null) {
           res.advance();
           String fam = null;
           String path = null;
-          boolean raw = false;
-          byte[] row;
           String region = null;
+          byte[] row = null;
           for (Cell cell : res.listCells()) {
             row = CellUtil.cloneRow(cell);
-            rows.add(row);
             String rowStr = Bytes.toString(row);
             region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
             if (
@@ -498,35 +464,14 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
               BackupSystemTable.PATH_COL.length) == 0
             ) {
               path = Bytes.toString(CellUtil.cloneValue(cell));
-            } else if (
-              CellUtil.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
-                BackupSystemTable.STATE_COL.length) == 0
-            ) {
-              byte[] state = CellUtil.cloneValue(cell);
-              if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
-                raw = true;
-              } else {
-                raw = false;
-              }
             }
           }
-          if (map.get(tTable) == null) {
-            map.put(tTable, new HashMap<>());
-            tblMap = map.get(tTable);
-          }
-          if (tblMap.get(region) == null) {
-            tblMap.put(region, new HashMap<>());
-          }
-          Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
-          if (famMap.get(fam) == null) {
-            famMap.put(fam, new ArrayList<>());
-          }
-          famMap.get(fam).add(new Pair<>(path, raw));
+          result.add(new BulkLoad(table, region, fam, path, row));
           LOG.debug("found orig " + path + " for " + fam + " of table " + region);
         }
       }
     }
-    return new Pair<>(map, rows);
+    return result;
  }
 
  /*
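
Returning a flat List<BulkLoad> spares callers from unpacking the old Pair of nested maps (table to region to family to path/state pairs). A hypothetical consumption sketch, using the accessor names assumed in the BulkLoad sketch earlier; the commit's real call sites, such as the incremental backup client, are not shown in this excerpt:

// Hedged sketch: consuming the simplified readBulkloadRows() result.
List<BulkLoad> bulkLoads = backupSystemTable.readBulkloadRows(tableList);
List<byte[]> rowsToDelete = new ArrayList<>(bulkLoads.size());
for (BulkLoad bulkLoad : bulkLoads) {
  // One record per bulk-loaded HFile: table, region, family, path, row key.
  LOG.debug("Bulk load of {}: family {}, file {}", bulkLoad.getTableName(),
    bulkLoad.getColumnFamily(), bulkLoad.getHfilePath());
  rowsToDelete.add(bulkLoad.getRowKey());
}
// Once the HFiles are captured in a backup, the bookkeeping rows can be cleared:
backupSystemTable.deleteBulkLoadedRows(rowsToDelete);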
@@ -793,20 +738,19 @@ public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) th
     return result;
   }
 
-  /*
-   * Retrieve TableName's for completed backup of given type
-   * @param type backup type
-   * @return List of table names
+  /**
+   * Retrieve all table names that are part of any known backup
    */
-  public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
+  public Set<TableName> getTablesIncludedInBackups() throws IOException {
     Set<TableName> names = new HashSet<>();
     List<BackupInfo> infos = getBackupHistory(true);
     for (BackupInfo info : infos) {
-      if (info.getType() == type) {
+      // Incremental backups have the same tables as the preceding full backups
+      if (info.getType() == BackupType.FULL) {
         names.addAll(info.getTableNames());
       }
     }
-    return new ArrayList<>(names);
+    return names;
  }
 
  /**
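
The rename also spells out the reasoning in the new inline comment: an incremental backup always covers the same tables as its preceding full backup, so collecting the FULL entries alone enumerates every table included in any backup. A short hypothetical usage sketch (variable names assumed):

// Hedged sketch: the new API answers "is this table covered by backups at all?"
Set<TableName> backedUpTables = backupSystemTable.getTablesIncludedInBackups();
if (backedUpTables.contains(tableName)) {
  // bulk loads into this table must be tracked for later incremental backups
}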
@@ -1500,13 +1444,13 @@ private String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
     return s.substring(index + 1);
   }
 
-  /*
-   * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
+  /**
+   * Creates Put's for bulk loads.
    */
-  static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
-    Map<byte[], List<Path>> finalPaths) {
+  private static List<Put> createPutForBulkLoad(TableName table, byte[] region,
+    Map<byte[], List<Path>> columnFamilyToHFilePaths) {
     List<Put> puts = new ArrayList<>();
-    for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
+    for (Map.Entry<byte[], List<Path>> entry : columnFamilyToHFilePaths.entrySet()) {
       for (Path path : entry.getValue()) {
         String file = path.toString();
         int lastSlash = file.lastIndexOf("/");
@@ -1516,10 +1460,8 @@ static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
         put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
         put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
         put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
-        put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
         puts.add(put);
-        LOG
-          .debug("writing done bulk path " + file + " for " + table + " " + Bytes.toString(region));
+        LOG.debug("Done writing bulk path {} for {} {}", file, table, Bytes.toString(region));
       }
     }
     return puts;
@@ -1580,29 +1522,6 @@ public static void deleteSnapshot(Connection conn) throws IOException {
     }
   }
 
-  /*
-   * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
-   */
-  static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family,
-    final List<Pair<Path, Path>> pairs) {
-    List<Put> puts = new ArrayList<>(pairs.size());
-    for (Pair<Path, Path> pair : pairs) {
-      Path path = pair.getSecond();
-      String file = path.toString();
-      int lastSlash = file.lastIndexOf("/");
-      String filename = file.substring(lastSlash + 1);
-      Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
-        Bytes.toString(region), BLK_LD_DELIM, filename));
-      put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
-      put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
-      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, Bytes.toBytes(file));
-      put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
-      puts.add(put);
-      LOG.debug("writing raw bulk path " + file + " for " + table + " " + Bytes.toString(region));
-    }
-    return puts;
-  }
-
   public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
     List<Delete> lstDels = new ArrayList<>(lst.size());
     for (TableName table : lst) {