3131import org .apache .commons .lang3 .StringUtils ;
3232import org .apache .hadoop .conf .Configuration ;
3333import org .apache .hadoop .fs .FileSystem ;
34+ import org .apache .hadoop .fs .FileSystem ;
3435import org .apache .hadoop .hbase .DoNotRetryIOException ;
3536import org .apache .hadoop .hbase .HBaseConfiguration ;
3637import org .apache .hadoop .hbase .ReplicationPeerNotFoundException ;
5051import org .apache .hadoop .hbase .replication .ReplicationQueueStorage ;
5152import org .apache .hadoop .hbase .replication .ReplicationStorageFactory ;
5253import org .apache .hadoop .hbase .replication .ReplicationUtils ;
54+ import org .apache .hadoop .hbase .replication .regionserver .HBaseInterClusterReplicationEndpoint ;
5355import org .apache .hadoop .hbase .zookeeper .ZKClusterId ;
5456import org .apache .hadoop .hbase .zookeeper .ZKConfig ;
5557import org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
@@ -73,25 +75,18 @@ public class ReplicationPeerManager implements ConfigurationObserver {
7375
7476 private final ConcurrentMap <String , ReplicationPeerDescription > peers ;
7577
76- private final String clusterId ;
77-
78- private volatile Configuration conf ;
79-
8078 // for dynamic recreating ReplicationPeerStorage.
8179 private final FileSystem fs ;
8280
8381 private final ZKWatcher zk ;
8482
85- ReplicationPeerManager (FileSystem fs , ZKWatcher zk , ReplicationPeerStorage peerStorage ,
86- ReplicationQueueStorage queueStorage , ConcurrentMap <String , ReplicationPeerDescription > peers ,
87- Configuration conf , String clusterId ) {
88- this .fs = fs ;
89- this .zk = zk ;
83+ ReplicationPeerManager (FileSystem fs , ZKWatcher zk , ReplicationPeerStorage peerStorage , ReplicationQueueStorage queueStorage ,
84+ ConcurrentMap <String , ReplicationPeerDescription > peers ) {
9085 this .peerStorage = peerStorage ;
9186 this .queueStorage = queueStorage ;
9287 this .peers = peers ;
93- this .conf = conf ;
94- this .clusterId = clusterId ;
88+ this .fs = fs ;
89+ this .zk = zk ;
9590 }
9691
9792 private void checkQueuesDeleted (String peerId )
@@ -181,12 +176,11 @@ ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerCon
181176 }
182177
183178 public void addPeer (String peerId , ReplicationPeerConfig peerConfig , boolean enabled )
184- throws ReplicationException {
179+ throws ReplicationException {
185180 if (peers .containsKey (peerId )) {
186181 // this should be a retry, just return
187182 return ;
188183 }
189- peerConfig = ReplicationPeerConfigUtil .updateReplicationBasePeerConfigs (conf , peerConfig );
190184 ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig .newBuilder (peerConfig ).build ();
191185 peerStorage .addPeer (peerId , copiedPeerConfig , enabled );
192186 peers .put (peerId , new ReplicationPeerDescription (peerId , enabled , copiedPeerConfig ));
@@ -281,25 +275,25 @@ void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
281275
282276 private void checkPeerConfig (ReplicationPeerConfig peerConfig ) throws DoNotRetryIOException {
283277 String replicationEndpointImpl = peerConfig .getReplicationEndpointImpl ();
284- ReplicationEndpoint endpoint = null ;
278+ boolean checkClusterKey = true ;
285279 if (!StringUtils .isBlank (replicationEndpointImpl )) {
280+ // try creating a instance
281+ ReplicationEndpoint endpoint ;
286282 try {
287- // try creating a instance
288- endpoint = Class .forName (replicationEndpointImpl ).asSubclass (ReplicationEndpoint .class )
289- .getDeclaredConstructor ().newInstance ();
283+ endpoint = Class .forName (replicationEndpointImpl )
284+ .asSubclass (ReplicationEndpoint .class ).getDeclaredConstructor ().newInstance ();
290285 } catch (Throwable e ) {
291286 throw new DoNotRetryIOException (
292287 "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl ,
293288 e );
294289 }
290+ // do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
291+ if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint )) {
292+ checkClusterKey = false ;
293+ }
295294 }
296- // Endpoints implementing HBaseReplicationEndpoint need to check cluster key
297- if (endpoint == null || endpoint instanceof HBaseReplicationEndpoint ) {
295+ if (checkClusterKey ) {
298296 checkClusterKey (peerConfig .getClusterKey ());
299- // Check if endpoint can replicate to the same cluster
300- if (endpoint == null || !endpoint .canReplicateToSameCluster ()) {
301- checkSameClusterKey (peerConfig .getClusterKey ());
302- }
303297 }
304298
305299 if (peerConfig .replicateAllUserTables ()) {
@@ -396,25 +390,6 @@ private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
396390 }
397391 }
398392
399- private void checkSameClusterKey (String clusterKey ) throws DoNotRetryIOException {
400- String peerClusterId = "" ;
401- try {
402- // Create the peer cluster config for get peer cluster id
403- Configuration peerConf = HBaseConfiguration .createClusterConf (conf , clusterKey );
404- try (ZKWatcher zkWatcher = new ZKWatcher (peerConf , this + "check-peer-cluster-id" , null )) {
405- peerClusterId = ZKClusterId .readClusterIdZNode (zkWatcher );
406- }
407- } catch (IOException | KeeperException e ) {
408- throw new DoNotRetryIOException ("Can't get peerClusterId for clusterKey=" + clusterKey , e );
409- }
410- // In rare case, zookeeper setting may be messed up. That leads to the incorrect
411- // peerClusterId value, which is the same as the source clusterId
412- if (clusterId .equals (peerClusterId )) {
413- throw new DoNotRetryIOException ("Invalid cluster key: " + clusterKey
414- + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint" );
415- }
416- }
417-
418393 public List <String > getSerialPeerIdsBelongsTo (TableName tableName ) {
419394 return peers .values ().stream ().filter (p -> p .getPeerConfig ().isSerial ())
420395 .filter (p -> p .getPeerConfig ().needToReplicate (tableName )).map (p -> p .getPeerId ())
@@ -425,21 +400,18 @@ public ReplicationQueueStorage getQueueStorage() {
425400 return queueStorage ;
426401 }
427402
428- public static ReplicationPeerManager create (FileSystem fs , ZKWatcher zk , Configuration conf ,
429- String clusterId ) throws ReplicationException {
403+ public static ReplicationPeerManager create (FileSystem fs , ZKWatcher zk , Configuration conf )
404+ throws ReplicationException {
430405 ReplicationPeerStorage peerStorage =
431406 ReplicationStorageFactory .getReplicationPeerStorage (fs , zk , conf );
432407 ConcurrentMap <String , ReplicationPeerDescription > peers = new ConcurrentHashMap <>();
433408 for (String peerId : peerStorage .listPeerIds ()) {
434409 ReplicationPeerConfig peerConfig = peerStorage .getPeerConfig (peerId );
435-
436- peerConfig = ReplicationPeerConfigUtil .updateReplicationBasePeerConfigs (conf , peerConfig );
437- peerStorage .updatePeerConfig (peerId , peerConfig );
438410 boolean enabled = peerStorage .isPeerEnabled (peerId );
439411 peers .put (peerId , new ReplicationPeerDescription (peerId , enabled , peerConfig ));
440412 }
441413 return new ReplicationPeerManager (fs , zk , peerStorage ,
442- ReplicationStorageFactory .getReplicationQueueStorage (zk , conf ), peers , conf , clusterId );
414+ ReplicationStorageFactory .getReplicationQueueStorage (zk , conf ), peers );
443415 }
444416
445417 /**
@@ -455,7 +427,6 @@ private boolean isStringEquals(String s1, String s2) {
455427
456428 @ Override
457429 public void onConfigurationChange (Configuration conf ) {
458- this .conf = conf ;
459430 this .peerStorage = ReplicationStorageFactory .getReplicationPeerStorage (fs , zk , conf );
460431 }
461432}
0 commit comments