Skip to content

Commit d8636e5

Browse files
committed
HBASE-27213 Add support for claim queue operation (#4708)
Signed-off-by: Xin Sun <ddupgs@gmail.com>
1 parent e93127c commit d8636e5

File tree

8 files changed

+258
-26
lines changed

8 files changed

+258
-26
lines changed

hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ message UpdatePeerConfigStateData {
515515

516516
message RemovePeerStateData {
517517
optional ReplicationPeer peer_config = 1;
518+
repeated int64 ongoing_assign_replication_queues_proc_ids = 2;
518519
}
519520

520521
message EnablePeerStateData {
@@ -714,9 +715,8 @@ message ModifyColumnFamilyStoreFileTrackerStateData {
714715
}
715716

716717
enum AssignReplicationQueuesState {
717-
ASSIGN_REPLICATION_QUEUES_PRE_CHECK = 1;
718-
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 2;
719-
ASSIGN_REPLICATION_QUEUES_CLAIM = 3;
718+
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
719+
ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
720720
}
721721

722722
message AssignReplicationQueuesStateData {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Iterator;
2424
import java.util.List;
2525
import java.util.Set;
26+
import java.util.stream.Collectors;
2627
import org.apache.hadoop.hbase.ServerName;
2728
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
2829
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
@@ -102,8 +103,12 @@ private void addMissingQueues(MasterProcedureEnv env) throws ReplicationExceptio
102103
}
103104

104105
private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
106+
Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
107+
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
105108
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
106-
List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer);
109+
// filter out replication queue for deleted peers
110+
List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer).stream()
111+
.filter(q -> existingPeerIds.contains(q.getPeerId())).collect(Collectors.toList());
107112
if (queueIds.isEmpty()) {
108113
LOG.debug("Finish claiming replication queues for {}", crashedServer);
109114
// we are done
@@ -130,10 +135,6 @@ protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesS
130135
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
131136
try {
132137
switch (state) {
133-
case ASSIGN_REPLICATION_QUEUES_PRE_CHECK:
134-
// TODO: reserved for implementing the fencing logic with Add/Remove/UpdatePeerProcedure
135-
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES);
136-
return Flow.HAS_MORE_STATE;
137138
case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
138139
addMissingQueues(env);
139140
retryCounter = null;
@@ -183,7 +184,7 @@ protected int getStateId(AssignReplicationQueuesState state) {
183184

184185
@Override
185186
protected AssignReplicationQueuesState getInitialState() {
186-
return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_PRE_CHECK;
187+
return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES;
187188
}
188189

189190
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ protected abstract void prePeerModification(MasterProcedureEnv env)
7474
* update the peer storage.
7575
*/
7676
protected abstract void postPeerModification(MasterProcedureEnv env)
77-
throws IOException, ReplicationException;
77+
throws IOException, ReplicationException, ProcedureSuspendedException;
7878

7979
protected void releaseLatch(MasterProcedureEnv env) {
8080
ProcedurePrepareLatch.releaseLatch(latch, this);

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,17 @@
1818
package org.apache.hadoop.hbase.master.replication;
1919

2020
import java.io.IOException;
21+
import java.util.Arrays;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.stream.Collectors;
2125
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
2226
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
2327
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
28+
import org.apache.hadoop.hbase.procedure2.Procedure;
29+
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
2430
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
31+
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
2532
import org.apache.hadoop.hbase.replication.ReplicationException;
2633
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
2734
import org.apache.yetus.audience.InterfaceAudience;
@@ -40,6 +47,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
4047

4148
private ReplicationPeerConfig peerConfig;
4249

50+
private List<Long> ongoingAssignReplicationQueuesProcIds = Collections.emptyList();
51+
4352
public RemovePeerProcedure() {
4453
}
4554

@@ -64,15 +73,43 @@ protected void prePeerModification(MasterProcedureEnv env) throws IOException {
6473
@Override
6574
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
6675
env.getReplicationPeerManager().removePeer(peerId);
76+
// record ongoing AssignReplicationQueuesProcedures after we update the peer storage
77+
ongoingAssignReplicationQueuesProcIds = env.getMasterServices().getMasterProcedureExecutor()
78+
.getProcedures().stream().filter(p -> p instanceof AssignReplicationQueuesProcedure)
79+
.filter(p -> !p.isFinished()).map(Procedure::getProcId).collect(Collectors.toList());
6780
}
6881

6982
private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
7083
env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId);
7184
}
7285

86+
private void checkAssignReplicationQueuesFinished(MasterProcedureEnv env)
87+
throws ProcedureSuspendedException {
88+
if (ongoingAssignReplicationQueuesProcIds.isEmpty()) {
89+
LOG.info("No ongoing assign replication queues procedures when removing peer {}, move on",
90+
peerId);
91+
}
92+
ProcedureExecutor<MasterProcedureEnv> procExec =
93+
env.getMasterServices().getMasterProcedureExecutor();
94+
long[] unfinishedProcIds =
95+
ongoingAssignReplicationQueuesProcIds.stream().map(procExec::getProcedure)
96+
.filter(p -> p != null && !p.isFinished()).mapToLong(Procedure::getProcId).toArray();
97+
if (unfinishedProcIds.length == 0) {
98+
LOG.info(
99+
"All assign replication queues procedures are finished when removing peer {}, move on",
100+
peerId);
101+
} else {
102+
throw suspend(env.getMasterConfiguration(), backoff -> LOG.info(
103+
"There are still {} pending assign replication queues procedures {} when removing peer {}, sleep {} secs",
104+
unfinishedProcIds.length, Arrays.toString(unfinishedProcIds), peerId, backoff / 1000));
105+
}
106+
}
107+
73108
@Override
74109
protected void postPeerModification(MasterProcedureEnv env)
75-
throws IOException, ReplicationException {
110+
throws IOException, ReplicationException, ProcedureSuspendedException {
111+
checkAssignReplicationQueuesFinished(env);
112+
76113
if (peerConfig.isSyncReplication()) {
77114
removeRemoteWALs(env);
78115
}
@@ -94,6 +131,7 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO
94131
if (peerConfig != null) {
95132
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
96133
}
134+
builder.addAllOngoingAssignReplicationQueuesProcIds(ongoingAssignReplicationQueuesProcIds);
97135
serializer.serialize(builder.build());
98136
}
99137

@@ -104,5 +142,6 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
104142
if (data.hasPeerConfig()) {
105143
this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
106144
}
145+
ongoingAssignReplicationQueuesProcIds = data.getOngoingAssignReplicationQueuesProcIdsList();
107146
}
108147
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
236236
*/
237237
void init() throws IOException {
238238
for (String id : this.replicationPeers.getAllPeerIds()) {
239-
addSource(id);
239+
addSource(id, true);
240240
}
241241
}
242242

@@ -256,7 +256,7 @@ public void addPeer(String peerId) throws IOException {
256256
throw new IOException(e);
257257
}
258258
if (added) {
259-
addSource(peerId);
259+
addSource(peerId, false);
260260
}
261261
}
262262

@@ -322,11 +322,16 @@ private ReplicationSourceInterface createSource(ReplicationQueueData queueData,
322322
/**
323323
* Add a normal source for the given peer on this region server. Meanwhile, add new replication
324324
* queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
325-
* group and do replication
325+
* group and do replication.
326+
* <p/>
327+
* We add a {@code init} parameter to indicate whether this is part of the initialization process.
328+
* If so, we should skip adding the replication queues as this may introduce a deadlock between
329+
* region server startup and the hbase:replication table coming online.
326330
* @param peerId the id of the replication peer
331+
* @param init whether this call is part of the initialization process
327332
* @return the source that was created
328333
*/
329-
void addSource(String peerId) throws IOException {
334+
void addSource(String peerId, boolean init) throws IOException {
330335
ReplicationPeer peer = replicationPeers.getPeer(peerId);
331336
if (
332337
ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
@@ -351,11 +356,16 @@ void addSource(String peerId) throws IOException {
351356
NavigableSet<String> wals = new TreeSet<>();
352357
wals.add(walPath.getName());
353358
walsByGroup.put(walPrefixAndPath.getKey(), wals);
354-
// Abort RS and throw exception to make add peer failed
355-
// TODO: can record the length of the current wal file so we could replicate less data
356-
abortAndThrowIOExceptionWhenFail(
357-
() -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
358-
new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
359+
if (!init) {
360+
// Abort RS and throw exception to make add peer failed
361+
// Ideally we'd better use the current file size as offset so we can skip replicating
362+
// the data before adding replication peer, but the problem is that the file may not end
363+
// at a valid entry's ending, and the current WAL Reader implementation can not deal
364+
// with reading from the middle of a WAL entry. Can improve later.
365+
abortAndThrowIOExceptionWhenFail(
366+
() -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
367+
new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
368+
}
359369
src.enqueueLog(walPath);
360370
LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
361371
}
@@ -794,9 +804,15 @@ public void postLogRoll(Path newLog) throws IOException {
794804
* @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}.
795805
*/
796806
private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
797-
if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
807+
// skip replicating meta wals
808+
if (AbstractFSWALProvider.isMetaFile(wal)) {
798809
return false;
799810
}
811+
// if no offset or the offset is just a place marker, replicate
812+
if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
813+
return true;
814+
}
815+
// otherwise, compare the timestamp
800816
long walTs = AbstractFSWALProvider.getTimestamp(wal);
801817
long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
802818
if (walTs < startWalTs) {
@@ -891,7 +907,6 @@ Comparator.<Path, Long> comparing(p -> AbstractFSWALProvider.getTimestamp(p.getN
891907
LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
892908
groupOffset);
893909
}
894-
walFilesPQ.add(file);
895910
}
896911
// the method is a bit long, so assign it to null here to avoid later we reuse it again by
897912
// mistake, we should use the sorted walFilesPQ instead

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public void testClaim() throws Exception {
156156
hbaseAdmin.enableReplicationPeer(PEER_ID3);
157157

158158
EMPTY = false;
159-
// wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP
159+
// wait until the SCP has finished; AssignReplicationQueuesProcedure is a sub procedure of SCP
160160
UTIL1.waitFor(30000, () -> master.getProcedures().stream()
161161
.filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
162162

0 commit comments

Comments
 (0)