@@ -62,6 +62,7 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.Table;
@@ -76,6 +77,7 @@
 
 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -101,6 +103,7 @@
 public final class BackupSystemTable implements Closeable {
 
   private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);
+  private static final int BATCH_SIZE = 1000;
 
   static class WALItem {
     String backupId;
@@ -414,7 +417,7 @@ public void writePathsPostBulkLoad(TableName tabName, byte[] region,
     }
     try (Table table = connection.getTable(bulkLoadTableName)) {
       List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
-      table.put(puts);
+      executePartitionedBatches(table, puts);
       LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
     }
   }
@@ -453,7 +456,7 @@ public void deleteBulkLoadedRows(List<byte[]> rows) throws IOException {
        lstDels.add(del);
        LOG.debug("orig deleting the row: " + Bytes.toString(row));
      }
-      table.delete(lstDels);
+      executePartitionedBatches(table, lstDels);
      LOG.debug("deleted " + rows.size() + " original bulkload rows");
    }
  }
@@ -558,7 +561,7 @@ public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Pa
        }
      }
      if (!puts.isEmpty()) {
-        table.put(puts);
+        executePartitionedBatches(table, puts);
      }
    }
  }
@@ -918,7 +921,7 @@ public void writeRegionServerLogTimestamp(Set<TableName> tables, Map<String, Lon
      puts.add(put);
    }
    try (Table table = connection.getTable(tableName)) {
-      table.put(puts);
+      executePartitionedBatches(table, puts);
    }
  }
 
@@ -1902,4 +1905,19 @@ private static void ensureTableEnabled(Admin admin, TableName tableName) throws
      }
    }
  }
+
+  /**
+   * Executes the given operations in partitioned batches of size {@link #BATCH_SIZE}
+   */
+  private static void executePartitionedBatches(Table table, List<? extends Row> operations)
+    throws IOException {
+    List<? extends List<? extends Row>> operationBatches = Lists.partition(operations, BATCH_SIZE);
+    for (List<? extends Row> batch : operationBatches) {
+      try {
+        table.batch(batch, new Object[batch.size()]);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
 }
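
For context, a minimal, self-contained sketch (not part of this patch) of the chunking behaviour the new helper relies on: Guava's Lists.partition splits a backing list into consecutive sublist views of at most the given size, which is why the loop above submits at most BATCH_SIZE mutations per Table.batch call. The class name PartitionDemo, the 2500-element list, and the use of plain Guava instead of the org.apache.hbase.thirdparty shaded copy are illustrative assumptions.

import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.Lists;

public class PartitionDemo {
  private static final int BATCH_SIZE = 1000;

  public static void main(String[] args) {
    // Build a list larger than one batch, standing in for the Puts/Deletes
    // that BackupSystemTable submits.
    List<Integer> rows = new ArrayList<>();
    for (int i = 0; i < 2500; i++) {
      rows.add(i);
    }
    // Lists.partition returns consecutive sublist views; here sizes 1000, 1000, 500.
    for (List<Integer> batch : Lists.partition(rows, BATCH_SIZE)) {
      System.out.println("batch size: " + batch.size());
    }
  }
}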