Skip to content

Commit ed90b6a

Browse files
committed
HBASE-28565 Make map reduce jobs accept connection uri when specifying peer cluster
1 parent 317ad3c commit ed90b6a

16 files changed

+742
-255
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java

Lines changed: 142 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.hadoop.hbase.mapreduce;
1919

2020
import java.io.IOException;
21+
import java.net.URI;
22+
import java.net.URISyntaxException;
2123
import java.util.HashMap;
2224
import java.util.Map;
2325
import java.util.UUID;
@@ -63,6 +65,11 @@ public class CopyTable extends Configured implements Tool {
6365
String startRow = null;
6466
String stopRow = null;
6567
String dstTableName = null;
68+
URI peerUri = null;
69+
/**
70+
* @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #peerUri} instead.
71+
*/
72+
@Deprecated
6673
String peerAddress = null;
6774
String families = null;
6875
boolean allCells = false;
@@ -89,7 +96,7 @@ private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
8996
return newDir;
9097
}
9198

92-
private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
99+
private void initCopyTableMapperJob(Job job, Scan scan) throws IOException {
93100
Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
94101
if (readingSnapshot) {
95102
TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
@@ -166,7 +173,7 @@ public Job createSubmittableJob(String[] args) throws IOException {
166173
job.setNumReduceTasks(0);
167174

168175
if (bulkload) {
169-
initCopyTableMapperReducerJob(job, scan);
176+
initCopyTableMapperJob(job, scan);
170177

171178
// We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
172179
TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
@@ -180,8 +187,15 @@ public Job createSubmittableJob(String[] args) throws IOException {
180187
admin.getDescriptor((TableName.valueOf(dstTableName))));
181188
}
182189
} else {
183-
initCopyTableMapperReducerJob(job, scan);
184-
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress);
190+
initCopyTableMapperJob(job, scan);
191+
if (peerUri != null) {
192+
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerUri);
193+
} else if (peerAddress != null) {
194+
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress);
195+
} else {
196+
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job);
197+
}
198+
185199
}
186200

187201
return job;
@@ -195,7 +209,7 @@ private static void printUsage(final String errorMsg) {
195209
System.err.println("ERROR: " + errorMsg);
196210
}
197211
System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] "
198-
+ "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
212+
+ "[--new.name=NEW] [--peer.uri=URI|--peer.adr=ADR] <tablename | snapshotName>");
199213
System.err.println();
200214
System.err.println("Options:");
201215
System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
@@ -208,9 +222,12 @@ private static void printUsage(final String errorMsg) {
208222
System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
209223
System.err.println(" versions number of cell versions to copy");
210224
System.err.println(" new.name new table's name");
225+
System.err.println(" peer.uri The URI of the peer cluster");
211226
System.err.println(" peer.adr Address of the peer cluster given in the format");
212227
System.err.println(" hbase.zookeeper.quorum:hbase.zookeeper.client"
213228
+ ".port:zookeeper.znode.parent");
229+
System.err.println(" Do not take effect if peer.uri is specified");
230+
System.err.println(" Deprecated, please use peer.uri instead");
214231
System.err.println(" families comma-separated list of families to copy");
215232
System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. ");
216233
System.err.println(" To keep the same name, just give \"cfName\"");
@@ -247,144 +264,149 @@ private boolean doCommandLine(final String[] args) {
247264
printUsage(null);
248265
return false;
249266
}
250-
try {
251-
for (int i = 0; i < args.length; i++) {
252-
String cmd = args[i];
253-
if (cmd.equals("-h") || cmd.startsWith("--h")) {
254-
printUsage(null);
255-
return false;
256-
}
257-
258-
final String startRowArgKey = "--startrow=";
259-
if (cmd.startsWith(startRowArgKey)) {
260-
startRow = cmd.substring(startRowArgKey.length());
261-
continue;
262-
}
263-
264-
final String stopRowArgKey = "--stoprow=";
265-
if (cmd.startsWith(stopRowArgKey)) {
266-
stopRow = cmd.substring(stopRowArgKey.length());
267-
continue;
268-
}
269-
270-
final String startTimeArgKey = "--starttime=";
271-
if (cmd.startsWith(startTimeArgKey)) {
272-
startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
273-
continue;
274-
}
275-
276-
final String endTimeArgKey = "--endtime=";
277-
if (cmd.startsWith(endTimeArgKey)) {
278-
endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
279-
continue;
280-
}
281-
282-
final String batchArgKey = "--batch=";
283-
if (cmd.startsWith(batchArgKey)) {
284-
batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
285-
continue;
286-
}
287-
288-
final String cacheRowArgKey = "--cacheRow=";
289-
if (cmd.startsWith(cacheRowArgKey)) {
290-
cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
291-
continue;
292-
}
267+
for (int i = 0; i < args.length; i++) {
268+
String cmd = args[i];
269+
if (cmd.equals("-h") || cmd.startsWith("--h")) {
270+
printUsage(null);
271+
return false;
272+
}
293273

294-
final String versionsArgKey = "--versions=";
295-
if (cmd.startsWith(versionsArgKey)) {
296-
versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
297-
continue;
298-
}
274+
final String startRowArgKey = "--startrow=";
275+
if (cmd.startsWith(startRowArgKey)) {
276+
startRow = cmd.substring(startRowArgKey.length());
277+
continue;
278+
}
299279

300-
final String newNameArgKey = "--new.name=";
301-
if (cmd.startsWith(newNameArgKey)) {
302-
dstTableName = cmd.substring(newNameArgKey.length());
303-
continue;
304-
}
280+
final String stopRowArgKey = "--stoprow=";
281+
if (cmd.startsWith(stopRowArgKey)) {
282+
stopRow = cmd.substring(stopRowArgKey.length());
283+
continue;
284+
}
305285

306-
final String peerAdrArgKey = "--peer.adr=";
307-
if (cmd.startsWith(peerAdrArgKey)) {
308-
peerAddress = cmd.substring(peerAdrArgKey.length());
309-
continue;
310-
}
286+
final String startTimeArgKey = "--starttime=";
287+
if (cmd.startsWith(startTimeArgKey)) {
288+
startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
289+
continue;
290+
}
311291

312-
final String familiesArgKey = "--families=";
313-
if (cmd.startsWith(familiesArgKey)) {
314-
families = cmd.substring(familiesArgKey.length());
315-
continue;
316-
}
292+
final String endTimeArgKey = "--endtime=";
293+
if (cmd.startsWith(endTimeArgKey)) {
294+
endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
295+
continue;
296+
}
317297

318-
if (cmd.startsWith("--all.cells")) {
319-
allCells = true;
320-
continue;
321-
}
298+
final String batchArgKey = "--batch=";
299+
if (cmd.startsWith(batchArgKey)) {
300+
batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
301+
continue;
302+
}
322303

323-
if (cmd.startsWith("--bulkload")) {
324-
bulkload = true;
325-
continue;
326-
}
304+
final String cacheRowArgKey = "--cacheRow=";
305+
if (cmd.startsWith(cacheRowArgKey)) {
306+
cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
307+
continue;
308+
}
327309

328-
if (cmd.startsWith("--shuffle")) {
329-
shuffle = true;
330-
continue;
331-
}
310+
final String versionsArgKey = "--versions=";
311+
if (cmd.startsWith(versionsArgKey)) {
312+
versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
313+
continue;
314+
}
332315

333-
if (cmd.startsWith("--snapshot")) {
334-
readingSnapshot = true;
335-
continue;
336-
}
316+
final String newNameArgKey = "--new.name=";
317+
if (cmd.startsWith(newNameArgKey)) {
318+
dstTableName = cmd.substring(newNameArgKey.length());
319+
continue;
320+
}
337321

338-
if (i == args.length - 1) {
339-
if (readingSnapshot) {
340-
snapshot = cmd;
341-
} else {
342-
tableName = cmd;
343-
}
344-
} else {
345-
printUsage("Invalid argument '" + cmd + "'");
322+
final String peerUriArgKey = "--peer.uri=";
323+
if (cmd.startsWith(peerUriArgKey)) {
324+
try {
325+
peerUri = new URI(cmd.substring(peerUriArgKey.length()));
326+
} catch (URISyntaxException e) {
327+
LOG.error("Malformed peer uri specified: {}", cmd, e);
346328
return false;
347329
}
330+
continue;
348331
}
349-
if (dstTableName == null && peerAddress == null) {
350-
printUsage("At least a new table name or a peer address must be specified");
351-
return false;
332+
333+
final String peerAdrArgKey = "--peer.adr=";
334+
if (cmd.startsWith(peerAdrArgKey)) {
335+
peerAddress = cmd.substring(peerAdrArgKey.length());
336+
continue;
352337
}
353-
if ((endTime != 0) && (startTime > endTime)) {
354-
printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
355-
return false;
338+
339+
final String familiesArgKey = "--families=";
340+
if (cmd.startsWith(familiesArgKey)) {
341+
families = cmd.substring(familiesArgKey.length());
342+
continue;
356343
}
357344

358-
if (bulkload && peerAddress != null) {
359-
printUsage("Remote bulkload is not supported!");
360-
return false;
345+
if (cmd.startsWith("--all.cells")) {
346+
allCells = true;
347+
continue;
361348
}
362349

363-
if (readingSnapshot && peerAddress != null) {
364-
printUsage("Loading data from snapshot to remote peer cluster is not supported.");
365-
return false;
350+
if (cmd.startsWith("--bulkload")) {
351+
bulkload = true;
352+
continue;
366353
}
367354

368-
if (readingSnapshot && dstTableName == null) {
369-
printUsage("The --new.name=<table> for destination table should be "
370-
+ "provided when copying data from snapshot .");
371-
return false;
355+
if (cmd.startsWith("--shuffle")) {
356+
shuffle = true;
357+
continue;
372358
}
373359

374-
if (readingSnapshot && snapshot == null) {
375-
printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
376-
return false;
360+
if (cmd.startsWith("--snapshot")) {
361+
readingSnapshot = true;
362+
continue;
377363
}
378364

379-
// set dstTableName if necessary
380-
if (dstTableName == null) {
381-
dstTableName = tableName;
365+
if (i == args.length - 1) {
366+
if (readingSnapshot) {
367+
snapshot = cmd;
368+
} else {
369+
tableName = cmd;
370+
}
371+
} else {
372+
printUsage("Invalid argument '" + cmd + "'");
373+
return false;
382374
}
383-
} catch (Exception e) {
384-
LOG.error("Failed to parse commandLine arguments", e);
385-
printUsage("Can't start because " + e.getMessage());
375+
}
376+
if (dstTableName == null && peerAddress == null) {
377+
printUsage("At least a new table name or a peer address must be specified");
378+
return false;
379+
}
380+
if ((endTime != 0) && (startTime > endTime)) {
381+
printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
386382
return false;
387383
}
384+
385+
if (bulkload && (peerUri != null || peerAddress != null)) {
386+
printUsage("Remote bulkload is not supported!");
387+
return false;
388+
}
389+
390+
if (readingSnapshot && (peerUri != null || peerAddress != null)) {
391+
printUsage("Loading data from snapshot to remote peer cluster is not supported.");
392+
return false;
393+
}
394+
395+
if (readingSnapshot && dstTableName == null) {
396+
printUsage("The --new.name=<table> for destination table should be "
397+
+ "provided when copying data from snapshot .");
398+
return false;
399+
}
400+
401+
if (readingSnapshot && snapshot == null) {
402+
printUsage("Snapshot shouldn't be null when --snapshot is enabled.");
403+
return false;
404+
}
405+
406+
// set dstTableName if necessary
407+
if (dstTableName == null) {
408+
dstTableName = tableName;
409+
}
388410
return true;
389411
}
390412

@@ -401,7 +423,9 @@ public static void main(String[] args) throws Exception {
401423
@Override
402424
public int run(String[] args) throws Exception {
403425
Job job = createSubmittableJob(args);
404-
if (job == null) return 1;
426+
if (job == null) {
427+
return 1;
428+
}
405429
if (!job.waitForCompletion(true)) {
406430
LOG.info("Map-reduce job failed!");
407431
if (bulkload) {

0 commit comments

Comments
 (0)