Skip to content

Commit 510b38d

Browse files
committed
HBASE-28565 Make map reduce jobs accept connection uri when specifying peer cluster
1 parent 092ce0d commit 510b38d

16 files changed

+621
-140
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.mapreduce;
1919

2020
import java.io.IOException;
21+
import java.net.URI;
2122
import java.util.HashMap;
2223
import java.util.Map;
2324
import java.util.UUID;
@@ -63,6 +64,11 @@ public class CopyTable extends Configured implements Tool {
6364
String startRow = null;
6465
String stopRow = null;
6566
String dstTableName = null;
67+
URI peerUri = null;
68+
/**
69+
* @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #peerUri} instead.
70+
*/
71+
@Deprecated
6672
String peerAddress = null;
6773
String families = null;
6874
boolean allCells = false;
@@ -89,7 +95,7 @@ private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
8995
return newDir;
9096
}
9197

92-
private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
98+
private void initCopyTableMapperJob(Job job, Scan scan) throws IOException {
9399
Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
94100
if (readingSnapshot) {
95101
TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
@@ -166,7 +172,7 @@ public Job createSubmittableJob(String[] args) throws IOException {
166172
job.setNumReduceTasks(0);
167173

168174
if (bulkload) {
169-
initCopyTableMapperReducerJob(job, scan);
175+
initCopyTableMapperJob(job, scan);
170176

171177
// We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
172178
TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
@@ -180,8 +186,15 @@ public Job createSubmittableJob(String[] args) throws IOException {
180186
admin.getDescriptor((TableName.valueOf(dstTableName))));
181187
}
182188
} else {
183-
initCopyTableMapperReducerJob(job, scan);
184-
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress);
189+
initCopyTableMapperJob(job, scan);
190+
if (peerUri != null) {
191+
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerUri);
192+
} else if (peerAddress != null) {
193+
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress);
194+
} else {
195+
TableMapReduceUtil.initTableReducerJob(dstTableName, null, job);
196+
}
197+
185198
}
186199

187200
return job;
@@ -195,7 +208,7 @@ private static void printUsage(final String errorMsg) {
195208
System.err.println("ERROR: " + errorMsg);
196209
}
197210
System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] "
198-
+ "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
211+
+ "[--new.name=NEW] [--peer.uri=URI|--peer.adr=ADR] <tablename | snapshotName>");
199212
System.err.println();
200213
System.err.println("Options:");
201214
System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
@@ -208,9 +221,12 @@ private static void printUsage(final String errorMsg) {
208221
System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
209222
System.err.println(" versions number of cell versions to copy");
210223
System.err.println(" new.name new table's name");
224+
System.err.println(" peer.uri The URI of the peer cluster");
211225
System.err.println(" peer.adr Address of the peer cluster given in the format");
212226
System.err.println(" hbase.zookeeper.quorum:hbase.zookeeper.client"
213227
+ ".port:zookeeper.znode.parent");
228+
System.err.println(" Do not take effect if peer.uri is specified");
229+
System.err.println(" Deprecated, please use peer.uri instead");
214230
System.err.println(" families comma-separated list of families to copy");
215231
System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. ");
216232
System.err.println(" To keep the same name, just give \"cfName\"");
@@ -303,6 +319,12 @@ private boolean doCommandLine(final String[] args) {
303319
continue;
304320
}
305321

322+
final String peerUriArgKey = "--peer.uri=";
323+
if (cmd.startsWith(peerUriArgKey)) {
324+
peerUri = new URI(cmd.substring(peerUriArgKey.length()));
325+
continue;
326+
}
327+
306328
final String peerAdrArgKey = "--peer.adr=";
307329
if (cmd.startsWith(peerAdrArgKey)) {
308330
peerAddress = cmd.substring(peerAdrArgKey.length());
@@ -355,12 +377,12 @@ private boolean doCommandLine(final String[] args) {
355377
return false;
356378
}
357379

358-
if (bulkload && peerAddress != null) {
380+
if (bulkload && (peerUri != null || peerAddress != null)) {
359381
printUsage("Remote bulkload is not supported!");
360382
return false;
361383
}
362384

363-
if (readingSnapshot && peerAddress != null) {
385+
if (readingSnapshot && (peerUri != null || peerAddress != null)) {
364386
printUsage("Loading data from snapshot to remote peer cluster is not supported.");
365387
return false;
366388
}

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
package org.apache.hadoop.hbase.mapreduce;
1919

2020
import java.io.IOException;
21+
import java.net.URI;
22+
import java.net.URISyntaxException;
2123
import java.util.Collections;
2224
import java.util.Iterator;
25+
import org.apache.commons.lang3.StringUtils;
2326
import org.apache.hadoop.conf.Configuration;
2427
import org.apache.hadoop.conf.Configured;
2528
import org.apache.hadoop.fs.FileStatus;
@@ -65,7 +68,17 @@ public class SyncTable extends Configured implements Tool {
6568
static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
6669
static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
6770
static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
71+
static final String SOURCE_URI_CONF_KEY = "sync.table.source.uri";
72+
/**
73+
* @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #SOURCE_URI_CONF_KEY} instead.
74+
*/
75+
@Deprecated
6876
static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
77+
static final String TARGET_URI_CONF_KEY = "sync.table.target.uri";
78+
/**
79+
* @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #TARGET_URI_CONF_KEY} instead.
80+
*/
81+
@Deprecated
6982
static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
7083
static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
7184
static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
@@ -76,7 +89,17 @@ public class SyncTable extends Configured implements Tool {
7689
String sourceTableName;
7790
String targetTableName;
7891

92+
URI sourceUri;
93+
/**
94+
* @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #sourceUri} instead.
95+
*/
96+
@Deprecated
7997
String sourceZkCluster;
98+
URI targetUri;
99+
/**
100+
* @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #targetUri} instead.
101+
*/
102+
@Deprecated
80103
String targetZkCluster;
81104
boolean dryRun;
82105
boolean doDeletes = true;
@@ -89,9 +112,9 @@ public SyncTable(Configuration conf) {
89112
super(conf);
90113
}
91114

92-
private void initCredentialsForHBase(String zookeeper, Job job) throws IOException {
115+
private void initCredentialsForHBase(String clusterKey, Job job) throws IOException {
93116
Configuration peerConf =
94-
HBaseConfiguration.createClusterConf(job.getConfiguration(), zookeeper);
117+
HBaseConfiguration.createClusterConf(job.getConfiguration(), clusterKey);
95118
TableMapReduceUtil.initCredentialsForCluster(job, peerConf);
96119
}
97120

@@ -142,11 +165,17 @@ public Job createSubmittableJob(String[] args) throws IOException {
142165
jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
143166
jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
144167
jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
145-
if (sourceZkCluster != null) {
168+
if (sourceUri != null) {
169+
jobConf.set(SOURCE_URI_CONF_KEY, sourceUri.toString());
170+
TableMapReduceUtil.initCredentialsForCluster(job, jobConf, sourceUri);
171+
} else if (sourceZkCluster != null) {
146172
jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
147173
initCredentialsForHBase(sourceZkCluster, job);
148174
}
149-
if (targetZkCluster != null) {
175+
if (targetUri != null) {
176+
jobConf.set(TARGET_URI_CONF_KEY, targetUri.toString());
177+
TableMapReduceUtil.initCredentialsForCluster(job, jobConf, targetUri);
178+
} else if (targetZkCluster != null) {
150179
jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
151180
initCredentialsForHBase(targetZkCluster, job);
152181
}
@@ -165,8 +194,11 @@ public Job createSubmittableJob(String[] args) throws IOException {
165194
} else {
166195
// No reducers. Just write straight to table. Call initTableReducerJob
167196
// because it sets up the TableOutputFormat.
168-
TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetZkCluster);
169-
197+
if (targetUri != null) {
198+
TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetUri);
199+
} else {
200+
TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null, targetZkCluster);
201+
}
170202
// would be nice to add an option for bulk load instead
171203
}
172204

@@ -214,9 +246,10 @@ public static enum Counter {
214246
protected void setup(Context context) throws IOException {
215247
Configuration conf = context.getConfiguration();
216248
sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
217-
sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
218-
targetConnection =
219-
openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY, TableOutputFormat.OUTPUT_CONF_PREFIX);
249+
sourceConnection =
250+
openConnection(conf, SOURCE_URI_CONF_KEY, SOURCE_ZK_CLUSTER_CONF_KEY, null);
251+
targetConnection = openConnection(conf, TARGET_URI_CONF_KEY, TARGET_ZK_CLUSTER_CONF_KEY,
252+
TableOutputFormat.OUTPUT_CONF_PREFIX);
220253
sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
221254
targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
222255
dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
@@ -241,12 +274,21 @@ protected void setup(Context context) throws IOException {
241274
targetHasher.ignoreTimestamps = ignoreTimestamp;
242275
}
243276

244-
private static Connection openConnection(Configuration conf, String zkClusterConfKey,
245-
String configPrefix) throws IOException {
246-
String zkCluster = conf.get(zkClusterConfKey);
247-
Configuration clusterConf =
248-
HBaseConfiguration.createClusterConf(conf, zkCluster, configPrefix);
249-
return ConnectionFactory.createConnection(clusterConf);
277+
private static Connection openConnection(Configuration conf, String uriConfKey,
278+
String zkClusterConfKey, String configPrefix) throws IOException {
279+
String uri = conf.get(uriConfKey);
280+
if (!StringUtils.isBlank(uri)) {
281+
try {
282+
return ConnectionFactory.createConnection(new URI(uri), conf);
283+
} catch (URISyntaxException e) {
284+
throw new IOException(e);
285+
}
286+
} else {
287+
String zkCluster = conf.get(zkClusterConfKey);
288+
Configuration clusterConf =
289+
HBaseConfiguration.createClusterConf(conf, zkCluster, configPrefix);
290+
return ConnectionFactory.createConnection(clusterConf);
291+
}
250292
}
251293

252294
private static Table openTable(Connection connection, Configuration conf,
@@ -747,10 +789,18 @@ private static void printUsage(final String errorMsg) {
747789
System.err.println();
748790
System.err.println("Options:");
749791

792+
System.err.println(" sourceuri Cluster connection uri of the source table");
793+
System.err.println(" (defaults to cluster in classpath's config)");
750794
System.err.println(" sourcezkcluster ZK cluster key of the source table");
751795
System.err.println(" (defaults to cluster in classpath's config)");
796+
System.err.println(" Do not take effect if sourceuri is specified");
797+
System.err.println(" Deprecated, please use sourceuri instead");
798+
System.err.println(" targeturi Cluster connection uri of the target table");
799+
System.err.println(" (defaults to cluster in classpath's config)");
752800
System.err.println(" targetzkcluster ZK cluster key of the target table");
753801
System.err.println(" (defaults to cluster in classpath's config)");
802+
System.err.println(" Do not take effect if targeturi is specified");
803+
System.err.println(" Deprecated, please use targeturi instead");
754804
System.err.println(" dryrun if true, output counters but no writes");
755805
System.err.println(" (defaults to false)");
756806
System.err.println(" doDeletes if false, does not perform deletes");
@@ -792,13 +842,24 @@ private boolean doCommandLine(final String[] args) {
792842
printUsage(null);
793843
return false;
794844
}
845+
final String sourceUriKey = "--sourceuri=";
846+
if (cmd.startsWith(sourceUriKey)) {
847+
sourceUri = new URI(cmd.substring(sourceUriKey.length()));
848+
continue;
849+
}
795850

796851
final String sourceZkClusterKey = "--sourcezkcluster=";
797852
if (cmd.startsWith(sourceZkClusterKey)) {
798853
sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
799854
continue;
800855
}
801856

857+
final String targetUriKey = "--targeturi=";
858+
if (cmd.startsWith(targetUriKey)) {
859+
targetUri = new URI(cmd.substring(targetUriKey.length()));
860+
continue;
861+
}
862+
802863
final String targetZkClusterKey = "--targetzkcluster=";
803864
if (cmd.startsWith(targetZkClusterKey)) {
804865
targetZkCluster = cmd.substring(targetZkClusterKey.length());

0 commit comments

Comments
 (0)